diff --git a/src/registrar/management/commands/load_transition_domain.py b/src/registrar/management/commands/load_transition_domain.py index 799a87b62..138e16105 100644 --- a/src/registrar/management/commands/load_transition_domain.py +++ b/src/registrar/management/commands/load_transition_domain.py @@ -1,5 +1,6 @@ """Load domain invitations for existing domains and their contacts.""" +import sys import csv import logging import argparse @@ -115,29 +116,25 @@ class Command(BaseCommand): def get_domain_user_dict(self, domain_statuses_filename, sep): """Creates a mapping of domain name -> status""" - # TODO: figure out latest status domain_status_dictionary = defaultdict(str) - # NOTE: how to determine "most recent" status? logger.info("Reading domain statuses data file %s", domain_statuses_filename) with open(domain_statuses_filename, "r") as domain_statuses_file: # noqa for row in csv.reader(domain_statuses_file, delimiter=sep): domainName = row[0].lower() domainStatus = row[1].lower() - # print("adding "+domainName+", "+domainStatus) domain_status_dictionary[domainName] = domainStatus logger.info("Loaded statuses for %d domains", len(domain_status_dictionary)) return domain_status_dictionary def get_user_emails_dict(self, contacts_filename, sep): """Creates mapping of userId -> emails""" - # NOTE: is this one to many?? user_emails_dictionary = defaultdict(list) logger.info("Reading domain-contacts data file %s", contacts_filename) with open(contacts_filename, "r") as contacts_file: for row in csv.reader(contacts_file, delimiter=sep): - userId = row[0] + user_id = row[0] user_email = row[6] - user_emails_dictionary[userId].append(user_email) + user_emails_dictionary[user_id] = user_email logger.info("Loaded emails for %d users", len(user_emails_dictionary)) return user_emails_dictionary @@ -154,13 +151,13 @@ class Command(BaseCommand): "created": TransitionDomain.StatusChoices.READY, "ok": TransitionDomain.StatusChoices.READY, } - return status_maps[status_to_map] + return status_maps.get(status_to_map) def print_summary_duplications( self, duplicate_domain_user_combos, duplicate_domains, users_without_email ): - totalDupDomainUserPairs = len(duplicate_domain_user_combos) - totalDupDomains = len(duplicate_domains) + total_duplicate_pairs = len(duplicate_domain_user_combos) + total_duplicate_domains = len(duplicate_domains) total_users_without_email = len(users_without_email) if total_users_without_email > 0: logger.warning( @@ -168,11 +165,11 @@ class Command(BaseCommand): ", ".join(map(str, users_without_email)) ) ) - if totalDupDomainUserPairs > 0 or totalDupDomains > 0: - temp_dupPairsAsString = "{}".format( + if total_duplicate_pairs > 0 or total_duplicate_domains > 0: + duplicate_pairs_as_string = "{}".format( ", ".join(map(str, duplicate_domain_user_combos)) ) - temp_dupDomainsAsString = "{}".format( + duplicate_domains_as_string = "{}".format( ", ".join(map(str, duplicate_domains)) ) logger.warning( @@ -180,15 +177,15 @@ class Command(BaseCommand): ----DUPLICATES FOUND----- - {totalDupDomainUserPairs} DOMAIN - USER pairs + {total_duplicate_pairs} DOMAIN - USER pairs were NOT unique in the supplied data files; - {temp_dupPairsAsString} + {duplicate_pairs_as_string} - {totalDupDomains} DOMAINS were NOT unique in + {total_duplicate_domains} DOMAINS were NOT unique in the supplied data files; - {temp_dupDomainsAsString} + {duplicate_domains_as_string} {termColors.ENDC}""" ) @@ -196,7 +193,7 @@ class Command(BaseCommand): total_domains_without_status = len(domains_without_status) total_outlier_statuses = len(outlier_statuses) if total_domains_without_status > 0: - temp_arrayToString = "{}".format( + domains_without_status_as_string = "{}".format( ", ".join(map(str, domains_without_status)) ) logger.warning( @@ -207,12 +204,12 @@ class Command(BaseCommand): without a status (defaulted to READY) --------------------------------------------- - {temp_arrayToString} + {domains_without_status_as_string} {termColors.ENDC}""" ) if total_outlier_statuses > 0: - temp_arrayToString = "{}".format( + domains_without_status_as_string = "{}".format( ", ".join(map(str, outlier_statuses)) ) # noqa logger.warning( @@ -226,7 +223,7 @@ class Command(BaseCommand): No mappings found for the following statuses (defaulted to Ready): - {temp_arrayToString} + {domains_without_status_as_string} {termColors.ENDC}""" ) @@ -242,14 +239,14 @@ class Command(BaseCommand): sep = options.get("sep") if options.get("resetTable"): - confirmReset = query_yes_no( + confirm_reset = query_yes_no( f""" {termColors.FAIL} WARNING: Resetting the table will permanently delete all the data! Are you sure you want to continue?{termColors.ENDC}""" ) - if confirmReset: + if confirm_reset: logger.info( f"""{termColors.YELLOW} ----------Clearing Table Data---------- @@ -276,11 +273,8 @@ class Command(BaseCommand): user_emails_dictionary = self.get_user_emails_dict(contacts_filename, sep) # STEP 3: - # TODO: Need to add logic for conflicting domain status - # entries - # (which should not exist, but might) - # TODO: log statuses found that don't map to the ones - # we have (count occurences) + # Parse the domain_contacts file and create TransitionDomain objects, + # using the dictionaries from steps 1 & 2 to lookup needed information. to_create = [] @@ -318,8 +312,7 @@ class Command(BaseCommand): new_entry_status = TransitionDomain.StatusChoices.READY new_entry_email = "" - new_entry_emailSent = False - # TODO: how to know if e-mail was sent? + new_entry_emailSent = False # set to False by default if new_entry_domainName not in domain_status_dictionary: # this domain has no status...default to "Create" @@ -327,7 +320,6 @@ class Command(BaseCommand): domains_without_status.append(new_entry_domainName) else: original_status = domain_status_dictionary[new_entry_domainName] - # print(originalStatus) mapped_status = self.get_mapped_status(original_status) if mapped_status is None: logger.info("Unknown status: " + original_status) @@ -349,11 +341,11 @@ class Command(BaseCommand): # However, track duplicate domains for now, # since we are still deciding on whether # to make this field unique or not. ~10/25/2023 - tempEntry_domain = next( + existing_domain = next( (x for x in to_create if x.domain_name == new_entry_domainName), None, ) - tempEntry_domainUserPair = next( + existing_domain_user_pair = next( ( x for x in to_create @@ -362,57 +354,57 @@ class Command(BaseCommand): ), None, ) - if tempEntry_domain is not None: + if existing_domain is not None: if debug_on: logger.info( f"{termColors.YELLOW} DUPLICATE Verisign entries found for domain: {new_entry_domainName} {termColors.ENDC}" # noqa ) if new_entry_domainName not in duplicate_domains: duplicate_domains.append(new_entry_domainName) - if tempEntry_domainUserPair is not None: + if existing_domain_user_pair is not None: if debug_on: logger.info( f"""{termColors.YELLOW} DUPLICATE Verisign entries found for domain - user {termColors.BackgroundLightYellow} PAIR {termColors.ENDC}{termColors.YELLOW}: {new_entry_domainName} - {new_entry_email} {termColors.ENDC}""" # noqa ) - if tempEntry_domainUserPair not in duplicate_domain_user_combos: - duplicate_domain_user_combos.append(tempEntry_domainUserPair) + if existing_domain_user_pair not in duplicate_domain_user_combos: + duplicate_domain_user_combos.append(existing_domain_user_pair) else: try: - existingEntry = TransitionDomain.objects.get( + existing_entry = TransitionDomain.objects.get( username=new_entry_email, domain_name=new_entry_domainName ) - if existingEntry.status != new_entry_status: + if existing_entry.status != new_entry_status: # DEBUG: if debug_on: logger.info( f"{termColors.OKCYAN}" - f"Updating entry: {existingEntry}" - f"Status: {existingEntry.status} > {new_entry_status}" - f"Email Sent: {existingEntry.email_sent} > {new_entry_emailSent}" + f"Updating entry: {existing_entry}" + f"Status: {existing_entry.status} > {new_entry_status}" + f"Email Sent: {existing_entry.email_sent} > {new_entry_emailSent}" f"{termColors.ENDC}" ) - existingEntry.status = new_entry_status + existing_entry.status = new_entry_status - existingEntry.email_sent = new_entry_emailSent - existingEntry.save() + existing_entry.email_sent = new_entry_emailSent + existing_entry.save() except TransitionDomain.DoesNotExist: # no matching entry, make one - newEntry = TransitionDomain( + new_entry = TransitionDomain( username=new_entry_email, domain_name=new_entry_domainName, status=new_entry_status, email_sent=new_entry_emailSent, ) - to_create.append(newEntry) + to_create.append(new_entry) total_new_entries += 1 # DEBUG: if debug_on: logger.info( - f"{termColors.OKCYAN} Adding entry {total_new_entries}: {newEntry} {termColors.ENDC}" # noqa + f"{termColors.OKCYAN} Adding entry {total_new_entries}: {new_entry} {termColors.ENDC}" # noqa ) except TransitionDomain.MultipleObjectsReturned: logger.info( @@ -422,8 +414,6 @@ class Command(BaseCommand): f"{new_entry_domainName}" f"----------TERMINATING----------" ) - import sys - sys.exit() # DEBUG: