diff --git a/docs/operations/data_migration.md b/docs/operations/data_migration.md index c4d2c150c..a77266282 100644 --- a/docs/operations/data_migration.md +++ b/docs/operations/data_migration.md @@ -240,7 +240,23 @@ This will allow Docker to mount the files to a container (under `/app`) for our ### STEP 1: Load Transition Domains Run the following command, making sure the file paths point to the right location. This will parse the three given files and load the information into the TransitionDomain table. - +##### Create a JSON file +In your chosen directory (either `src/tmp` or `src/migrationdata` depending on preference), create a json file called `migrationFilepaths.json`. This file will map to other urls +Example +``` +{ + "directory": "migrationdata/", + "agency_adhoc_filename": "20231009.agency.adhoc.dotgov.txt", + "authority_adhoc_filename": "authority.adhoc.dotgov.txt", + "contacts_filename": "escrow_contacts.daily.dotgov.GOV.txt", + "domain_adhoc_filename": "20231009.domaintypes.adhoc.dotgov.txt", + "domain_additional_filename": "20231009.domainadditionaldatalink.adhoc.dotgov.txt", + "domain_contacts_filename": "escrow_domain_contacts.daily.dotgov.GOV.txt", + "domain_escrow_filename": "escrow_domains.daily.dotgov.GOV.txt", + "domain_statuses_filename": "escrow_domain_statuses.daily.dotgov.GOV.txt", + "organization_adhoc_filename": "20231009.organization.adhoc.dotgov.txt" +} +``` ##### LOCAL COMMAND ```shell docker-compose exec app ./manage.py load_transition_domain migrationFilepaths.json --directory /app/tmp/ --debug --limitParse 10 diff --git a/src/registrar/management/commands/agency_data_extractor.py b/src/registrar/management/commands/agency_data_extractor.py index d5b304a8a..1f0d6e0da 100644 --- a/src/registrar/management/commands/agency_data_extractor.py +++ b/src/registrar/management/commands/agency_data_extractor.py @@ -17,6 +17,7 @@ logger = logging.getLogger(__name__) # Example command for running this script: # docker compose run -T app ./manage.py agency_data_extractor 20231009.agency.adhoc.dotgov.txt --dir /app/tmp --debug + class Command(BaseCommand): help = """Loads data for domains that are in transition (populates transition_domain model objects).""" @@ -26,40 +27,42 @@ class Command(BaseCommand): parser.add_argument( "agency_data_filename", help="Data file with agency information" ) - parser.add_argument( - "--dir", default="migrationdata", help="Desired directory" - ) + parser.add_argument("--dir", default="migrationdata", help="Desired directory") parser.add_argument("--sep", default="|", help="Delimiter character") - parser.add_argument("--debug", help="Prints additional debug statements to the terminal", action=argparse.BooleanOptionalAction) + parser.add_argument( + "--debug", + help="Prints additional debug statements to the terminal", + action=argparse.BooleanOptionalAction, + ) parser.add_argument("--prompt", action=argparse.BooleanOptionalAction) @staticmethod - def extract_agencies( - agency_data_filepath: str, - sep: str, - debug: bool - ) -> [str]: - """Extracts all the agency names from the provided + def extract_agencies(agency_data_filepath: str, sep: str, debug: bool) -> [str]: + """Extracts all the agency names from the provided agency file (skips any duplicates) and returns those names in an array""" agency_names = [] - logger.info(f"{TerminalColors.OKCYAN}Reading agency data file {agency_data_filepath}{TerminalColors.ENDC}") + logger.info( + f"{TerminalColors.OKCYAN}Reading agency data file {agency_data_filepath}{TerminalColors.ENDC}" + ) with open(agency_data_filepath, "r") as agency_data_filepath: # noqa for row in csv.reader(agency_data_filepath, delimiter=sep): agency_name = row[1] TerminalHelper.print_conditional(debug, f"Checking: {agency_name}") if agency_name not in agency_names: agency_names.append(agency_name) - logger.info(f"{TerminalColors.OKCYAN}Checked {len(agency_names)} agencies{TerminalColors.ENDC}") + logger.info( + f"{TerminalColors.OKCYAN}Checked {len(agency_names)} agencies{TerminalColors.ENDC}" + ) return agency_names - + @staticmethod - def compare_agency_lists(provided_agencies: [str], - existing_agencies: [str], - debug: bool): + def compare_agency_lists( + provided_agencies: [str], existing_agencies: [str], debug: bool + ): """ - Compares new_agencies with existing_agencies and + Compares new_agencies with existing_agencies and provides the equivalent of an outer-join on the two (printed to the terminal) """ @@ -69,27 +72,37 @@ class Command(BaseCommand): for agency in provided_agencies: if agency not in existing_agencies and agency not in new_agencies: new_agencies.append(agency) - TerminalHelper.print_conditional(debug, f"{TerminalColors.YELLOW}Found new agency: {agency}{TerminalColors.ENDC}") + TerminalHelper.print_conditional( + debug, + f"{TerminalColors.YELLOW}Found new agency: {agency}{TerminalColors.ENDC}", + ) possibly_unused_agencies = [] # 2 - Get all new agencies that we don't already have (We might want to ADD these to our list) for agency in existing_agencies: - if agency not in provided_agencies and agency not in possibly_unused_agencies: + if ( + agency not in provided_agencies + and agency not in possibly_unused_agencies + ): possibly_unused_agencies.append(agency) - TerminalHelper.print_conditional(debug, f"{TerminalColors.YELLOW}Possibly unused agency detected: {agency}{TerminalColors.ENDC}") + TerminalHelper.print_conditional( + debug, + f"{TerminalColors.YELLOW}Possibly unused agency detected: {agency}{TerminalColors.ENDC}", + ) matched_agencies = [] for agency in provided_agencies: if agency in existing_agencies: matched_agencies.append(agency) - TerminalHelper.print_conditional(debug, f"{TerminalColors.YELLOW}Matched agencies: {agency}{TerminalColors.ENDC}") + TerminalHelper.print_conditional( + debug, + f"{TerminalColors.YELLOW}Matched agencies: {agency}{TerminalColors.ENDC}", + ) # Print the summary of findings # 1 - Print the list of agencies in the NEW list, which we do not already have # 2 - Print the list of agencies that we currently have, which are NOT in the new list (these might be eligible for removal?) TODO: would we ever want to remove existing agencies? - new_agencies_as_string = "{}".format( - ",\n ".join(map(str, new_agencies)) - ) + new_agencies_as_string = "{}".format(",\n ".join(map(str, new_agencies))) possibly_unused_agencies_as_string = "{}".format( ",\n ".join(map(str, possibly_unused_agencies)) ) @@ -97,7 +110,8 @@ class Command(BaseCommand): ",\n ".join(map(str, matched_agencies)) ) - logger.info(f""" + logger.info( + f""" {TerminalColors.OKGREEN} ======================== SUMMARY OF FINDINGS ============================ {len(provided_agencies)} AGENCIES WERE PROVIDED in the agency file. @@ -117,13 +131,12 @@ class Command(BaseCommand): These agencies are in our system, but not in the provided agency file: {TerminalColors.YELLOW}{possibly_unused_agencies_as_string} {TerminalColors.ENDC} - """) - + """ + ) + @staticmethod def print_agency_list(agencies, filename): - full_agency_list_as_string = "{}".format( - ",\n".join(map(str, agencies)) - ) + full_agency_list_as_string = "{}".format(",\n".join(map(str, agencies))) logger.info( f"\n{TerminalColors.YELLOW}" f"\n{full_agency_list_as_string}" @@ -146,19 +159,22 @@ class Command(BaseCommand): prompt = options.get("prompt") dir = options.get("dir") - agency_data_file = dir+"/"+agency_data_filename + agency_data_file = dir + "/" + agency_data_filename new_agencies = self.extract_agencies(agency_data_file, sep, debug) hard_coded_agencies = DomainApplication.AGENCIES - transition_domain_agencies = TransitionDomain.objects.all().values_list('federal_agency', flat=True).distinct() + transition_domain_agencies = ( + TransitionDomain.objects.all() + .values_list("federal_agency", flat=True) + .distinct() + ) print(transition_domain_agencies) - merged_agencies = new_agencies for agency in hard_coded_agencies: if agency not in merged_agencies: merged_agencies.append(agency) - + merged_transition_agencies = new_agencies for agency in transition_domain_agencies: if agency not in merged_transition_agencies: @@ -168,73 +184,90 @@ class Command(BaseCommand): # OPTION to compare the agency file to our hard-coded list if prompt: - prompt_successful = TerminalHelper.query_yes_no(f"\n\n{TerminalColors.FAIL}Check {agency_data_filename} against our (hard-coded) dropdown list of agencies?{TerminalColors.ENDC}") + prompt_successful = TerminalHelper.query_yes_no( + f"\n\n{TerminalColors.FAIL}Check {agency_data_filename} against our (hard-coded) dropdown list of agencies?{TerminalColors.ENDC}" + ) if prompt_successful or not prompt: self.compare_agency_lists(new_agencies, hard_coded_agencies, debug) - + # OPTION to compare the agency file to Transition Domains if prompt: - prompt_successful = TerminalHelper.query_yes_no(f"\n\n{TerminalColors.FAIL}Check {agency_data_filename} against Transition Domain contents?{TerminalColors.ENDC}") + prompt_successful = TerminalHelper.query_yes_no( + f"\n\n{TerminalColors.FAIL}Check {agency_data_filename} against Transition Domain contents?{TerminalColors.ENDC}" + ) if prompt_successful or not prompt: self.compare_agency_lists(new_agencies, transition_domain_agencies, debug) # OPTION to print out the full list of agencies from the agency file if prompt: - prompt_successful = TerminalHelper.query_yes_no(f"\n\n{TerminalColors.FAIL}Would you like to print the full list of agencies from the given agency file?{TerminalColors.ENDC}") + prompt_successful = TerminalHelper.query_yes_no( + f"\n\n{TerminalColors.FAIL}Would you like to print the full list of agencies from the given agency file?{TerminalColors.ENDC}" + ) if prompt_successful or not prompt: logger.info( - f"\n{TerminalColors.OKGREEN}" - f"\n======================== FULL LIST OF IMPORTED AGENCIES ============================" - f"\nThese are all the agencies provided by the given agency file." - f"\n\n{len(new_agencies)} TOTAL\n\n" + f"\n{TerminalColors.OKGREEN}" + f"\n======================== FULL LIST OF IMPORTED AGENCIES ============================" + f"\nThese are all the agencies provided by the given agency file." + f"\n\n{len(new_agencies)} TOTAL\n\n" ) self.print_agency_list(new_agencies, "Imported_Agencies") - + # OPTION to print out the full list of agencies from the agency file if prompt: - prompt_successful = TerminalHelper.query_yes_no(f"{TerminalColors.FAIL}Would you like to print the full list of agencies from the dropdown?{TerminalColors.ENDC}") + prompt_successful = TerminalHelper.query_yes_no( + f"{TerminalColors.FAIL}Would you like to print the full list of agencies from the dropdown?{TerminalColors.ENDC}" + ) if prompt_successful or not prompt: logger.info( - f"\n{TerminalColors.OKGREEN}" - f"\n======================== FULL LIST OF AGENCIES IN DROPDOWN ============================" - f"\nThese are all the agencies hard-coded in our system for the dropdown list." - f"\n\n{len(hard_coded_agencies)} TOTAL\n\n" + f"\n{TerminalColors.OKGREEN}" + f"\n======================== FULL LIST OF AGENCIES IN DROPDOWN ============================" + f"\nThese are all the agencies hard-coded in our system for the dropdown list." + f"\n\n{len(hard_coded_agencies)} TOTAL\n\n" ) self.print_agency_list(hard_coded_agencies, "Dropdown_Agencies") - - # OPTION to print out the full list of agencies from the agency file - if prompt: - prompt_successful = TerminalHelper.query_yes_no(f"{TerminalColors.FAIL}Would you like to print the full list of agencies from the dropdown?{TerminalColors.ENDC}") - if prompt_successful or not prompt: - logger.info( - f"\n{TerminalColors.OKGREEN}" - f"\n======================== FULL LIST OF AGENCIES IN TRANSITION DOMAIN ============================" - f"\nThese are all the agencies in the Transition Domains table." - f"\n\n{len(transition_domain_agencies)} TOTAL\n\n" - ) - self.print_agency_list(transition_domain_agencies, "Transition_Domain_Agencies") - # OPTION to print out the full list of agencies from the agency file if prompt: - prompt_successful = TerminalHelper.query_yes_no(f"{TerminalColors.FAIL}Would you like to print the MERGED list of agencies (dropdown + agency file)?{TerminalColors.ENDC}") + prompt_successful = TerminalHelper.query_yes_no( + f"{TerminalColors.FAIL}Would you like to print the full list of agencies from the dropdown?{TerminalColors.ENDC}" + ) if prompt_successful or not prompt: logger.info( - f"\n{TerminalColors.OKGREEN}" - f"\n======================== MERGED LISTS (dropdown + agency file) ============================" - f"\nThese are all the agencies our dropdown plus all the agencies in the agency file." - f"\n\n{len(merged_agencies)} TOTAL\n\n" + f"\n{TerminalColors.OKGREEN}" + f"\n======================== FULL LIST OF AGENCIES IN TRANSITION DOMAIN ============================" + f"\nThese are all the agencies in the Transition Domains table." + f"\n\n{len(transition_domain_agencies)} TOTAL\n\n" + ) + self.print_agency_list( + transition_domain_agencies, "Transition_Domain_Agencies" + ) + + # OPTION to print out the full list of agencies from the agency file + if prompt: + prompt_successful = TerminalHelper.query_yes_no( + f"{TerminalColors.FAIL}Would you like to print the MERGED list of agencies (dropdown + agency file)?{TerminalColors.ENDC}" + ) + if prompt_successful or not prompt: + logger.info( + f"\n{TerminalColors.OKGREEN}" + f"\n======================== MERGED LISTS (dropdown + agency file) ============================" + f"\nThese are all the agencies our dropdown plus all the agencies in the agency file." + f"\n\n{len(merged_agencies)} TOTAL\n\n" ) self.print_agency_list(merged_agencies, "Merged_Dropdown_Agency_List") - - # OPTION to print out the full list of agencies from the agency file + + # OPTION to print out the full list of agencies from the agency file if prompt: - prompt_successful = TerminalHelper.query_yes_no(f"{TerminalColors.FAIL}Would you like to print the MERGED list of agencies (dropdown + agency file)?{TerminalColors.ENDC}") + prompt_successful = TerminalHelper.query_yes_no( + f"{TerminalColors.FAIL}Would you like to print the MERGED list of agencies (dropdown + agency file)?{TerminalColors.ENDC}" + ) if prompt_successful or not prompt: logger.info( - f"\n{TerminalColors.OKGREEN}" - f"\n======================== MERGED LISTS (transition domain + agency file) ============================" - f"\nThese are all the agencies our transition domains table plus all the agencies in the agency file." - f"\n\n{len(merged_agencies)} TOTAL\n\n" + f"\n{TerminalColors.OKGREEN}" + f"\n======================== MERGED LISTS (transition domain + agency file) ============================" + f"\nThese are all the agencies our transition domains table plus all the agencies in the agency file." + f"\n\n{len(merged_agencies)} TOTAL\n\n" + ) + self.print_agency_list( + merged_transition_agencies, "Merged_Transition_Domain_Agency_List" ) - self.print_agency_list(merged_transition_agencies, "Merged_Transition_Domain_Agency_List") \ No newline at end of file diff --git a/src/registrar/management/commands/load_transition_domain.py b/src/registrar/management/commands/load_transition_domain.py index 3878a3366..b3902d57e 100644 --- a/src/registrar/management/commands/load_transition_domain.py +++ b/src/registrar/management/commands/load_transition_domain.py @@ -64,9 +64,9 @@ class Command(BaseCommand): ) parser.add_argument( - "--infer_filenames", + "--infer_filenames", action=argparse.BooleanOptionalAction, - help="Determines if we should infer filenames or not. Recommended to be enabled only in a development or testing setting." + help="Determines if we should infer filenames or not. Recommended to be enabled only in a development or testing setting.", ) parser.add_argument( @@ -74,7 +74,7 @@ class Command(BaseCommand): ) parser.add_argument( "--domain_contacts_filename", - help="Data file with domain contact information" + help="Data file with domain contact information", ) parser.add_argument( "--contacts_filename", @@ -82,7 +82,7 @@ class Command(BaseCommand): ) parser.add_argument( "--domain_statuses_filename", - help="Data file with domain status information" + help="Data file with domain status information", ) parser.add_argument( "--agency_adhoc_filename", @@ -347,10 +347,12 @@ class Command(BaseCommand): # If it does, update the options options[key] = value except Exception as err: - logger.error(f"""{TerminalColors.FAIL}There was an error loading the JSON responsible + logger.error( + f"""{TerminalColors.FAIL}There was an error loading the JSON responsible for providing filepaths. {TerminalColors.ENDC} - """) + """ + ) raise err sep = args.sep @@ -369,33 +371,36 @@ class Command(BaseCommand): ) # set to 0 to parse all entries ## Variables for Additional TransitionDomain Information ## - # Main script filenames - these do not have defaults domain_contacts_filename = None try: - domain_contacts_filename = directory + options.get("domain_contacts_filename") + domain_contacts_filename = directory + options.get( + "domain_contacts_filename" + ) except TypeError as err: logger.error( - f"Invalid filename of '{args.domain_contacts_filename}'" + f"Invalid filename of '{args.domain_contacts_filename}'" " was provided for domain_contacts_filename" ) - + contacts_filename = None try: contacts_filename = directory + options.get("contacts_filename") except TypeError as err: logger.error( - f"Invalid filename of '{args.contacts_filename}'" + f"Invalid filename of '{args.contacts_filename}'" " was provided for contacts_filename" ) domain_statuses_filename = None try: - domain_statuses_filename = directory + options.get("domain_statuses_filename") + domain_statuses_filename = directory + options.get( + "domain_statuses_filename" + ) except TypeError as err: logger.error( - f"Invalid filename of '{args.domain_statuses_filename}'" + f"Invalid filename of '{args.domain_statuses_filename}'" " was provided for domain_statuses_filename" ) @@ -468,7 +473,10 @@ class Command(BaseCommand): new_entry_email = "" new_entry_emailSent = False # set to False by default - TerminalHelper.print_conditional(True, f"Processing item {total_rows_parsed}: {new_entry_domain_name}") + TerminalHelper.print_conditional( + True, + f"Processing item {total_rows_parsed}: {new_entry_domain_name}", + ) # PART 1: Get the status if new_entry_domain_name not in domain_status_dictionary: @@ -608,7 +616,6 @@ class Command(BaseCommand): ) self.print_summary_status_findings(domains_without_status, outlier_statuses) - logger.info( f"""{TerminalColors.OKGREEN} ============= FINISHED =============== diff --git a/src/registrar/management/commands/master_domain_migrations.py b/src/registrar/management/commands/master_domain_migrations.py index 96bf0517b..b34a24375 100644 --- a/src/registrar/management/commands/master_domain_migrations.py +++ b/src/registrar/management/commands/master_domain_migrations.py @@ -28,8 +28,7 @@ logger = logging.getLogger(__name__) class Command(BaseCommand): - help = """ """ # TODO: update this! - + help = """ """ # TODO: update this! # ====================================================== # ================== ARGUMENTS =================== @@ -45,7 +44,7 @@ class Command(BaseCommand): The location of the files used for load_transition_domain migration script EXAMPLE USAGE: > --migrationDirectory /app/tmp - + --migrationJSON The name of the JSON file used for load_transition_domain migration script EXAMPLE USAGE: @@ -129,7 +128,6 @@ class Command(BaseCommand): action=argparse.BooleanOptionalAction, ) - # ====================================================== # =============== DATA ANALYSIS ================== # ====================================================== @@ -252,7 +250,6 @@ class Command(BaseCommand): """ ) - # ====================================================== # ================= MIGRATIONS =================== # ====================================================== @@ -266,17 +263,13 @@ class Command(BaseCommand): prompts_enabled: bool, debug_max_entries_to_parse: int, ): - if file_directory and file_directory[-1] != "/": file_directory += "/" json_filepath = migration_json_filename """Runs the load_transition_domain script""" # Create the command string command_script = "load_transition_domain" - command_string = ( - f"./manage.py {command_script} " - f"{json_filepath} " - ) + command_string = f"./manage.py {command_script} " f"{json_filepath} " if sep is not None and sep != "|": command_string += f"--sep {sep} " if reset_table: @@ -306,7 +299,7 @@ class Command(BaseCommand): resetTable=reset_table, debug=debug_on, limitParse=debug_max_entries_to_parse, - directory=file_directory + directory=file_directory, ) def run_transfer_script(self, debug_on: bool, prompts_enabled: bool): @@ -326,7 +319,7 @@ class Command(BaseCommand): ) # TODO: make this somehow run inside TerminalHelper prompt if proceed or not prompts_enabled: - call_command(command_script) + call_command(command_script) def run_send_invites_script(self, debug_on: bool, prompts_enabled: bool): """Runs the send_domain_invitations script""" diff --git a/src/registrar/management/commands/transfer_transition_domains_to_domains.py b/src/registrar/management/commands/transfer_transition_domains_to_domains.py index a4e81abf5..413745c61 100644 --- a/src/registrar/management/commands/transfer_transition_domains_to_domains.py +++ b/src/registrar/management/commands/transfer_transition_domains_to_domains.py @@ -39,7 +39,6 @@ class Command(BaseCommand): help="Sets max number of entries to load, set to 0 to load all entries", ) - # ====================================================== # ===================== PRINTING ====================== # ====================================================== @@ -66,13 +65,14 @@ class Command(BaseCommand): {TerminalColors.ENDC} """, ) - - def parse_limit_reached(self, - debug_max_entries_to_parse: bool, - total_rows_parsed: int - ) -> bool: - if (debug_max_entries_to_parse > 0 - and total_rows_parsed >= debug_max_entries_to_parse): + + def parse_limit_reached( + self, debug_max_entries_to_parse: bool, total_rows_parsed: int + ) -> bool: + if ( + debug_max_entries_to_parse > 0 + and total_rows_parsed >= debug_max_entries_to_parse + ): logger.info( f"""{TerminalColors.YELLOW} ----PARSE LIMIT REACHED. HALTING PARSER.---- @@ -81,7 +81,7 @@ class Command(BaseCommand): ) return True return False - + def print_summary_of_findings( self, domains_to_create, @@ -156,16 +156,15 @@ class Command(BaseCommand): """, ) - # ====================================================== # =================== DOMAIN ===================== # ====================================================== - def update_or_create_domain(self, - transition_domain: TransitionDomain, - debug_on: bool) -> (Domain, bool): - """ Given a transition domain, either finds & updates an existing + def update_or_create_domain( + self, transition_domain: TransitionDomain, debug_on: bool + ) -> (Domain, bool): + """Given a transition domain, either finds & updates an existing corresponding domain, or creates a new corresponding domain in - the Domain table. + the Domain table. Returns the corresponding Domain object and a boolean that is TRUE if that Domain was newly created. @@ -176,7 +175,7 @@ class Command(BaseCommand): transition_domain_status = transition_domain.status transition_domain_creation_date = transition_domain.epp_creation_date transition_domain_expiration_date = transition_domain.epp_expiration_date - + domain_exists = Domain.objects.filter(name=transition_domain_name).exists() if domain_exists: try: @@ -197,7 +196,7 @@ class Command(BaseCommand): transition_domain, target_domain, debug_on ) # TODO: not all domains need to be updated (the information is the same). Need to bubble this up to the final report. - + # update dates (creation and expiration) if transition_domain_creation_date is not None: # TODO: added this because I ran into a situation where the created_at date was null (violated a key constraint). How do we want to handle this case? @@ -257,7 +256,6 @@ class Command(BaseCommand): expiration_date=transition_domain_expiration_date, ) return (target_domain, True) - def update_domain_status( self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool @@ -289,7 +287,6 @@ class Command(BaseCommand): return True return False - # ====================================================== # ================ DOMAIN INVITATION ================== # ====================================================== @@ -336,23 +333,27 @@ class Command(BaseCommand): # ====================================================== # ================ DOMAIN INFORMATION ================= # ====================================================== - def update_domain_information(self, current: DomainInformation, target: DomainInformation, debug_on: bool) -> bool: + def update_domain_information( + self, current: DomainInformation, target: DomainInformation, debug_on: bool + ) -> bool: # DEBUG: TerminalHelper.print_conditional( debug_on, - (f"{TerminalColors.OKCYAN}" - f"Updating: {current}" - f"{TerminalColors.ENDC}"), # noqa + ( + f"{TerminalColors.OKCYAN}" + f"Updating: {current}" + f"{TerminalColors.ENDC}" + ), # noqa ) updated = False fields_to_update = [ - 'organization_type', - 'federal_type', - 'federal_agency', - "organization_name" - ] + "organization_type", + "federal_type", + "federal_agency", + "organization_name", + ] defaults = {field: getattr(target, field) for field in fields_to_update} if current != target: current = target @@ -360,14 +361,19 @@ class Command(BaseCommand): updated = True return updated - + def try_add_domain_information(self): pass - def create_new_domain_info(self, - transition_domain: TransitionDomain, - domain: Domain) -> DomainInformation: - + def create_new_domain_info( + self, + transition_domain: TransitionDomain, + domain: Domain, + agency_choices, + fed_choices, + org_choices, + debug_on, + ) -> DomainInformation: org_type = transition_domain.organization_type fed_type = transition_domain.federal_type fed_agency = transition_domain.federal_agency @@ -387,77 +393,95 @@ class Command(BaseCommand): org_type = ("city", "City") case "Independent Intrastate": org_type = ("special_district", "Special district") - - valid_org_type = org_type in [(name, value) for name, value in DomainApplication.OrganizationChoices.choices] - valid_fed_type = fed_type in [value for name, value in DomainApplication.BranchChoices.choices] - valid_fed_agency = fed_agency in DomainApplication.AGENCIES + + valid_org_type = org_type in org_choices + valid_fed_type = fed_type in fed_choices + valid_fed_agency = fed_agency in agency_choices default_creator, _ = User.objects.get_or_create(username="System") new_domain_info_data = { - 'domain': domain, - 'organization_name': transition_domain.organization_name, + "domain": domain, + "organization_name": transition_domain.organization_name, "creator": default_creator, } - + if valid_org_type: - new_domain_info_data['organization_type'] = org_type[0] - else: + new_domain_info_data["organization_type"] = org_type[0] + elif debug_on: logger.debug(f"No org type found on {domain.name}") if valid_fed_type: - new_domain_info_data['federal_type'] = fed_type.lower() - pass - else: + new_domain_info_data["federal_type"] = fed_type.lower() + elif debug_on: logger.debug(f"No federal type found on {domain.name}") if valid_fed_agency: - new_domain_info_data['federal_agency'] = fed_agency - else: + new_domain_info_data["federal_agency"] = fed_agency + elif debug_on: logger.debug(f"No federal agency found on {domain.name}") new_domain_info = DomainInformation(**new_domain_info_data) - # DEBUG: + # DEBUG: TerminalHelper.print_conditional( True, - (f"{TerminalColors.MAGENTA}" - f"Created Domain Information template for: {new_domain_info}" - f"{TerminalColors.ENDC}"), # noqa + ( + f"{TerminalColors.MAGENTA}" + f"Created Domain Information template for: {new_domain_info}" + f"{TerminalColors.ENDC}" + ), # noqa ) return new_domain_info - def update_or_create_domain_information(self, - transition_domain: TransitionDomain, - debug_on: bool) -> (DomainInformation, bool): - + def update_or_create_domain_information( + self, + transition_domain: TransitionDomain, + agency_choices, + fed_choices, + org_choices, + debug_on: bool, + ) -> (DomainInformation, bool): transition_domain_name = transition_domain.domain_name - + # Get associated domain domain_data = Domain.objects.filter(name=transition_domain.domain_name) if not domain_data.exists(): logger.warn( - f"{TerminalColors.FAIL}" - f"WARNING: No Domain exists for:" - f"{transition_domain_name}" - f"{TerminalColors.ENDC}\n" - ) + f"{TerminalColors.FAIL}" + f"WARNING: No Domain exists for:" + f"{transition_domain_name}" + f"{TerminalColors.ENDC}\n" + ) return (None, None, False) domain = domain_data.get() - template_domain_information = self.create_new_domain_info(transition_domain, domain) + template_domain_information = self.create_new_domain_info( + transition_domain, + domain, + agency_choices, + fed_choices, + org_choices, + debug_on, + ) target_domain_information = None - domain_information_exists = DomainInformation.objects.filter(domain__name=transition_domain_name).exists() + domain_information_exists = DomainInformation.objects.filter( + domain__name=transition_domain_name + ).exists() if domain_information_exists: try: # get the existing domain information object - target_domain_information = DomainInformation.objects.get(domain__name=transition_domain_name) + target_domain_information = DomainInformation.objects.get( + domain__name=transition_domain_name + ) # DEBUG: TerminalHelper.print_conditional( debug_on, - (f"{TerminalColors.FAIL}" - f"Found existing entry in Domain Information table for:" - f"{transition_domain_name}" - f"{TerminalColors.ENDC}"), # noqa + ( + f"{TerminalColors.FAIL}" + f"Found existing entry in Domain Information table for:" + f"{transition_domain_name}" + f"{TerminalColors.ENDC}" + ), # noqa ) # for existing entry, update the status to @@ -466,7 +490,7 @@ class Command(BaseCommand): target_domain_information, template_domain_information, debug_on ) # TODO: not all domains need to be updated (the information is the same). Need to bubble this up to the final report. - + return (target_domain_information, domain, False) except DomainInformation.MultipleObjectsReturned: # This should never happen (just like with the Domain Table). @@ -493,15 +517,15 @@ class Command(BaseCommand): # DEBUG: TerminalHelper.print_conditional( debug_on, - (f"{TerminalColors.OKCYAN}" - f"Adding domain information for: " - f"{transition_domain_name}" - f"{TerminalColors.ENDC}"), + ( + f"{TerminalColors.OKCYAN}" + f"Adding domain information for: " + f"{transition_domain_name}" + f"{TerminalColors.ENDC}" + ), ) return (target_domain_information, domain, True) - - # ====================================================== # ===================== HANDLE ======================== # ====================================================== @@ -536,7 +560,6 @@ class Command(BaseCommand): # domain invitations to ADD domain_invitations_to_create = [] - # if we are limiting our parse (for testing purposes, keep # track of total rows parsed) total_rows_parsed = 0 @@ -566,7 +589,7 @@ class Command(BaseCommand): TerminalHelper.print_conditional( debug_on, f"{TerminalColors.OKCYAN}" - "Processing Transition Domain: " + "Processing Transition Domain: " f"{transition_domain_name}, {transition_domain_status}, {transition_domain_email}" f", {transition_domain_creation_date}, {transition_domain_expiration_date}" f"{TerminalColors.ENDC}", # noqa @@ -574,7 +597,9 @@ class Command(BaseCommand): # ====================================================== # ====================== DOMAIN ======================= - target_domain, was_created = self.update_or_create_domain(transition_domain, debug_on) + target_domain, was_created = self.update_or_create_domain( + transition_domain, debug_on + ) debug_string = "" if target_domain is None: @@ -603,12 +628,12 @@ class Command(BaseCommand): # ---------------- UPDATED ---------------- updated_domain_entries.append(transition_domain.domain_name) debug_string = f"updated domain: {target_domain}" - + # DEBUG: TerminalHelper.print_conditional( debug_on, (f"{TerminalColors.OKCYAN} {debug_string} {TerminalColors.ENDC}"), - ) + ) # ====================================================== # ================ DOMAIN INVITATIONS ================== @@ -633,26 +658,36 @@ class Command(BaseCommand): if self.parse_limit_reached(debug_max_entries_to_parse, total_rows_parsed): break - # First, save all Domain objects to the database Domain.objects.bulk_create(domains_to_create) - #DomainInvitation.objects.bulk_create(domain_invitations_to_create) + # DomainInvitation.objects.bulk_create(domain_invitations_to_create) # TODO: this is to resolve an error where bulk_create # doesn't save to database in a way that invitation objects can - # utilize. + # utilize. # Then, create DomainInvitation objects for invitation in domain_invitations_to_create: - logger.info(f"Pairing invite to its domain...{invitation}") + if debug_on: + logger.info(f"Pairing invite to its domain...{invitation}") existing_domain = Domain.objects.filter(name=invitation.domain.name) # Make sure the related Domain object is saved if existing_domain.exists(): invitation.domain = existing_domain.get() else: # Raise an err for now - raise Exception(f"Domain {existing_domain} wants to be added but doesn't exist in the DB") + raise Exception( + f"Domain {existing_domain} wants to be added but doesn't exist in the DB" + ) invitation.save() + valid_org_choices = [ + (name, value) + for name, value in DomainApplication.OrganizationChoices.choices + ] + valid_fed_choices = [ + value for name, value in DomainApplication.BranchChoices.choices + ] + valid_agency_choices = DomainApplication.AGENCIES # ====================================================== # ================= DOMAIN INFORMATION ================= logger.info( @@ -661,31 +696,54 @@ class Command(BaseCommand): {TerminalColors.ENDC}""" ) for transition_domain in TransitionDomain.objects.all(): - target_domain_information, associated_domain, was_created = self.update_or_create_domain_information(transition_domain, debug_on) + ( + target_domain_information, + associated_domain, + was_created, + ) = self.update_or_create_domain_information( + transition_domain, + valid_agency_choices, + valid_fed_choices, + valid_org_choices, + debug_on, + ) debug_string = "" if target_domain_information is None: # ---------------- SKIPPED ---------------- skipped_domain_information_entries.append(target_domain_information) - debug_string = f"skipped domain information: {target_domain_information}" + debug_string = ( + f"skipped domain information: {target_domain_information}" + ) elif was_created: - # DEBUG: + # DEBUG: TerminalHelper.print_conditional( debug_on, - (f"{TerminalColors.OKCYAN}" - f"Checking duplicates for: {target_domain_information}" - f"{TerminalColors.ENDC}"), # noqa + ( + f"{TerminalColors.OKCYAN}" + f"Checking duplicates for: {target_domain_information}" + f"{TerminalColors.ENDC}" + ), # noqa ) # ---------------- DUPLICATE ---------------- # The unique key constraint does not allow multiple domain # information objects to share the same domain existing_domain_information_in_to_create = next( - (x for x in domain_information_to_create if x.domain.name == target_domain_information.domain.name), + ( + x + for x in domain_information_to_create + if x.domain.name == target_domain_information.domain.name + ), None, ) # TODO: this is redundant. Currently debugging....running into unique key constraint error.... - existing_domain_info = DomainInformation.objects.filter(domain__name=target_domain_information.domain.name).exists() - if existing_domain_information_in_to_create is not None or existing_domain_info: + existing_domain_info = DomainInformation.objects.filter( + domain__name=target_domain_information.domain.name + ).exists() + if ( + existing_domain_information_in_to_create is not None + or existing_domain_info + ): debug_string = f"""{TerminalColors.YELLOW} Duplicate Detected: {existing_domain_information_in_to_create}. Cannot add duplicate Domain Information object @@ -693,19 +751,23 @@ class Command(BaseCommand): else: # ---------------- CREATED ---------------- domain_information_to_create.append(target_domain_information) - debug_string = f"created domain information: {target_domain_information}" + debug_string = ( + f"created domain information: {target_domain_information}" + ) elif not was_created: # ---------------- UPDATED ---------------- updated_domain_information.append(target_domain_information) - debug_string = f"updated domain information: {target_domain_information}" + debug_string = ( + f"updated domain information: {target_domain_information}" + ) else: debug_string = f"domain information already exists and matches incoming data (NO CHANGES MADE): {target_domain_information}" - + # DEBUG: TerminalHelper.print_conditional( debug_on, (f"{TerminalColors.OKCYAN}{debug_string}{TerminalColors.ENDC}"), - ) + ) # ------------------ Parse limit reached? ------------------ # Check parse limit and exit loop if parse limit has been reached @@ -713,11 +775,13 @@ class Command(BaseCommand): break TerminalHelper.print_conditional( - debug_on, - (f"{TerminalColors.YELLOW}" - f"Trying to add: {domain_information_to_create}" - f"{TerminalColors.ENDC}"), - ) + debug_on, + ( + f"{TerminalColors.YELLOW}" + f"Trying to add: {domain_information_to_create}" + f"{TerminalColors.ENDC}" + ), + ) DomainInformation.objects.bulk_create(domain_information_to_create) self.print_summary_of_findings( diff --git a/src/registrar/management/commands/utility/epp_data_containers.py b/src/registrar/management/commands/utility/epp_data_containers.py index 36b5e3f17..3fe170574 100644 --- a/src/registrar/management/commands/utility/epp_data_containers.py +++ b/src/registrar/management/commands/utility/epp_data_containers.py @@ -15,69 +15,69 @@ from typing import List, Optional class AgencyAdhoc: """Defines the structure given in the AGENCY_ADHOC file""" - agencyid: Optional[int] = field(default=None, repr=True) - agencyname: Optional[str] = field(default=None, repr=True) - active: Optional[str] = field(default=None, repr=True) - isfederal: Optional[str] = field(default=None, repr=True) + agencyid: Optional[int] = field(default=None, repr=True) + agencyname: Optional[str] = field(default=None, repr=True) + active: Optional[str] = field(default=None, repr=True) + isfederal: Optional[str] = field(default=None, repr=True) @dataclass class DomainAdditionalData: """Defines the structure given in the DOMAIN_ADDITIONAL file""" - domainname: Optional[str] = field(default=None, repr=True) - domaintypeid: Optional[int] = field(default=None, repr=True) - authorityid: Optional[int] = field(default=None, repr=True) - orgid: Optional[int] = field(default=None, repr=True) - securitycontactemail: Optional[str] = field(default=None, repr=True) - dnsseckeymonitor: Optional[str] = field(default=None, repr=True) - domainpurpose: Optional[str] = field(default=None, repr=True) + domainname: Optional[str] = field(default=None, repr=True) + domaintypeid: Optional[int] = field(default=None, repr=True) + authorityid: Optional[int] = field(default=None, repr=True) + orgid: Optional[int] = field(default=None, repr=True) + securitycontactemail: Optional[str] = field(default=None, repr=True) + dnsseckeymonitor: Optional[str] = field(default=None, repr=True) + domainpurpose: Optional[str] = field(default=None, repr=True) @dataclass class DomainTypeAdhoc: """Defines the structure given in the DOMAIN_ADHOC file""" - domaintypeid: Optional[int] = field(default=None, repr=True) - domaintype: Optional[str] = field(default=None, repr=True) - code: Optional[str] = field(default=None, repr=True) - active: Optional[str] = field(default=None, repr=True) + domaintypeid: Optional[int] = field(default=None, repr=True) + domaintype: Optional[str] = field(default=None, repr=True) + code: Optional[str] = field(default=None, repr=True) + active: Optional[str] = field(default=None, repr=True) @dataclass class OrganizationAdhoc: """Defines the structure given in the ORGANIZATION_ADHOC file""" - orgid: Optional[int] = field(default=None, repr=True) - orgname: Optional[str] = field(default=None, repr=True) - orgstreet: Optional[str] = field(default=None, repr=True) - orgcity: Optional[str] = field(default=None, repr=True) - orgstate: Optional[str] = field(default=None, repr=True) - orgzip: Optional[str] = field(default=None, repr=True) - orgcountrycode: Optional[str] = field(default=None, repr=True) + orgid: Optional[int] = field(default=None, repr=True) + orgname: Optional[str] = field(default=None, repr=True) + orgstreet: Optional[str] = field(default=None, repr=True) + orgcity: Optional[str] = field(default=None, repr=True) + orgstate: Optional[str] = field(default=None, repr=True) + orgzip: Optional[str] = field(default=None, repr=True) + orgcountrycode: Optional[str] = field(default=None, repr=True) @dataclass class AuthorityAdhoc: """Defines the structure given in the AUTHORITY_ADHOC file""" - authorityid: Optional[int] = field(default=None, repr=True) - firstname: Optional[str] = field(default=None, repr=True) - middlename: Optional[str] = field(default=None, repr=True) - lastname: Optional[str] = field(default=None, repr=True) - email: Optional[str] = field(default=None, repr=True) - phonenumber: Optional[str] = field(default=None, repr=True) - agencyid: Optional[int] = field(default=None, repr=True) - addlinfo: Optional[List[str]] = field(default=None, repr=True) + authorityid: Optional[int] = field(default=None, repr=True) + firstname: Optional[str] = field(default=None, repr=True) + middlename: Optional[str] = field(default=None, repr=True) + lastname: Optional[str] = field(default=None, repr=True) + email: Optional[str] = field(default=None, repr=True) + phonenumber: Optional[str] = field(default=None, repr=True) + agencyid: Optional[int] = field(default=None, repr=True) + addlinfo: Optional[List[str]] = field(default=None, repr=True) @dataclass class DomainEscrow: """Defines the structure given in the DOMAIN_ESCROW file""" - domainname: Optional[str] = field(default=None, repr=True) - creationdate: Optional[date] = field(default=None, repr=True) - expirationdate: Optional[date] = field(default=None, repr=True) + domainname: Optional[str] = field(default=None, repr=True) + creationdate: Optional[date] = field(default=None, repr=True) + expirationdate: Optional[date] = field(default=None, repr=True) class EnumFilenames(Enum): @@ -89,27 +89,12 @@ class EnumFilenames(Enum): # We are sourcing data from many different locations, so its better to track this # as an Enum rather than multiple spread out variables. # We store the "type" as [0], and we store the "default_filepath" as [1]. - AGENCY_ADHOC = ( - "agency_adhoc", - "agency.adhoc.dotgov.txt" - ) + AGENCY_ADHOC = ("agency_adhoc", "agency.adhoc.dotgov.txt") DOMAIN_ADDITIONAL = ( "domain_additional", "domainadditionaldatalink.adhoc.dotgov.txt", ) - DOMAIN_ESCROW = ( - "domain_escrow", - "escrow_domains.daily.dotgov.GOV.txt" - ) - DOMAIN_ADHOC = ( - "domain_adhoc", - "domaintypes.adhoc.dotgov.txt" - ) - ORGANIZATION_ADHOC = ( - "organization_adhoc", - "organization.adhoc.dotgov.txt" - ) - AUTHORITY_ADHOC = ( - "authority_adhoc", - "authority.adhoc.dotgov.txt" - ) + DOMAIN_ESCROW = ("domain_escrow", "escrow_domains.daily.dotgov.GOV.txt") + DOMAIN_ADHOC = ("domain_adhoc", "domaintypes.adhoc.dotgov.txt") + ORGANIZATION_ADHOC = ("organization_adhoc", "organization.adhoc.dotgov.txt") + AUTHORITY_ADHOC = ("authority_adhoc", "authority.adhoc.dotgov.txt") diff --git a/src/registrar/management/commands/utility/extra_transition_domain_helper.py b/src/registrar/management/commands/utility/extra_transition_domain_helper.py index fd7752839..b06bc5299 100644 --- a/src/registrar/management/commands/utility/extra_transition_domain_helper.py +++ b/src/registrar/management/commands/utility/extra_transition_domain_helper.py @@ -69,10 +69,15 @@ class FileTransitionLog: log = self.LogItem(file_type, code, message, domain_name) dict_name = (file_type, domain_name) self._add_to_log_list(dict_name, log) - def create_log_item( - self, file_type, code, message, domain_name=None, add_to_list=True, minimal_logging=True + self, + file_type, + code, + message, + domain_name=None, + add_to_list=True, + minimal_logging=True, ): """Creates and returns an LogItem object. @@ -81,10 +86,10 @@ class FileTransitionLog: log = self.LogItem(file_type, code, message, domain_name) if not add_to_list: return log - + dict_name = (file_type, domain_name) self._add_to_log_list(dict_name, log) - + restrict_type = [] if minimal_logging: restrict_type = [LogCode.INFO, LogCode.WARNING] @@ -99,7 +104,7 @@ class FileTransitionLog: def _add_to_log_list(self, log_name, log): if log_name not in self.logs: self.logs[log_name] = [log] - else: + else: self.logs[log_name].append(log) def display_all_logs(self): @@ -107,9 +112,7 @@ class FileTransitionLog: for parent_log in self.logs: for child_log in parent_log: TerminalHelper.print_conditional( - True, - child_log.message, - child_log.severity + True, child_log.message, child_log.severity ) def display_logs_by_domain_name(self, domain_name, restrict_type=LogCode.DEFAULT): @@ -125,16 +128,14 @@ class FileTransitionLog: domain_logs = self.get_logs(file_type, domain_name) if domain_logs is None: return None - + for log in domain_logs: TerminalHelper.print_conditional( - restrict_type != log.code, - log.message, - log.code + restrict_type != log.code, log.message, log.code ) def get_logs(self, file_type, domain_name): - """Grabs the logs associated with + """Grabs the logs associated with a particular file_type and domain_name""" log_name = (file_type, domain_name) return self.logs.get(log_name) @@ -213,19 +214,20 @@ class LoadExtraTransitionDomain: logger.info( f"""{TerminalColors.OKGREEN} ============= FINISHED =============== - Updated {len(updated_transition_domains)} transition domain entries: - {[domain for domain in updated_transition_domains]} + Updated {len(updated_transition_domains)} transition domain entries {TerminalColors.ENDC} """ ) else: # TODO - update - TerminalHelper.print_conditional(self.debug, f"{TerminalHelper.array_as_string(updated_transition_domains)}") + TerminalHelper.print_conditional( + self.debug, + f"{TerminalHelper.array_as_string(updated_transition_domains)}", + ) logger.error( f"""{TerminalColors.FAIL} ============= FINISHED WITH ERRORS =============== - Updated {len(updated_transition_domains)} transition domain entries: - {[domain for domain in updated_transition_domains]} + Updated {len(updated_transition_domains)} transition domain entries, Failed to update {failed_count} transition domain entries: {[domain for domain in failed_transition_domains]} {TerminalColors.ENDC} @@ -237,7 +239,8 @@ class LoadExtraTransitionDomain: total_transition_domains = len(updated_transition_domains) total_updates_made = TransitionDomain.objects.all().count() if total_transition_domains != total_updates_made: - logger.error(f"""{TerminalColors.FAIL} + logger.error( + f"""{TerminalColors.FAIL} WARNING: something went wrong processing domain information data. Total Transition Domains expecting a data update: {total_transition_domains} @@ -248,7 +251,8 @@ class LoadExtraTransitionDomain: corrupt data. Please check logs to diagnose. ----- TERMINATING ---- - """) + """ + ) sys.exit() def parse_creation_expiration_data(self, domain_name, transition_domain): @@ -262,19 +266,15 @@ class LoadExtraTransitionDomain: self.parse_logs.create_log_item( EnumFilenames.DOMAIN_ESCROW, LogCode.ERROR, - "Could not add epp_creation_date and epp_expiration_date " + "Could not add epp_creation_date and epp_expiration_date " f"on {domain_name}, no data exists.", domain_name, - not self.debug + not self.debug, ) return transition_domain - creation_exists = ( - transition_domain.epp_creation_date is not None - ) - expiration_exists = ( - transition_domain.epp_expiration_date is not None - ) + creation_exists = transition_domain.epp_creation_date is not None + expiration_exists = transition_domain.epp_expiration_date is not None transition_domain.epp_creation_date = info.creationdate transition_domain.epp_expiration_date = info.expirationdate @@ -311,7 +311,7 @@ class LoadExtraTransitionDomain: LogCode.ERROR, f"Could not add federal_agency on {domain_name}, no data exists.", domain_name, - not self.debug + not self.debug, ) return transition_domain @@ -326,7 +326,7 @@ class LoadExtraTransitionDomain: LogCode.ERROR, f"Could not add inactive agency {info.agencyname} on {domain_name}", domain_name, - not self.debug + not self.debug, ) return transition_domain @@ -336,7 +336,7 @@ class LoadExtraTransitionDomain: LogCode.ERROR, f"Could not add non-federal agency {info.agencyname} on {domain_name}", domain_name, - not self.debug + not self.debug, ) return transition_domain @@ -369,7 +369,7 @@ class LoadExtraTransitionDomain: LogCode.ERROR, f"Could not add domain_type on {domain_name}, no data exists.", domain_name, - not self.debug + not self.debug, ) return transition_domain @@ -392,7 +392,7 @@ class LoadExtraTransitionDomain: LogCode.ERROR, f"Could not add inactive domain_type {domain_type[0]} on {domain_name}", domain_name, - not self.debug + not self.debug, ) return transition_domain @@ -453,7 +453,7 @@ class LoadExtraTransitionDomain: LogCode.ERROR, f"Could not add organization_name on {domain_name}, no data exists.", domain_name, - not self.debug + not self.debug, ) return transition_domain @@ -487,7 +487,7 @@ class LoadExtraTransitionDomain: LogCode.INFO, f"Added {var_name} as '{changed_value}' on {domain_name}", domain_name, - not self.debug + not self.debug, ) else: self.parse_logs.create_log_item( @@ -495,7 +495,7 @@ class LoadExtraTransitionDomain: LogCode.WARNING, f"Updated existing {var_name} to '{changed_value}' on {domain_name}", domain_name, - not self.debug + not self.debug, ) # Property getters, i.e. orgid or domaintypeid @@ -523,7 +523,7 @@ class LoadExtraTransitionDomain: domain_info = self.get_domain_data(domain_name) if domain_info is None: return None - + # The agency record is within the authority adhoc authority_id = domain_info.authorityid authority = self.get_authority_adhoc(authority_id) @@ -542,14 +542,14 @@ class LoadExtraTransitionDomain: return None type_id = domain_info.authorityid return self.get_authority_adhoc(type_id) - + def get_domain_escrow_info(self, domain_name): domain_info = self.get_domain_data(domain_name) if domain_info is None: return None type_id = domain_info.domainname return self.get_domain_escrow(type_id) - + # Object getters, i.e. DomainAdditionalData or OrganizationAdhoc def get_domain_data(self, desired_id) -> DomainAdditionalData: """Grabs a corresponding row within the DOMAIN_ADDITIONAL file, @@ -575,7 +575,7 @@ class LoadExtraTransitionDomain: """Grabs a corresponding row within the AUTHORITY_ADHOC file, based off a desired_id""" return self.get_object_by_id(EnumFilenames.AUTHORITY_ADHOC, desired_id) - + def get_domain_escrow(self, desired_id) -> DomainEscrow: """Grabs a corresponding row within the DOMAIN_ESCROW file, based off a desired_id""" @@ -615,7 +615,9 @@ class LoadExtraTransitionDomain: desired_type = self.parsed_data_container.file_data.get(file_type) if desired_type is None: self.parse_logs.create_log_item( - file_type, LogCode.ERROR, f"Type {file_type} does not exist", + file_type, + LogCode.ERROR, + f"Type {file_type} does not exist", ) return None @@ -624,10 +626,13 @@ class LoadExtraTransitionDomain: obj = desired_type.data.get(desired_id) if obj is None: self.parse_logs.create_log_item( - file_type, LogCode.ERROR, f"Id {desired_id} does not exist for {file_type.value[0]}" + file_type, + LogCode.ERROR, + f"Id {desired_id} does not exist for {file_type.value[0]}", ) return obj + # TODO - change name @dataclass class FileDataHolder: @@ -698,18 +703,18 @@ class FileDataHolder: # matches, then we shouldn't infer if total_groups == 0 or total_groups > 2: return (self.filename, False) - + # If only one match is returned, # it means that our default matches our request if total_groups == 1: return (self.filename, True) - + # Otherwise, if two are returned, then # its likely the pattern we want date = match.group(1) filename_without_date = match.group(2) - # After stripping out the date, + # After stripping out the date, # do the two filenames match? can_infer = filename_without_date == default_file_name if not can_infer: @@ -861,7 +866,7 @@ class ExtraTransitionDomain: if not infer_filenames: logger.error(f"Could not find file: {filename}") continue - + # Infer filename logic # # This mode is used for internal development use and testing only. Rather than having # to manually define the filename each time, we can infer what the filename @@ -898,26 +903,15 @@ class ExtraTransitionDomain: file_type.data = {} def parse_csv_file( - self, - file, - seperator, - dataclass_type, - id_field, - is_domain_escrow=False + self, file, seperator, dataclass_type, id_field, is_domain_escrow=False ): # Domain escrow is an edge case if is_domain_escrow: - item_to_return = self._read_domain_escrow( - file, - seperator - ) + item_to_return = self._read_domain_escrow(file, seperator) return item_to_return else: item_to_return = self._read_csv_file( - file, - seperator, - dataclass_type, - id_field + file, seperator, dataclass_type, id_field ) return item_to_return @@ -946,14 +940,16 @@ class ExtraTransitionDomain: reader = csv.DictReader(requested_file, delimiter=seperator) for row in reader: # Checks if we encounter any bad data. - # If we do, we (non-destructively) clean the file + # If we do, we (non-destructively) clean the file if None in row: logger.warning( f"{TerminalColors.YELLOW}" f"Found bad data in {file}. Attempting to clean." f"{TerminalColors.ENDC}" ) - updated_file_content = self.replace_bad_seperators(file, f"{seperator}", ";badseperator;") + updated_file_content = self.replace_bad_seperators( + file, f"{seperator}", ";badseperator;" + ) dict_data = {} break @@ -964,7 +960,7 @@ class ExtraTransitionDomain: if id_field == "domainname" and row_id is not None: row_id = row_id.lower() dict_data[row_id] = dataclass_type(**row) - + # After we clean the data, try to parse it again if updated_file_content: logger.info( @@ -999,10 +995,10 @@ class ExtraTransitionDomain: row_id = row_id.lower() dict_data[row_id] = dataclass_type(**row) return dict_data - + def replace_bad_seperators(self, filename, delimiter, special_character): with open(filename, "r", encoding="utf-8-sig") as file: contents = file.read() new_content = re.sub(rf" \{delimiter} ", special_character, contents) - return new_content \ No newline at end of file + return new_content diff --git a/src/registrar/management/commands/utility/terminal_helper.py b/src/registrar/management/commands/utility/terminal_helper.py index 0f309f9c8..05c37b271 100644 --- a/src/registrar/management/commands/utility/terminal_helper.py +++ b/src/registrar/management/commands/utility/terminal_helper.py @@ -4,9 +4,10 @@ import sys logger = logging.getLogger(__name__) + class LogCode(Enum): """Stores the desired log severity - + Overview of error codes: - 1 ERROR - 2 WARNING @@ -21,6 +22,7 @@ class LogCode(Enum): DEBUG = 4 DEFAULT = 5 + class TerminalColors: """Colors for terminal outputs (makes reading the logs WAY easier)""" @@ -81,7 +83,14 @@ class TerminalHelper: The "answer" return value is True for "yes" or False for "no". """ - valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False, "e": "exit"} + valid = { + "yes": True, + "y": True, + "ye": True, + "no": False, + "n": False, + "e": "exit", + } if default is None: prompt = " [y/n] " elif default == "yes": @@ -105,22 +114,20 @@ class TerminalHelper: # @staticmethod def array_as_string(array_to_convert: []) -> str: - array_as_string = "{}".format( - "\n".join(map(str, array_to_convert)) - ) + array_as_string = "{}".format("\n".join(map(str, array_to_convert))) return array_as_string @staticmethod def print_conditional( - print_condition: bool, - print_statement: str, - log_severity: LogCode = LogCode.DEFAULT + print_condition: bool, + print_statement: str, + log_severity: LogCode = LogCode.DEFAULT, ): """This function reduces complexity of debug statements in other functions. It uses the logger to write the given print_statement to the terminal if print_condition is TRUE. - + print_condition: bool -> Prints if print_condition is TRUE print_statement: str -> The statement to print @@ -181,29 +188,40 @@ class TerminalHelper: @staticmethod def get_file_line_count(filepath: str) -> int: - with open(filepath,'r') as file: + with open(filepath, "r") as file: li = file.readlines() total_line = len(li) return total_line @staticmethod - def print_to_file_conditional(print_condition: bool, filename: str, file_directory: str, file_contents: str): - """Sometimes logger outputs get insanely huge. - """ - if (print_condition): + def print_to_file_conditional( + print_condition: bool, filename: str, file_directory: str, file_contents: str + ): + """Sometimes logger outputs get insanely huge.""" + if print_condition: # Add a slash if the last character isn't one if file_directory and file_directory[-1] != "/": file_directory += "/" # Assemble filepath filepath = f"{file_directory}{filename}.txt" # Write to file - logger.info(f"{TerminalColors.MAGENTA}Writing to file {filepath}...{TerminalColors.ENDC}") + logger.info( + f"{TerminalColors.MAGENTA}Writing to file {filepath}...{TerminalColors.ENDC}" + ) with open(f"{filepath}", "w+") as f: f.write(file_contents) - @staticmethod - def printProgressBar (iteration, total, prefix = 'Progress:', suffix = 'Complete', decimals = 1, length = 100, fill = '█', printEnd = "\r"): + def printProgressBar( + iteration, + total, + prefix="Progress:", + suffix="Complete", + decimals=1, + length=100, + fill="█", + printEnd="\r", + ): """ Call in a loop to create terminal progress bar @params: @@ -227,10 +245,12 @@ class TerminalHelper: printProgressBar(i + 1, l, prefix = 'Progress:', suffix = 'Complete', length = 50) """ - percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) + percent = ("{0:." + str(decimals) + "f}").format( + 100 * (iteration / float(total)) + ) filledLength = int(length * iteration // total) - bar = fill * filledLength + '-' * (length - filledLength) - print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd) + bar = fill * filledLength + "-" * (length - filledLength) + print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd) # Print New Line on Complete - if iteration == total: - print() \ No newline at end of file + if iteration == total: + print() diff --git a/src/registrar/management/commands/utility/transition_domain_arguments.py b/src/registrar/management/commands/utility/transition_domain_arguments.py index 335f74cd9..11f3afe7c 100644 --- a/src/registrar/management/commands/utility/transition_domain_arguments.py +++ b/src/registrar/management/commands/utility/transition_domain_arguments.py @@ -3,19 +3,20 @@ from typing import Optional from registrar.management.commands.utility.epp_data_containers import EnumFilenames + @dataclass class TransitionDomainArguments: """Stores arguments for load_transition_domain, structurally a mix - of a dataclass and a regular class, meaning we get a hardcoded + of a dataclass and a regular class, meaning we get a hardcoded representation of the values we want, while maintaining flexiblity and reducing boilerplate. - + All pre-defined fields are optional but will remain on the model definition. In this event, they are provided a default value if none is given. """ # Maintains an internal kwargs list and sets values - # that match the class definition. + # that match the class definition. def __init__(self, **kwargs): self.kwargs = kwargs for k, v in kwargs.items(): @@ -36,14 +37,26 @@ class TransitionDomainArguments: # Filenames # ## Adhocs ## - agency_adhoc_filename: Optional[str] = field(default=EnumFilenames.AGENCY_ADHOC.value[1], repr=True) - domain_adhoc_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADHOC.value[1], repr=True) - organization_adhoc_filename: Optional[str] = field(default=EnumFilenames.ORGANIZATION_ADHOC.value[1], repr=True) - authority_adhoc_filename: Optional[str] = field(default=EnumFilenames.AUTHORITY_ADHOC.value[1], repr=True) + agency_adhoc_filename: Optional[str] = field( + default=EnumFilenames.AGENCY_ADHOC.value[1], repr=True + ) + domain_adhoc_filename: Optional[str] = field( + default=EnumFilenames.DOMAIN_ADHOC.value[1], repr=True + ) + organization_adhoc_filename: Optional[str] = field( + default=EnumFilenames.ORGANIZATION_ADHOC.value[1], repr=True + ) + authority_adhoc_filename: Optional[str] = field( + default=EnumFilenames.AUTHORITY_ADHOC.value[1], repr=True + ) ## Data files ## - domain_escrow_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ESCROW.value[1], repr=True) - domain_additional_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], repr=True) + domain_escrow_filename: Optional[str] = field( + default=EnumFilenames.DOMAIN_ESCROW.value[1], repr=True + ) + domain_additional_filename: Optional[str] = field( + default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], repr=True + ) domain_contacts_filename: Optional[str] = field(default=None, repr=True) domain_statuses_filename: Optional[str] = field(default=None, repr=True) contacts_filename: Optional[str] = field(default=None, repr=True) diff --git a/src/registrar/tests/test_transition_domain_migrations.py b/src/registrar/tests/test_transition_domain_migrations.py index 877737f43..5c5b0f1f7 100644 --- a/src/registrar/tests/test_transition_domain_migrations.py +++ b/src/registrar/tests/test_transition_domain_migrations.py @@ -13,6 +13,7 @@ from registrar.models import ( from django.core.management import call_command from unittest.mock import patch + class TestMigrations(TestCase): def setUp(self): """ """ @@ -46,24 +47,30 @@ class TestMigrations(TestCase): UserDomainRole.objects.all().delete() def run_load_domains(self): - with patch('registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit', return_value=True): + with patch( + "registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", + return_value=True, + ): call_command( "load_transition_domain", self.migration_json_filename, - directory=self.test_data_file_location + directory=self.test_data_file_location, ) def run_transfer_domains(self): call_command("transfer_transition_domains_to_domains") def run_master_script(self): - with patch('registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit', return_value=True): + with patch( + "registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", + return_value=True, + ): call_command( "master_domain_migrations", runMigrations=True, migrationDirectory=self.test_data_file_location, migrationJSON=self.migration_json_filename, - disablePrompts=True + disablePrompts=True, ) def compare_tables( @@ -204,7 +211,7 @@ class TestMigrations(TestCase): expected_missing_domain_informations, expected_missing_domain_invitations, ) - + def test_load_full_transition_domain(self): # Load command self.run_load_domains() @@ -242,7 +249,7 @@ class TestMigrations(TestCase): federal_type="Executive", federal_agency="InnoZ", epp_creation_date=None, - epp_expiration_date=None + epp_expiration_date=None, ), TransitionDomain( username="reginald.ratcliff4@test.com", @@ -254,20 +261,21 @@ class TestMigrations(TestCase): federal_type=None, federal_agency=None, epp_creation_date=None, - epp_expiration_date=None - ) + epp_expiration_date=None, + ), ] - expected_transition_domains = TransitionDomain.objects.filter(username="alexandra.bobbitt5@test.com") + expected_transition_domains = TransitionDomain.objects.filter( + username="alexandra.bobbitt5@test.com" + ) self.assertEqual(expected_transition_domains.count(), 1) expected_transition_domain = expected_transition_domains.get() - #TransitionDomain.objects.filter(domain_name = "fakewebsite3.gov") + # TransitionDomain.objects.filter(domain_name = "fakewebsite3.gov") # Afterwards, their values should be what we expect all_transition_domains = TransitionDomain.objects.all() for domain in all_transition_domains: for expected in expected_transition_domains: - # This data gets created when the object is, # so we should just match it. Not relevant # to the added data. @@ -277,7 +285,7 @@ class TestMigrations(TestCase): # Each TransitionDomain should have the correct data self.assertEqual(domain, expected) - + def test_load_full_domain(self): self.run_load_domains() self.run_transfer_domains() @@ -323,10 +331,10 @@ class TestMigrations(TestCase): testdomain = testdomain_domains.get() self.assertEqual(testdomain.expiration_date, datetime.date(2023, 9, 30)) - #self.assertEqual(testdomain.created_at, "test") + # self.assertEqual(testdomain.created_at, "test") self.assertEqual(testdomain.name, "fakewebsite2.gov") self.assertEqual(testdomain.state, "on hold") - + def test_load_full_domain_information(self): self.run_load_domains() self.run_transfer_domains() @@ -355,7 +363,7 @@ class TestMigrations(TestCase): # Test created Domain Information objects domain = Domain.objects.filter(name="anomaly.gov").get() anomaly_domain_infos = DomainInformation.objects.filter(domain=domain) - + self.assertEqual(anomaly_domain_infos.count(), 1) # This domain should be pretty barebones. Something isnt @@ -365,7 +373,7 @@ class TestMigrations(TestCase): self.assertEqual(anomaly.organization_type, None) self.assertEqual(anomaly.federal_agency, None) self.assertEqual(anomaly.federal_type, None) - + # Check for the "system" creator user Users = User.objects.filter(username="System") self.assertEqual(Users.count(), 1) @@ -380,13 +388,12 @@ class TestMigrations(TestCase): self.assertEqual(fakewebsite.organization_type, "federal") self.assertEqual(fakewebsite.federal_agency, "Department of Commerce") self.assertEqual(fakewebsite.federal_type, "executive") - + # Check for the "system" creator user Users = User.objects.filter(username="System") self.assertEqual(Users.count(), 1) self.assertEqual(anomaly.creator, Users.get()) - def test_transfer_transition_domains_to_domains(self): self.run_load_domains() self.run_transfer_domains() diff --git a/src/registrar/views/utility/mixins.py b/src/registrar/views/utility/mixins.py index 1e43a3e24..0db1ff402 100644 --- a/src/registrar/views/utility/mixins.py +++ b/src/registrar/views/utility/mixins.py @@ -100,7 +100,7 @@ class DomainPermission(PermissionsLoginMixin): if DomainInformation.objects.filter(id=pk).exists(): requested_domain = DomainInformation.objects.get(id=pk) - # If no domain_application object exists and we are + # If no domain_application object exists and we are # coming from the manage_domain dashboard, this is likely # a transition domain. domain_application = requested_domain.domain_application