diff --git a/src/registrar/management/commands/cat_files_into_getgov.py b/src/registrar/management/commands/cat_files_into_getgov.py index 17964f236..cb00e9f31 100644 --- a/src/registrar/management/commands/cat_files_into_getgov.py +++ b/src/registrar/management/commands/cat_files_into_getgov.py @@ -21,17 +21,19 @@ class Command(BaseCommand): default="txt", help="What file extensions to look for, like txt or gz", ) - parser.add_argument("--directory", default="migrationdata", help="Desired directory") + parser.add_argument( + "--directory", default="migrationdata", help="Desired directory" + ) def handle(self, **options): - file_extension: str = options.get("file_extension").lstrip('.') + file_extension: str = options.get("file_extension").lstrip(".") directory = options.get("directory") # file_extension is always coerced as str, Truthy is OK to use here. if not file_extension or not isinstance(file_extension, str): raise ValueError(f"Invalid file extension '{file_extension}'") - matching_extensions = glob.glob(f'../tmp/*.{file_extension}') + matching_extensions = glob.glob(f"../tmp/*.{file_extension}") if not matching_extensions: logger.error(f"No files with the extension {file_extension} found") @@ -39,23 +41,25 @@ class Command(BaseCommand): filename = os.path.basename(src_file_path) do_command = True exit_status: int - - desired_file_path = f'{directory}/{filename}' + + desired_file_path = f"{directory}/{filename}" if os.path.exists(desired_file_path): - replace = input(f'{desired_file_path} already exists. Do you want to replace it? (y/n) ') - if replace.lower() != 'y': + replace = input( + f"{desired_file_path} already exists. Do you want to replace it? (y/n) " + ) + if replace.lower() != "y": do_command = False - + if do_command: copy_from = f"../tmp/{filename}" self.cat(copy_from, desired_file_path) - exit_status = os.system(f'cat ../tmp/{filename} > {desired_file_path}') + exit_status = os.system(f"cat ../tmp/{filename} > {desired_file_path}") if exit_status == 0: logger.info(f"Successfully copied {filename}") else: logger.info(f"Failed to copy {filename}") - + def cat(self, copy_from, copy_to): - exit_status = os.system(f'cat {copy_from} > {copy_to}') - return exit_status \ No newline at end of file + exit_status = os.system(f"cat {copy_from} > {copy_to}") + return exit_status diff --git a/src/registrar/management/commands/load_extra_transition_domain.py b/src/registrar/management/commands/load_extra_transition_domain.py index 866508fb1..41e9856ce 100644 --- a/src/registrar/management/commands/load_extra_transition_domain.py +++ b/src/registrar/management/commands/load_extra_transition_domain.py @@ -11,40 +11,98 @@ from django.core.management import BaseCommand from registrar.models.transition_domain import TransitionDomain from .utility.extra_transition_domain import ExtraTransitionDomain -from .utility.epp_data_containers import AgencyAdhoc, DomainAdditionalData, DomainTypeAdhoc, OrganizationAdhoc, EnumFilenames +from .utility.epp_data_containers import ( + AgencyAdhoc, + DomainAdditionalData, + DomainTypeAdhoc, + OrganizationAdhoc, + EnumFilenames, +) logger = logging.getLogger(__name__) +class LogCode(Enum): + ERROR = 1 + WARNING = 2 + INFO = 3 + DEBUG = 4 + + +class FileTransitionLog: + def __init__(self): + self.logs = { + EnumFilenames.DOMAIN_ADHOC: [], + EnumFilenames.AGENCY_ADHOC: [], + EnumFilenames.ORGANIZATION_ADHOC: [], + EnumFilenames.DOMAIN_ADDITIONAL: [], + } + + class LogItem: + def __init__(self, file_type, code, message): + self.file_type = file_type + self.code = code + self.message = message + + def add_log(self, file_type, code, message): + self.logs[file_type] = self.LogItem(file_type, code, message) + + def add_log(self, log: LogItem): + self.logs.append(log) + + def create_log_item(self, file_type, code, message, add_to_list=True): + """Creates and returns an LogItem object. + + add_to_list: bool -> If enabled, add it to the logs array. + """ + log = self.LogItem(file_type, code, message) + if not add_to_list: + return log + else: + self.logs[file_type] = log + return log + + def display_logs(self, file_type): + for log in self.logs.get(file_type): + match log.code: + case LogCode.ERROR: + logger.error(log.message) + case LogCode.WARNING: + logger.warning(log.message) + case LogCode.INFO: + logger.info(log.message) + case LogCode.DEBUG: + logger.debug(log.message) + + class Command(BaseCommand): help = "" filenames = EnumFilenames + parse_logs = FileTransitionLog() def add_arguments(self, parser): """Add filename arguments.""" parser.add_argument( - "--directory", - default="migrationdata", - help="Desired directory" + "--directory", default="migrationdata", help="Desired directory" ) parser.add_argument( "--agency_adhoc_filename", - default=self.filenames.AGENCY_ADHOC[1], + default=EnumFilenames.AGENCY_ADHOC[1], help="Defines the filename for agency adhocs", ) parser.add_argument( "--domain_additional_filename", - default=self.filenames.DOMAIN_ADDITIONAL[1], + default=EnumFilenames.DOMAIN_ADDITIONAL[1], help="Defines the filename for additional domain data", ) parser.add_argument( "--domain_adhoc_filename", - default=self.filenames.DOMAIN_ADHOC[1], + default=EnumFilenames.DOMAIN_ADHOC[1], help="Defines the filename for domain type adhocs", ) parser.add_argument( "--organization_adhoc_filename", - default=self.filenames.ORGANIZATION_ADHOC[1], + default=EnumFilenames.ORGANIZATION_ADHOC[1], help="Defines the filename for domain type adhocs", ) parser.add_argument("--sep", default="|", help="Delimiter character") @@ -52,41 +110,259 @@ class Command(BaseCommand): def handle(self, **options): try: self.domain_object = ExtraTransitionDomain( - agency_adhoc_filename=options['agency_adhoc_filename'], - domain_additional_filename=options['domain_additional_filename'], - domain_adhoc_filename=options['domain_adhoc_filename'], - organization_adhoc_filename=options['organization_adhoc_filename'], - directory=options['directory'], - seperator=options['sep'] + agency_adhoc_filename=options["agency_adhoc_filename"], + domain_additional_filename=options["domain_additional_filename"], + domain_adhoc_filename=options["domain_adhoc_filename"], + organization_adhoc_filename=options["organization_adhoc_filename"], + directory=options["directory"], + seperator=options["sep"], ) self.domain_object.parse_all_files() except Exception as err: logger.error(f"Could not load additional data. Error: {err}") else: - for transition_domain in TransitionDomain.objects.all(): - transition_domain.organization_type + all_transition_domains = TransitionDomain.objects.all() + if not all_transition_domains.exists(): + raise Exception("No TransitionDomain objects exist.") - def get_organization_adhoc(self, desired_id): + for transition_domain in all_transition_domains: + domain_name = transition_domain.domain_name + updated_transition_domain = transition_domain + + # STEP 1: Parse domain type data + updated_transition_domain = self.parse_domain_type_data( + domain_name, transition_domain + ) + self.parse_logs(EnumFilenames.DOMAIN_ADHOC) + + # STEP 2: Parse agency data - TODO + updated_transition_domain = self.parse_agency_data( + domain_name, transition_domain + ) + self.parse_logs(EnumFilenames.AGENCY_ADHOC) + + # STEP 3: Parse organization data + updated_transition_domain = self.parse_org_data( + domain_name, transition_domain + ) + self.parse_logs.display_logs(EnumFilenames.ORGANIZATION_ADHOC) + + # STEP 4: Parse expiration data - TODO + updated_transition_domain = self.parse_expiration_data( + domain_name, transition_domain + ) + # self.parse_logs(EnumFilenames.EXPIRATION_DATA) + + updated_transition_domain.save() + + # TODO - Implement once Niki gets her ticket in + def parse_expiration_data(self, domain_name, transition_domain): + return transition_domain + + # TODO - Implement once Niki gets her ticket in + def parse_agency_data(self, domain_name, transition_domain): + """ + + if not isinstance(transition_domain, TransitionDomain): + raise ValueError("Not a valid object, must be TransitionDomain") + + info = self.get_domain_type_info(domain_name) + if info is None: + self.parse_logs.create_log_item( + EnumFilenames.AGENCY_ADHOC, + LogCode.INFO, + f"Could not add agency_data on {domain_name}, no data exists." + ) + return transition_domain + + agency_exists = ( + transition_domain.agency_name is not None + and transition_domain.agency_name.strip() != "" + ) + + # Logs if we either added to this property, + # or modified it. + self._add_or_change_message( + EnumFilenames.AGENCY_ADHOC, + "agency_name", + transition_domain.agency_name, + domain_name, + agency_exists + ) + """ + return transition_domain + + def parse_domain_type_data(self, domain_name, transition_domain: TransitionDomain): + if not isinstance(transition_domain, TransitionDomain): + raise ValueError("Not a valid object, must be TransitionDomain") + + info = self.get_domain_type_info(domain_name) + if info is None: + self.parse_logs.create_log_item( + EnumFilenames.DOMAIN_ADHOC, + LogCode.INFO, + f"Could not add domain_type on {domain_name}, no data exists.", + ) + return transition_domain + + # This data is stored as follows: FEDERAL - Judicial + # For all other records, it is stored as so: Interstate + # We can infer if it is federal or not based on this fact. + domain_type = info.domaintype.split("-") + if domain_type.count != 1 or domain_type.count != 2: + raise ValueError("Found invalid data in DOMAIN_ADHOC") + + # Then, just grab the agency type. + new_federal_agency = domain_type[0].strip() + + # Check if this domain_type is active or not. + # If not, we don't want to add this. + if not info.active.lower() == "y": + self.parse_logs.create_log_item( + EnumFilenames.DOMAIN_ADHOC, + LogCode.ERROR, + f"Could not add inactive domain_type {domain_type[0]} on {domain_name}", + ) + return transition_domain + + # Are we updating data that already exists, + # or are we adding new data in its place? + federal_agency_exists = ( + transition_domain.federal_agency is not None + and transition_domain.federal_agency.strip() != "" + ) + federal_type_exists = ( + transition_domain.federal_type is not None + and transition_domain.federal_type.strip() != "" + ) + + # If we get two records, then we know it is federal. + is_federal = domain_type.count() == 2 + if is_federal: + new_federal_type = domain_type[1].strip() + transition_domain.federal_agency = new_federal_agency + transition_domain.federal_type = new_federal_type + else: + transition_domain.federal_agency = new_federal_agency + transition_domain.federal_type = None + + # Logs if we either added to this property, + # or modified it. + self._add_or_change_message( + EnumFilenames.DOMAIN_ADHOC, + "federal_agency", + transition_domain.federal_agency, + domain_name, + federal_agency_exists, + ) + + self._add_or_change_message( + EnumFilenames.DOMAIN_ADHOC, + "federal_type", + transition_domain.federal_type, + domain_name, + federal_type_exists, + ) + + return transition_domain + + def parse_org_data(self, domain_name, transition_domain: TransitionDomain): + if not isinstance(transition_domain, TransitionDomain): + raise ValueError("Not a valid object, must be TransitionDomain") + + org_info = self.get_org_info(domain_name) + if org_info is None: + self.parse_logs.create_log_item( + EnumFilenames.ORGANIZATION_ADHOC, + LogCode.INFO, + f"Could not add organization_type on {domain_name}, no data exists.", + ) + return transition_domain + + desired_property_exists = ( + transition_domain.organization_type is not None + and transition_domain.organization_type.strip() != "" + ) + + transition_domain.organization_type = org_info.orgname + + # Logs if we either added to this property, + # or modified it. + self._add_or_change_message( + EnumFilenames.ORGANIZATION_ADHOC, + "organization_type", + transition_domain.organization_type, + domain_name, + desired_property_exists, + ) + + return transition_domain + + def _add_or_change_message( + self, file_type, var_name, changed_value, domain_name, is_update=False + ): + """Creates a log instance when a property + is successfully changed on a given TransitionDomain.""" + if not is_update: + self.parse_logs.create_log_item( + file_type, + LogCode.DEBUG, + f"Added {file_type} as '{var_name}' on {domain_name}", + ) + else: + self.parse_logs.create_log_item( + file_type, + LogCode.INFO, + f"Updated existing {var_name} to '{changed_value}' on {domain_name}", + ) + + def get_org_info(self, domain_name) -> OrganizationAdhoc: + domain_info = self.get_domain_data(domain_name) + org_id = domain_info.orgid + return self.get_organization_adhoc(org_id) + + def get_domain_type_info(self, domain_name) -> DomainTypeAdhoc: + domain_info = self.get_domain_data(domain_name) + type_id = domain_info.domaintypeid + return self.get_domain_adhoc(type_id) + + def get_agency_info(self, domain_name): + # domain_info = self.get_domain_data(domain_name) + # type_id = domain_info.authorityid + # return self.get_domain_adhoc(type_id) + raise + + def get_domain_data(self, desired_id) -> DomainAdditionalData: + return self.get_object_by_id(EnumFilenames.DOMAIN_ADDITIONAL, desired_id) + + def get_organization_adhoc(self, desired_id) -> OrganizationAdhoc: """Grabs adhoc information for organizations. Returns an organization - dictionary - - returns: - { - " + dictionary. + + returns: + { + "org_id_1": OrganizationAdhoc, + "org_id_2: OrganizationAdhoc, + ... } """ - return self.get_object_by_id(self.filenames.ORGANIZATION_ADHOC, desired_id) - + return self.get_object_by_id(EnumFilenames.ORGANIZATION_ADHOC, desired_id) + def get_domain_adhoc(self, desired_id): """""" - return self.get_object_by_id(self.filenames.DOMAIN_ADHOC, desired_id) - + return self.get_object_by_id(EnumFilenames.DOMAIN_ADHOC, desired_id) + def get_agency_adhoc(self, desired_id): """""" - return self.get_object_by_id(self.filenames.AGENCY_ADHOC, desired_id) - + return self.get_object_by_id(EnumFilenames.AGENCY_ADHOC, desired_id) + def get_object_by_id(self, file_type: EnumFilenames, desired_id): """""" desired_type = self.domain_object.csv_data.get(file_type) - obj = desired_type.get(desired_id) + if desired_type is not None: + obj = desired_type.get(desired_id) + else: + self.parse_logs.create_log_item( + file_type, LogCode.ERROR, f"Id {desired_id} does not exist" + ) return obj diff --git a/src/registrar/management/commands/load_transition_domain.py b/src/registrar/management/commands/load_transition_domain.py index 624418fe9..4fb5b2e6e 100644 --- a/src/registrar/management/commands/load_transition_domain.py +++ b/src/registrar/management/commands/load_transition_domain.py @@ -88,9 +88,6 @@ class Command(BaseCommand): parser.add_argument( "domain_statuses_filename", help="Data file with domain status information" ) - parser.add_argument( - "--loadExtraData", default=True, help="Determines if additional metadata should be applied" - ) parser.add_argument("--sep", default="|", help="Delimiter character") @@ -326,9 +323,6 @@ class Command(BaseCommand): # print message to terminal about which args are in use self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse) - - if load_extra_data: - # STEP 1: # Create mapping of domain name -> status diff --git a/src/registrar/management/commands/test_domain_migration.py b/src/registrar/management/commands/test_domain_migration.py index bc9efa9df..5910d2bbf 100644 --- a/src/registrar/management/commands/test_domain_migration.py +++ b/src/registrar/management/commands/test_domain_migration.py @@ -17,10 +17,13 @@ from registrar.models.domain_information import DomainInformation from registrar.management.commands.utility.terminal_helper import TerminalColors from registrar.management.commands.utility.terminal_helper import TerminalHelper -from registrar.management.commands.load_transition_domain import Command as load_transition_domain_command +from registrar.management.commands.load_transition_domain import ( + Command as load_transition_domain_command, +) logger = logging.getLogger(__name__) + class Command(BaseCommand): help = """ """ @@ -31,19 +34,23 @@ class Command(BaseCommand): A boolean (default to true), which activates additional print statements """ - parser.add_argument("--runLoaders", + parser.add_argument( + "--runLoaders", help="Runs all scripts (in sequence) for transition domain migrations", - action=argparse.BooleanOptionalAction) - - parser.add_argument("--triggerLogins", + action=argparse.BooleanOptionalAction, + ) + + parser.add_argument( + "--triggerLogins", help="Simulates a user login for each user in domain invitation", - action=argparse.BooleanOptionalAction) + action=argparse.BooleanOptionalAction, + ) # The following file arguments have default values for running in the sandbox parser.add_argument( "--loaderDirectory", default="migrationData", - help="The location of the files used for load_transition_domain migration script" + help="The location of the files used for load_transition_domain migration script", ) parser.add_argument( "--loaderFilenames", @@ -55,7 +62,7 @@ class Command(BaseCommand): where... - domain_contacts_filename is the Data file with domain contact information - contacts_filename is the Data file with contact information - - domain_statuses_filename is the Data file with domain status information""" + - domain_statuses_filename is the Data file with domain status information""", ) # parser.add_argument( @@ -74,7 +81,9 @@ class Command(BaseCommand): # help="Data file with domain status information" # ) - parser.add_argument("--sep", default="|", help="Delimiter character for the loader files") + parser.add_argument( + "--sep", default="|", help="Delimiter character for the loader files" + ) parser.add_argument("--debug", action=argparse.BooleanOptionalAction) @@ -88,9 +97,7 @@ class Command(BaseCommand): action=argparse.BooleanOptionalAction, ) - def print_debug_mode_statements( - self, debug_on: bool - ): + def print_debug_mode_statements(self, debug_on: bool): """Prints additional terminal statements to indicate if --debug or --limitParse are in use""" self.print_debug( @@ -119,8 +126,8 @@ class Command(BaseCommand): """ ) - #TODO: would filteredRelation be faster? - for transition_domain in TransitionDomain.objects.all():# DEBUG: + # TODO: would filteredRelation be faster? + for transition_domain in TransitionDomain.objects.all(): # DEBUG: transition_domain_name = transition_domain.domain_name transition_domain_email = transition_domain.username @@ -137,10 +144,14 @@ class Command(BaseCommand): # Check Domain table matching_domains = Domain.objects.filter(name=transition_domain_name) # Check Domain Information table - matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name) + matching_domain_informations = DomainInformation.objects.filter( + domain__name=transition_domain_name + ) # Check Domain Invitation table - matching_domain_invitations = DomainInvitation.objects.filter(email=transition_domain_email.lower(), - domain__name=transition_domain_name) + matching_domain_invitations = DomainInvitation.objects.filter( + email=transition_domain_email.lower(), + domain__name=transition_domain_name, + ) if len(matching_domains) == 0: missing_domains.append(transition_domain_name) @@ -157,10 +168,16 @@ class Command(BaseCommand): total_missing_domain_invitations = len(missing_domain_invites) missing_domains_as_string = "{}".format(", ".join(map(str, missing_domains))) - duplicate_domains_as_string = "{}".format(", ".join(map(str, duplicate_domains))) - missing_domain_informations_as_string = "{}".format(", ".join(map(str, missing_domain_informations))) - missing_domain_invites_as_string = "{}".format(", ".join(map(str, missing_domain_invites))) - + duplicate_domains_as_string = "{}".format( + ", ".join(map(str, duplicate_domains)) + ) + missing_domain_informations_as_string = "{}".format( + ", ".join(map(str, missing_domain_informations)) + ) + missing_domain_invites_as_string = "{}".format( + ", ".join(map(str, missing_domain_invites)) + ) + logger.info( f"""{TerminalColors.OKGREEN} ============= FINISHED ANALYSIS =============== @@ -183,20 +200,26 @@ class Command(BaseCommand): {TerminalColors.ENDC} """ ) - - def run_load_transition_domain_script(self, - file_location, - domain_contacts_filename, - contacts_filename, - domain_statuses_filename, - sep, - reset_table, - debug_on, - debug_max_entries_to_parse): + + def run_load_transition_domain_script( + self, + file_location, + domain_contacts_filename, + contacts_filename, + domain_statuses_filename, + sep, + reset_table, + debug_on, + debug_max_entries_to_parse, + ): load_transition_domain_command_string = "./manage.py load_transition_domain " - load_transition_domain_command_string += file_location+domain_contacts_filename + " " - load_transition_domain_command_string += file_location+contacts_filename + " " - load_transition_domain_command_string += file_location+domain_statuses_filename + " " + load_transition_domain_command_string += ( + file_location + domain_contacts_filename + " " + ) + load_transition_domain_command_string += file_location + contacts_filename + " " + load_transition_domain_command_string += ( + file_location + domain_statuses_filename + " " + ) if sep is not None and sep != "|": load_transition_domain_command_string += f"--sep {sep} " @@ -208,7 +231,9 @@ class Command(BaseCommand): load_transition_domain_command_string += "--debug " if debug_max_entries_to_parse > 0: - load_transition_domain_command_string += f"--limitParse {debug_max_entries_to_parse} " + load_transition_domain_command_string += ( + f"--limitParse {debug_max_entries_to_parse} " + ) proceed_load_transition_domain = TerminalHelper.query_yes_no( f"""{TerminalColors.OKCYAN} @@ -224,18 +249,19 @@ class Command(BaseCommand): if not proceed_load_transition_domain: return - logger.info(f"""{TerminalColors.OKCYAN} + logger.info( + f"""{TerminalColors.OKCYAN} ==== EXECUTING... ==== - {TerminalColors.ENDC}""") + {TerminalColors.ENDC}""" + ) os.system(f"{load_transition_domain_command_string}") - + def run_transfer_script(self, debug_on): command_string = "./manage.py transfer_transition_domains_to_domains " if debug_on: command_string += "--debug " - proceed_load_transition_domain = TerminalHelper.query_yes_no( f"""{TerminalColors.OKCYAN} ===================================================== @@ -250,18 +276,20 @@ class Command(BaseCommand): if not proceed_load_transition_domain: return - logger.info(f"""{TerminalColors.OKCYAN} + logger.info( + f"""{TerminalColors.OKCYAN} ==== EXECUTING... ==== - {TerminalColors.ENDC}""") + {TerminalColors.ENDC}""" + ) os.system(f"{command_string}") - def run_migration_scripts(self, - options): - file_location = options.get("loaderDirectory")+"/" + def run_migration_scripts(self, options): + file_location = options.get("loaderDirectory") + "/" filenames = options.get("loaderFilenames").split() if len(filenames) < 3: filenames_as_string = "{}".format(", ".join(map(str, filenames))) - logger.info(f""" + logger.info( + f""" {TerminalColors.FAIL} --loaderFilenames expected 3 filenames to follow it, but only {len(filenames)} were given: @@ -270,7 +298,8 @@ class Command(BaseCommand): PLEASE MODIFY THE SCRIPT AND TRY RUNNING IT AGAIN ============= TERMINATING ============= {TerminalColors.ENDC} - """) + """ + ) return domain_contacts_filename = filenames[0] contacts_filename = filenames[1] @@ -295,16 +324,18 @@ class Command(BaseCommand): if not files_are_correct: # prompt the user to provide correct file inputs - logger.info(f""" + logger.info( + f""" {TerminalColors.YELLOW} PLEASE Re-Run the script with the correct file location and filenames: EXAMPLE: docker compose run -T app ./manage.py test_domain_migration --runLoaders --loaderDirectory /app/tmp --loaderFilenames escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt - """) + """ + ) return - + # Get --sep argument sep = options.get("sep") @@ -319,53 +350,65 @@ class Command(BaseCommand): options.get("limitParse") ) # set to 0 to parse all entries - self.run_load_transition_domain_script(file_location, - domain_contacts_filename, - contacts_filename, - domain_statuses_filename, - sep, - reset_table, - debug_on, - debug_max_entries_to_parse) + self.run_load_transition_domain_script( + file_location, + domain_contacts_filename, + contacts_filename, + domain_statuses_filename, + sep, + reset_table, + debug_on, + debug_max_entries_to_parse, + ) self.run_transfer_script(debug_on) def simulate_user_logins(self, debug_on): - logger.info(f"""{TerminalColors.OKCYAN} + logger.info( + f"""{TerminalColors.OKCYAN} ================== SIMULATING LOGINS ================== {TerminalColors.ENDC} - """) + """ + ) for invite in DomainInvitation.objects.all(): - #DEBUG: - TerminalHelper.print_debug(debug_on,f"""{TerminalColors.OKCYAN}Processing invite: {invite}{TerminalColors.ENDC}""") - # get a user with this email address + # DEBUG: + TerminalHelper.print_debug( + debug_on, + f"""{TerminalColors.OKCYAN}Processing invite: {invite}{TerminalColors.ENDC}""", + ) + # get a user with this email address User = get_user_model() try: user = User.objects.get(email=invite.email) - #DEBUG: - TerminalHelper.print_debug(debug_on,f"""{TerminalColors.OKCYAN}Logging in user: {user}{TerminalColors.ENDC}""") + # DEBUG: + TerminalHelper.print_debug( + debug_on, + f"""{TerminalColors.OKCYAN}Logging in user: {user}{TerminalColors.ENDC}""", + ) Client.force_login(user) except User.DoesNotExist: - #TODO: how should we handle this? - logger.warn(f"""{TerminalColors.FAIL}No user found {invite.email}{TerminalColors.ENDC}""") + # TODO: how should we handle this? + logger.warn( + f"""{TerminalColors.FAIL}No user found {invite.email}{TerminalColors.ENDC}""" + ) def handle( self, **options, ): """ - Does a diff between the transition_domain and the following tables: - domain, domain_information and the domain_invitation. - + Does a diff between the transition_domain and the following tables: + domain, domain_information and the domain_invitation. + Produces the following report (printed to the terminal): #1 - Print any domains that exist in the transition_domain table but not in their corresponding domain, domain information or domain invitation tables. #2 - Print which table this domain is missing from - #3- Check for duplicate entries in domain or - domain_information tables and print which are + #3- Check for duplicate entries in domain or + domain_information tables and print which are duplicates and in which tables (ONLY RUNS with full script option) @@ -374,10 +417,10 @@ class Command(BaseCommand): on django admin for an analyst OPTIONS: - -- (run all other scripts: + -- (run all other scripts: 1 - imports for trans domains 2 - transfer to domain & domain invitation - 3 - send domain invite) + 3 - send domain invite) ** Triggers table reset ** """ @@ -394,12 +437,12 @@ class Command(BaseCommand): if run_loaders_on: self.run_migration_scripts(options) prompt_continuation_of_analysis = True - + # Simulate user login for each user in domain invitation if sepcified by user if simulate_user_login_enabled: self.simulate_user_logins(debug_on) prompt_continuation_of_analysis = True - + analyze_tables = True if prompt_continuation_of_analysis: analyze_tables = TerminalHelper.query_yes_no( @@ -410,4 +453,4 @@ class Command(BaseCommand): # Analyze tables for corrupt data... if analyze_tables: - self.compare_tables(debug_on) \ No newline at end of file + self.compare_tables(debug_on) diff --git a/src/registrar/management/commands/utility/epp_data_containers.py b/src/registrar/management/commands/utility/epp_data_containers.py index 2d006c42b..3fe60da2b 100644 --- a/src/registrar/management/commands/utility/epp_data_containers.py +++ b/src/registrar/management/commands/utility/epp_data_containers.py @@ -2,18 +2,21 @@ from dataclasses import dataclass from enum import Enum from typing import Optional + @dataclass -class AgencyAdhoc(): - """Defines the structure given in the given AGENCY_ADHOC file""" +class AgencyAdhoc: + """Defines the structure given in the AGENCY_ADHOC file""" + agencyid: Optional[int] = None agencyname: Optional[str] = None - active: Optional[bool] = None - isfederal: Optional[bool] = None + active: Optional[str] = None + isfederal: Optional[str] = None @dataclass -class DomainAdditionalData(): - """Defines the structure given in the given DOMAIN_ADDITIONAL file""" +class DomainAdditionalData: + """Defines the structure given in the DOMAIN_ADDITIONAL file""" + domainname: Optional[str] = None domaintypeid: Optional[int] = None authorityid: Optional[int] = None @@ -22,17 +25,21 @@ class DomainAdditionalData(): dnsseckeymonitor: Optional[str] = None domainpurpose: Optional[str] = None + @dataclass -class DomainTypeAdhoc(): - """Defines the structure given in the given DOMAIN_ADHOC file""" +class DomainTypeAdhoc: + """Defines the structure given in the DOMAIN_ADHOC file""" + domaintypeid: Optional[int] = None domaintype: Optional[str] = None code: Optional[str] = None - active: Optional[bool] = None + active: Optional[str] = None + @dataclass -class OrganizationAdhoc(): - """Defines the structure given in the given ORGANIZATION_ADHOC file""" +class OrganizationAdhoc: + """Defines the structure given in the ORGANIZATION_ADHOC file""" + orgid: Optional[int] = None orgname: Optional[str] = None orgstreet: Optional[str] = None @@ -41,12 +48,17 @@ class OrganizationAdhoc(): orgzip: Optional[str] = None orgcountrycode: Optional[str] = None + class EnumFilenames(Enum): - """Returns a tuple mapping for (filetype, default_file_name). - + """Returns a tuple mapping for (filetype, default_file_name). + For instance, AGENCY_ADHOC = ("agency_adhoc", "agency.adhoc.dotgov.txt") """ + AGENCY_ADHOC = ("agency_adhoc", "agency.adhoc.dotgov.txt") - DOMAIN_ADDITIONAL = ("domain_additional", "domainadditionaldatalink.adhoc.dotgov.txt") + DOMAIN_ADDITIONAL = ( + "domain_additional", + "domainadditionaldatalink.adhoc.dotgov.txt", + ) DOMAIN_ADHOC = ("domain_adhoc", "domaintypes.adhoc.dotgov.txt") - ORGANIZATION_ADHOC = ("organization_adhoc", "organization.adhoc.dotgov.txt") \ No newline at end of file + ORGANIZATION_ADHOC = ("organization_adhoc", "organization.adhoc.dotgov.txt") diff --git a/src/registrar/management/commands/utility/extra_transition_domain.py b/src/registrar/management/commands/utility/extra_transition_domain.py index a54540960..2010fe563 100644 --- a/src/registrar/management/commands/utility/extra_transition_domain.py +++ b/src/registrar/management/commands/utility/extra_transition_domain.py @@ -7,20 +7,26 @@ import logging import os from typing import List -from epp_data_containers import AgencyAdhoc, DomainAdditionalData, DomainTypeAdhoc, OrganizationAdhoc, EnumFilenames +from epp_data_containers import ( + AgencyAdhoc, + DomainAdditionalData, + DomainTypeAdhoc, + OrganizationAdhoc, + EnumFilenames, +) logger = logging.getLogger(__name__) @dataclass -class PatternMap(): +class PatternMap: """Helper class that holds data and metadata about a requested file. - + filename: str -> The desired filename to target. If no filename is given, it is assumed that you are passing in a filename pattern and it will look for a filename that matches the given postfix you pass in. - regex: re.Pattern -> Defines what regex you want to use when inferring + regex: re.Pattern -> Defines what regex you want to use when inferring filenames. If none, no matching occurs. data_type: type -> Metadata about the desired type for data. @@ -31,7 +37,15 @@ class PatternMap(): to cross-reference. """ - def __init__(self, filename: str, regex: re.Pattern, data_type: type, id_field: str, data: dict = {}): + + def __init__( + self, + filename: str, + regex: re.Pattern, + data_type: type, + id_field: str, + data: dict = {}, + ): self.regex = regex self.data_type = data_type self.id_field = id_field @@ -41,16 +55,15 @@ class PatternMap(): _infer = self._infer_filename(self.regex, filename) self.filename = _infer[0] self.could_infer = _infer[1] - def _infer_filename(self, regex: re.Pattern, default_file_name): if not isinstance(regex, re.Pattern): return (self.filename, False) - + match = regex.match(self.filename) if not match: return (self.filename, False) - + date = match.group(1) filename_without_date = match.group(2) @@ -61,17 +74,19 @@ class PatternMap(): full_filename = date + filename_without_date return (full_filename, can_infer) -class ExtraTransitionDomain(): + +class ExtraTransitionDomain: filenames = EnumFilenames - strip_date_regex = re.compile(r'\d+\.(.+)') - - def __init__(self, + strip_date_regex = re.compile(r"\d+\.(.+)") + + def __init__( + self, agency_adhoc_filename=filenames.AGENCY_ADHOC[1], domain_additional_filename=filenames.DOMAIN_ADDITIONAL[1], domain_adhoc_filename=filenames.DOMAIN_ADHOC[1], organization_adhoc_filename=filenames.ORGANIZATION_ADHOC[1], directory="migrationdata", - seperator="|" + seperator="|", ): self.directory = directory self.seperator = seperator @@ -81,18 +96,34 @@ class ExtraTransitionDomain(): self.csv_data = { # (filename, default_url): metadata about the desired file - self.filenames.AGENCY_ADHOC: PatternMap(agency_adhoc_filename, self.strip_date_regex, AgencyAdhoc, "agencyid"), - self.filenames.DOMAIN_ADDITIONAL: PatternMap(domain_additional_filename, self.strip_date_regex, DomainAdditionalData, "domainname"), - self.filenames.DOMAIN_ADHOC: PatternMap(domain_adhoc_filename, self.strip_date_regex, DomainTypeAdhoc, "domaintypeid"), - self.filenames.ORGANIZATION_ADHOC: PatternMap(organization_adhoc_filename, self.strip_date_regex, OrganizationAdhoc, "orgid") + self.filenames.AGENCY_ADHOC: PatternMap( + agency_adhoc_filename, self.strip_date_regex, AgencyAdhoc, "agencyid" + ), + self.filenames.DOMAIN_ADDITIONAL: PatternMap( + domain_additional_filename, + self.strip_date_regex, + DomainAdditionalData, + "domainname", + ), + self.filenames.DOMAIN_ADHOC: PatternMap( + domain_adhoc_filename, + self.strip_date_regex, + DomainTypeAdhoc, + "domaintypeid", + ), + self.filenames.ORGANIZATION_ADHOC: PatternMap( + organization_adhoc_filename, + self.strip_date_regex, + OrganizationAdhoc, + "orgid", + ), } - - def parse_all_files(self, overwrite_existing_data = True): + def parse_all_files(self, overwrite_existing_data=True): """Clears all preexisting data then parses each related CSV file. - - overwrite_existing_data: bool -> Determines if we should clear - csv_data.data if it already exists + + overwrite_existing_data: bool -> Determines if we should clear + csv_data.data if it already exists """ self.clear_csv_data() for item in self.csv_data: @@ -101,16 +132,15 @@ class ExtraTransitionDomain(): if filename in self.all_files_set: file_type.data = self._read_csv_file( - self.all_files_set[filename], - self.seperator, + self.all_files_set[filename], + self.seperator, file_type.data_type, - file_type.id_field + file_type.id_field, ) else: # Log if we can't find the desired file logger.error(f"Could not find file: {filename}") - def clear_csv_data(self): for item in self.csv_data: file_type: PatternMap = item.value @@ -120,4 +150,3 @@ class ExtraTransitionDomain(): with open(file, "r", encoding="utf-8") as requested_file: reader = csv.DictReader(requested_file, delimiter=seperator) return {row[id_field]: dataclass_type(**row) for row in reader} - diff --git a/src/registrar/management/commands/utility/terminal_helper.py b/src/registrar/management/commands/utility/terminal_helper.py index ec7580e21..ad20ad4ca 100644 --- a/src/registrar/management/commands/utility/terminal_helper.py +++ b/src/registrar/management/commands/utility/terminal_helper.py @@ -2,6 +2,7 @@ import logging logger = logging.getLogger(__name__) + class TerminalColors: """Colors for terminal outputs (makes reading the logs WAY easier)""" @@ -17,8 +18,8 @@ class TerminalColors: UNDERLINE = "\033[4m" BackgroundLightYellow = "\033[103m" -class TerminalHelper: +class TerminalHelper: def query_yes_no(question: str, default="yes") -> bool: """Ask a yes/no question via raw_input() and return their answer. @@ -56,4 +57,4 @@ class TerminalHelper: terminal if print_condition is TRUE""" # DEBUG: if print_condition: - logger.info(print_statement) \ No newline at end of file + logger.info(print_statement) diff --git a/src/registrar/migrations/0044_transitiondomain_organization_name.py b/src/registrar/migrations/0044_transitiondomain_organization_name.py deleted file mode 100644 index 07c6ece82..000000000 --- a/src/registrar/migrations/0044_transitiondomain_organization_name.py +++ /dev/null @@ -1,19 +0,0 @@ -# Generated by Django 4.2.6 on 2023-10-27 14:21 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - dependencies = [ - ("registrar", "0043_transitiondomain_federal_agency_and_more"), - ] - - operations = [ - migrations.AddField( - model_name="transitiondomain", - name="organization_name", - field=models.TextField( - blank=True, help_text="Organization name", null=True - ), - ), - ] diff --git a/src/registrar/models/transition_domain.py b/src/registrar/models/transition_domain.py index 6670b7bbd..d95d8e441 100644 --- a/src/registrar/models/transition_domain.py +++ b/src/registrar/models/transition_domain.py @@ -48,11 +48,6 @@ class TransitionDomain(TimeStampedModel): blank=True, help_text="Type of organization", ) - organization_name = models.TextField( - null=True, - blank=True, - help_text="Organization name", - ) federal_type = models.TextField( max_length=50, null=True,