Cleaned up NOTE and TODO comments, fixed user-email mapping, removed commented print statements

Signed-off-by: CocoByte <nicolle.leclair@gmail.com>
This commit is contained in:
CocoByte 2023-10-04 20:48:42 -06:00
parent 25e24db99e
commit 1b0c261729
No known key found for this signature in database
GPG key ID: BBFAA2526384C97F

View file

@ -1,5 +1,6 @@
"""Load domain invitations for existing domains and their contacts.""" """Load domain invitations for existing domains and their contacts."""
import sys
import csv import csv
import logging import logging
import argparse import argparse
@ -115,29 +116,25 @@ class Command(BaseCommand):
def get_domain_user_dict(self, domain_statuses_filename, sep): def get_domain_user_dict(self, domain_statuses_filename, sep):
"""Creates a mapping of domain name -> status""" """Creates a mapping of domain name -> status"""
# TODO: figure out latest status
domain_status_dictionary = defaultdict(str) domain_status_dictionary = defaultdict(str)
# NOTE: how to determine "most recent" status?
logger.info("Reading domain statuses data file %s", domain_statuses_filename) logger.info("Reading domain statuses data file %s", domain_statuses_filename)
with open(domain_statuses_filename, "r") as domain_statuses_file: # noqa with open(domain_statuses_filename, "r") as domain_statuses_file: # noqa
for row in csv.reader(domain_statuses_file, delimiter=sep): for row in csv.reader(domain_statuses_file, delimiter=sep):
domainName = row[0].lower() domainName = row[0].lower()
domainStatus = row[1].lower() domainStatus = row[1].lower()
# print("adding "+domainName+", "+domainStatus)
domain_status_dictionary[domainName] = domainStatus domain_status_dictionary[domainName] = domainStatus
logger.info("Loaded statuses for %d domains", len(domain_status_dictionary)) logger.info("Loaded statuses for %d domains", len(domain_status_dictionary))
return domain_status_dictionary return domain_status_dictionary
def get_user_emails_dict(self, contacts_filename, sep): def get_user_emails_dict(self, contacts_filename, sep):
"""Creates mapping of userId -> emails""" """Creates mapping of userId -> emails"""
# NOTE: is this one to many??
user_emails_dictionary = defaultdict(list) user_emails_dictionary = defaultdict(list)
logger.info("Reading domain-contacts data file %s", contacts_filename) logger.info("Reading domain-contacts data file %s", contacts_filename)
with open(contacts_filename, "r") as contacts_file: with open(contacts_filename, "r") as contacts_file:
for row in csv.reader(contacts_file, delimiter=sep): for row in csv.reader(contacts_file, delimiter=sep):
userId = row[0] user_id = row[0]
user_email = row[6] user_email = row[6]
user_emails_dictionary[userId].append(user_email) user_emails_dictionary[user_id] = user_email
logger.info("Loaded emails for %d users", len(user_emails_dictionary)) logger.info("Loaded emails for %d users", len(user_emails_dictionary))
return user_emails_dictionary return user_emails_dictionary
@ -154,13 +151,13 @@ class Command(BaseCommand):
"created": TransitionDomain.StatusChoices.READY, "created": TransitionDomain.StatusChoices.READY,
"ok": TransitionDomain.StatusChoices.READY, "ok": TransitionDomain.StatusChoices.READY,
} }
return status_maps[status_to_map] return status_maps.get(status_to_map)
def print_summary_duplications( def print_summary_duplications(
self, duplicate_domain_user_combos, duplicate_domains, users_without_email self, duplicate_domain_user_combos, duplicate_domains, users_without_email
): ):
totalDupDomainUserPairs = len(duplicate_domain_user_combos) total_duplicate_pairs = len(duplicate_domain_user_combos)
totalDupDomains = len(duplicate_domains) total_duplicate_domains = len(duplicate_domains)
total_users_without_email = len(users_without_email) total_users_without_email = len(users_without_email)
if total_users_without_email > 0: if total_users_without_email > 0:
logger.warning( logger.warning(
@ -168,11 +165,11 @@ class Command(BaseCommand):
", ".join(map(str, users_without_email)) ", ".join(map(str, users_without_email))
) )
) )
if totalDupDomainUserPairs > 0 or totalDupDomains > 0: if total_duplicate_pairs > 0 or total_duplicate_domains > 0:
temp_dupPairsAsString = "{}".format( duplicate_pairs_as_string = "{}".format(
", ".join(map(str, duplicate_domain_user_combos)) ", ".join(map(str, duplicate_domain_user_combos))
) )
temp_dupDomainsAsString = "{}".format( duplicate_domains_as_string = "{}".format(
", ".join(map(str, duplicate_domains)) ", ".join(map(str, duplicate_domains))
) )
logger.warning( logger.warning(
@ -180,15 +177,15 @@ class Command(BaseCommand):
----DUPLICATES FOUND----- ----DUPLICATES FOUND-----
{totalDupDomainUserPairs} DOMAIN - USER pairs {total_duplicate_pairs} DOMAIN - USER pairs
were NOT unique in the supplied data files; were NOT unique in the supplied data files;
{temp_dupPairsAsString} {duplicate_pairs_as_string}
{totalDupDomains} DOMAINS were NOT unique in {total_duplicate_domains} DOMAINS were NOT unique in
the supplied data files; the supplied data files;
{temp_dupDomainsAsString} {duplicate_domains_as_string}
{termColors.ENDC}""" {termColors.ENDC}"""
) )
@ -196,7 +193,7 @@ class Command(BaseCommand):
total_domains_without_status = len(domains_without_status) total_domains_without_status = len(domains_without_status)
total_outlier_statuses = len(outlier_statuses) total_outlier_statuses = len(outlier_statuses)
if total_domains_without_status > 0: if total_domains_without_status > 0:
temp_arrayToString = "{}".format( domains_without_status_as_string = "{}".format(
", ".join(map(str, domains_without_status)) ", ".join(map(str, domains_without_status))
) )
logger.warning( logger.warning(
@ -207,12 +204,12 @@ class Command(BaseCommand):
without a status (defaulted to READY) without a status (defaulted to READY)
--------------------------------------------- ---------------------------------------------
{temp_arrayToString} {domains_without_status_as_string}
{termColors.ENDC}""" {termColors.ENDC}"""
) )
if total_outlier_statuses > 0: if total_outlier_statuses > 0:
temp_arrayToString = "{}".format( domains_without_status_as_string = "{}".format(
", ".join(map(str, outlier_statuses)) ", ".join(map(str, outlier_statuses))
) # noqa ) # noqa
logger.warning( logger.warning(
@ -226,7 +223,7 @@ class Command(BaseCommand):
No mappings found for the following statuses No mappings found for the following statuses
(defaulted to Ready): (defaulted to Ready):
{temp_arrayToString} {domains_without_status_as_string}
{termColors.ENDC}""" {termColors.ENDC}"""
) )
@ -242,14 +239,14 @@ class Command(BaseCommand):
sep = options.get("sep") sep = options.get("sep")
if options.get("resetTable"): if options.get("resetTable"):
confirmReset = query_yes_no( confirm_reset = query_yes_no(
f""" f"""
{termColors.FAIL} {termColors.FAIL}
WARNING: Resetting the table will permanently delete all WARNING: Resetting the table will permanently delete all
the data! the data!
Are you sure you want to continue?{termColors.ENDC}""" Are you sure you want to continue?{termColors.ENDC}"""
) )
if confirmReset: if confirm_reset:
logger.info( logger.info(
f"""{termColors.YELLOW} f"""{termColors.YELLOW}
----------Clearing Table Data---------- ----------Clearing Table Data----------
@ -276,11 +273,8 @@ class Command(BaseCommand):
user_emails_dictionary = self.get_user_emails_dict(contacts_filename, sep) user_emails_dictionary = self.get_user_emails_dict(contacts_filename, sep)
# STEP 3: # STEP 3:
# TODO: Need to add logic for conflicting domain status # Parse the domain_contacts file and create TransitionDomain objects,
# entries # using the dictionaries from steps 1 & 2 to lookup needed information.
# (which should not exist, but might)
# TODO: log statuses found that don't map to the ones
# we have (count occurences)
to_create = [] to_create = []
@ -318,8 +312,7 @@ class Command(BaseCommand):
new_entry_status = TransitionDomain.StatusChoices.READY new_entry_status = TransitionDomain.StatusChoices.READY
new_entry_email = "" new_entry_email = ""
new_entry_emailSent = False new_entry_emailSent = False # set to False by default
# TODO: how to know if e-mail was sent?
if new_entry_domainName not in domain_status_dictionary: if new_entry_domainName not in domain_status_dictionary:
# this domain has no status...default to "Create" # this domain has no status...default to "Create"
@ -327,7 +320,6 @@ class Command(BaseCommand):
domains_without_status.append(new_entry_domainName) domains_without_status.append(new_entry_domainName)
else: else:
original_status = domain_status_dictionary[new_entry_domainName] original_status = domain_status_dictionary[new_entry_domainName]
# print(originalStatus)
mapped_status = self.get_mapped_status(original_status) mapped_status = self.get_mapped_status(original_status)
if mapped_status is None: if mapped_status is None:
logger.info("Unknown status: " + original_status) logger.info("Unknown status: " + original_status)
@ -349,11 +341,11 @@ class Command(BaseCommand):
# However, track duplicate domains for now, # However, track duplicate domains for now,
# since we are still deciding on whether # since we are still deciding on whether
# to make this field unique or not. ~10/25/2023 # to make this field unique or not. ~10/25/2023
tempEntry_domain = next( existing_domain = next(
(x for x in to_create if x.domain_name == new_entry_domainName), (x for x in to_create if x.domain_name == new_entry_domainName),
None, None,
) )
tempEntry_domainUserPair = next( existing_domain_user_pair = next(
( (
x x
for x in to_create for x in to_create
@ -362,57 +354,57 @@ class Command(BaseCommand):
), ),
None, None,
) )
if tempEntry_domain is not None: if existing_domain is not None:
if debug_on: if debug_on:
logger.info( logger.info(
f"{termColors.YELLOW} DUPLICATE Verisign entries found for domain: {new_entry_domainName} {termColors.ENDC}" # noqa f"{termColors.YELLOW} DUPLICATE Verisign entries found for domain: {new_entry_domainName} {termColors.ENDC}" # noqa
) )
if new_entry_domainName not in duplicate_domains: if new_entry_domainName not in duplicate_domains:
duplicate_domains.append(new_entry_domainName) duplicate_domains.append(new_entry_domainName)
if tempEntry_domainUserPair is not None: if existing_domain_user_pair is not None:
if debug_on: if debug_on:
logger.info( logger.info(
f"""{termColors.YELLOW} DUPLICATE Verisign entries found for domain - user {termColors.BackgroundLightYellow} PAIR {termColors.ENDC}{termColors.YELLOW}: f"""{termColors.YELLOW} DUPLICATE Verisign entries found for domain - user {termColors.BackgroundLightYellow} PAIR {termColors.ENDC}{termColors.YELLOW}:
{new_entry_domainName} - {new_entry_email} {termColors.ENDC}""" # noqa {new_entry_domainName} - {new_entry_email} {termColors.ENDC}""" # noqa
) )
if tempEntry_domainUserPair not in duplicate_domain_user_combos: if existing_domain_user_pair not in duplicate_domain_user_combos:
duplicate_domain_user_combos.append(tempEntry_domainUserPair) duplicate_domain_user_combos.append(existing_domain_user_pair)
else: else:
try: try:
existingEntry = TransitionDomain.objects.get( existing_entry = TransitionDomain.objects.get(
username=new_entry_email, domain_name=new_entry_domainName username=new_entry_email, domain_name=new_entry_domainName
) )
if existingEntry.status != new_entry_status: if existing_entry.status != new_entry_status:
# DEBUG: # DEBUG:
if debug_on: if debug_on:
logger.info( logger.info(
f"{termColors.OKCYAN}" f"{termColors.OKCYAN}"
f"Updating entry: {existingEntry}" f"Updating entry: {existing_entry}"
f"Status: {existingEntry.status} > {new_entry_status}" f"Status: {existing_entry.status} > {new_entry_status}"
f"Email Sent: {existingEntry.email_sent} > {new_entry_emailSent}" f"Email Sent: {existing_entry.email_sent} > {new_entry_emailSent}"
f"{termColors.ENDC}" f"{termColors.ENDC}"
) )
existingEntry.status = new_entry_status existing_entry.status = new_entry_status
existingEntry.email_sent = new_entry_emailSent existing_entry.email_sent = new_entry_emailSent
existingEntry.save() existing_entry.save()
except TransitionDomain.DoesNotExist: except TransitionDomain.DoesNotExist:
# no matching entry, make one # no matching entry, make one
newEntry = TransitionDomain( new_entry = TransitionDomain(
username=new_entry_email, username=new_entry_email,
domain_name=new_entry_domainName, domain_name=new_entry_domainName,
status=new_entry_status, status=new_entry_status,
email_sent=new_entry_emailSent, email_sent=new_entry_emailSent,
) )
to_create.append(newEntry) to_create.append(new_entry)
total_new_entries += 1 total_new_entries += 1
# DEBUG: # DEBUG:
if debug_on: if debug_on:
logger.info( logger.info(
f"{termColors.OKCYAN} Adding entry {total_new_entries}: {newEntry} {termColors.ENDC}" # noqa f"{termColors.OKCYAN} Adding entry {total_new_entries}: {new_entry} {termColors.ENDC}" # noqa
) )
except TransitionDomain.MultipleObjectsReturned: except TransitionDomain.MultipleObjectsReturned:
logger.info( logger.info(
@ -422,8 +414,6 @@ class Command(BaseCommand):
f"{new_entry_domainName}" f"{new_entry_domainName}"
f"----------TERMINATING----------" f"----------TERMINATING----------"
) )
import sys
sys.exit() sys.exit()
# DEBUG: # DEBUG: