diff --git a/src/registrar/management/commands/cat_files_into_getgov.py b/src/registrar/management/commands/cat_files_into_getgov.py index 9111263a6..5de44ba73 100644 --- a/src/registrar/management/commands/cat_files_into_getgov.py +++ b/src/registrar/management/commands/cat_files_into_getgov.py @@ -1,6 +1,5 @@ """Loads files from /tmp into our sandboxes""" import glob -import csv import logging import os @@ -21,17 +20,19 @@ class Command(BaseCommand): default="txt", help="What file extensions to look for, like txt or gz", ) - parser.add_argument("--directory", default="migrationdata", help="Desired directory") + parser.add_argument( + "--directory", default="migrationdata", help="Desired directory" + ) def handle(self, **options): - file_extension: str = options.get("file_extension").lstrip('.') + file_extension: str = options.get("file_extension").lstrip(".") directory = options.get("directory") # file_extension is always coerced as str, Truthy is OK to use here. if not file_extension or not isinstance(file_extension, str): raise ValueError(f"Invalid file extension '{file_extension}'") - matching_extensions = glob.glob(f'../tmp/*.{file_extension}') + matching_extensions = glob.glob(f"../tmp/*.{file_extension}") if not matching_extensions: logger.error(f"No files with the extension {file_extension} found") @@ -39,25 +40,27 @@ class Command(BaseCommand): filename = os.path.basename(src_file_path) do_command = True exit_status: int - - desired_file_path = f'{directory}/{filename}' + + desired_file_path = f"{directory}/{filename}" if os.path.exists(desired_file_path): - replace = input(f'{desired_file_path} already exists. Do you want to replace it? (y/n) ') - if replace.lower() != 'y': + # For linter + prompt = " Do you want to replace it? (y/n) " + replace = input(f"{desired_file_path} already exists. {prompt}") + if replace.lower() != "y": do_command = False - + if do_command: copy_from = f"../tmp/{filename}" self.cat(copy_from, desired_file_path) - exit_status = os.system(f'cat ../tmp/{filename} > {desired_file_path}') + exit_status = os.system(f"cat ../tmp/{filename} > {desired_file_path}") if exit_status == 0: logger.info(f"Successfully copied {filename}") else: logger.error(f"Failed to copy {filename}") - + def cat(self, copy_from, copy_to): - """Runs the cat command to + """Runs the cat command to copy_from a location to copy_to a location""" - exit_status = os.system(f'cat {copy_from} > {copy_to}') - return exit_status \ No newline at end of file + exit_status = os.system(f"cat {copy_from} > {copy_to}") + return exit_status diff --git a/src/registrar/management/commands/full_domain_migrations.py b/src/registrar/management/commands/full_domain_migrations.py index e2bee9da4..9144675ba 100644 --- a/src/registrar/management/commands/full_domain_migrations.py +++ b/src/registrar/management/commands/full_domain_migrations.py @@ -10,10 +10,6 @@ import argparse import sys import os -from django.test import Client - -from django_fsm import TransitionNotAllowed # type: ignore - from django.core.management import BaseCommand from registrar.models import ( @@ -21,19 +17,26 @@ from registrar.models import ( DomainInformation, DomainInvitation, TransitionDomain, - User, ) from registrar.management.commands.utility.terminal_helper import ( TerminalColors, - TerminalHelper + TerminalHelper, ) logger = logging.getLogger(__name__) + class Command(BaseCommand): help = """ """ + # Specifies which files the loaderFilenames commands should target + default_filenames = [ + "escrow_domain_contacts.daily.gov.GOV.txt", + "escrow_contacts.daily.gov.GOV.txt", + "escrow_domain_statuses.daily.gov.GOV.txt", + ] + def add_arguments(self, parser): """ OPTIONAL ARGUMENTS: @@ -51,8 +54,8 @@ class Command(BaseCommand): > --loaderDirectory /app/tmp --loaderFilenames - The files used for load_transition_domain migration script. - Must appear IN ORDER and separated by spaces: + The files used for load_transition_domain migration script. + Must appear IN ORDER and separated by spaces: EXAMPLE USAGE: > --loaderFilenames domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt where... @@ -74,38 +77,49 @@ class Command(BaseCommand): purposes --resetTable - Used by the loaders to trigger a prompt for deleting all table entries. + Used by the loaders to trigger a prompt for deleting all table entries. Useful for testing purposes, but USE WITH CAUTION - """ + """ # noqa - line length - parser.add_argument("--runLoaders", + parser.add_argument( + "--runLoaders", help="Runs all scripts (in sequence) for transition domain migrations", - action=argparse.BooleanOptionalAction) - - parser.add_argument("--triggerLogins", + action=argparse.BooleanOptionalAction, + ) + + parser.add_argument( + "--triggerLogins", help="Simulates a user login for each user in domain invitation", - action=argparse.BooleanOptionalAction) + action=argparse.BooleanOptionalAction, + ) # The following file arguments have default values for running in the sandbox + # For linter + script = "load_transition_domain migration script" parser.add_argument( "--loaderDirectory", - default="migrationData", - help="The location of the files used for load_transition_domain migration script" + default="migrationdata", + help=f"The location of the files used for the {script}", ) parser.add_argument( "--loaderFilenames", - default="escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt", - help="""The files used for load_transition_domain migration script. - Must appear IN ORDER and separated by spaces: + # For linter. + # Join all of the default filenames with a space. + # The resulting list will look like this: file1.txt file2.txt file3.txt + default=" ".join(self.default_filenames), + help="""The files used for load_transition_domain migration script. + Must appear IN ORDER and separated by spaces: domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt where... - domain_contacts_filename is the Data file with domain contact information - contacts_filename is the Data file with contact information - - domain_statuses_filename is the Data file with domain status information""" + - domain_statuses_filename is the Data file with domain status information""", # noqa - linter length ) - parser.add_argument("--sep", default="|", help="Delimiter character for the loader files") + parser.add_argument( + "--sep", default="|", help="Delimiter character for the loader files" + ) parser.add_argument("--debug", action=argparse.BooleanOptionalAction) @@ -119,22 +133,20 @@ class Command(BaseCommand): action=argparse.BooleanOptionalAction, ) - - def compare_tables(self, debug_on: bool): - """Does a diff between the transition_domain and the following tables: - domain, domain_information and the domain_invitation. - + """Does a diff between the transition_domain and the following tables: + domain, domain_information and the domain_invitation. + Produces the following report (printed to the terminal): #1 - Print any domains that exist in the transition_domain table but not in their corresponding domain, domain information or domain invitation tables. #2 - Print which table this domain is missing from - #3- Check for duplicate entries in domain or - domain_information tables and print which are + #3- Check for duplicate entries in domain or + domain_information tables and print which are duplicates and in which tables - """ - + """ + logger.info( f"""{TerminalColors.OKCYAN} ============= BEGINNING ANALYSIS =============== @@ -142,13 +154,13 @@ class Command(BaseCommand): """ ) - #TODO: would filteredRelation be faster? + # TODO: would filteredRelation be faster? missing_domains = [] duplicate_domains = [] missing_domain_informations = [] missing_domain_invites = [] - for transition_domain in TransitionDomain.objects.all():# DEBUG: + for transition_domain in TransitionDomain.objects.all(): # DEBUG: transition_domain_name = transition_domain.domain_name transition_domain_email = transition_domain.username @@ -160,22 +172,38 @@ class Command(BaseCommand): # Check Domain table matching_domains = Domain.objects.filter(name=transition_domain_name) # Check Domain Information table - matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name) + matching_domain_informations = DomainInformation.objects.filter( + domain__name=transition_domain_name + ) # Check Domain Invitation table - matching_domain_invitations = DomainInvitation.objects.filter(email=transition_domain_email.lower(), - domain__name=transition_domain_name) + matching_domain_invitations = DomainInvitation.objects.filter( + email=transition_domain_email.lower(), + domain__name=transition_domain_name, + ) if len(matching_domains) == 0: - TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain{TerminalColors.ENDC}""") + TerminalHelper.print_conditional( + debug_on, + f"""{TerminalColors.YELLOW}Missing Domain{TerminalColors.ENDC}""", # noqa - line length + ) missing_domains.append(transition_domain_name) elif len(matching_domains) > 1: - TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Duplicate Domain{TerminalColors.ENDC}""") + TerminalHelper.print_conditional( + debug_on, + f"""{TerminalColors.YELLOW}Duplicate Domain{TerminalColors.ENDC}""", # noqa - line length + ) duplicate_domains.append(transition_domain_name) if len(matching_domain_informations) == 0: - TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Information{TerminalColors.ENDC}""") + TerminalHelper.print_conditional( + debug_on, + f"""{TerminalColors.YELLOW}Missing Domain Information{TerminalColors.ENDC}""", # noqa - line length + ) missing_domain_informations.append(transition_domain_name) if len(matching_domain_invitations) == 0: - TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Invitation{TerminalColors.ENDC}""") + TerminalHelper.print_conditional( + debug_on, + f"""{TerminalColors.YELLOW}Missing Domain Invitation{TerminalColors.ENDC}""", # noqa - line length + ) missing_domain_invites.append(transition_domain_name) total_missing_domains = len(missing_domains) @@ -184,10 +212,16 @@ class Command(BaseCommand): total_missing_domain_invitations = len(missing_domain_invites) missing_domains_as_string = "{}".format(", ".join(map(str, missing_domains))) - duplicate_domains_as_string = "{}".format(", ".join(map(str, duplicate_domains))) - missing_domain_informations_as_string = "{}".format(", ".join(map(str, missing_domain_informations))) - missing_domain_invites_as_string = "{}".format(", ".join(map(str, missing_domain_invites))) - + duplicate_domains_as_string = "{}".format( + ", ".join(map(str, duplicate_domains)) + ) + missing_domain_informations_as_string = "{}".format( + ", ".join(map(str, missing_domain_informations)) + ) + missing_domain_invites_as_string = "{}".format( + ", ".join(map(str, missing_domain_invites)) + ) + logger.info( f"""{TerminalColors.OKGREEN} ============= FINISHED ANALYSIS =============== @@ -208,9 +242,9 @@ class Command(BaseCommand): (These are transition domains which have no entires in the Domain Invitation Table) {TerminalColors.YELLOW}{missing_domain_invites_as_string}{TerminalColors.OKGREEN} {TerminalColors.ENDC} - """ + """ # noqa - line length ) - + def prompt_for_execution(self, command_string: str, prompt_title: str) -> bool: """Prompts the user to inspect the given terminal command string and asks if they wish to execute it. If the user responds (y), @@ -236,35 +270,39 @@ class Command(BaseCommand): # Otherwise, exit this subroutine. if not proceed_execution: sys.exit() - + self.execute_command(command_string) return True - - def execute_command(self, command_string:str): + + def execute_command(self, command_string: str): """Executes the given command string""" - logger.info(f"""{TerminalColors.OKCYAN} + logger.info( + f"""{TerminalColors.OKCYAN} ==== EXECUTING... ==== - {TerminalColors.ENDC}""") + {TerminalColors.ENDC}""" + ) os.system(f"{command_string}") - - def run_load_transition_domain_script(self, - file_location: str, - domain_contacts_filename: str, - contacts_filename: str, - domain_statuses_filename: str, - sep: str, - reset_table: bool, - debug_on: bool, - prompts_enabled: bool, - debug_max_entries_to_parse: int): + + def run_load_transition_domain_script( + self, + file_location: str, + domain_contacts_filename: str, + contacts_filename: str, + domain_statuses_filename: str, + sep: str, + reset_table: bool, + debug_on: bool, + prompts_enabled: bool, + debug_max_entries_to_parse: int, + ): """Runs the load_transition_domain script""" # Create the command string command_string = "./manage.py load_transition_domain " - command_string += file_location+domain_contacts_filename + " " - command_string += file_location+contacts_filename + " " - command_string += file_location+domain_statuses_filename + " " + command_string += file_location + domain_contacts_filename + " " + command_string += file_location + contacts_filename + " " + command_string += file_location + domain_statuses_filename + " " if sep is not None and sep != "|": command_string += f"--sep {sep} " if reset_table: @@ -276,12 +314,13 @@ class Command(BaseCommand): # Execute the command string if prompts_enabled: - self.prompt_for_execution(command_string, "Running load_transition_domain script") + self.prompt_for_execution( + command_string, "Running load_transition_domain script" + ) return self.execute_command(command_string) - - - def run_transfer_script(self, debug_on:bool, prompts_enabled: bool): + + def run_transfer_script(self, debug_on: bool, prompts_enabled: bool): """Runs the transfer_transition_domains_to_domains script""" # Create the command string command_string = "./manage.py transfer_transition_domains_to_domains " @@ -289,43 +328,42 @@ class Command(BaseCommand): command_string += "--debug " # Execute the command string if prompts_enabled: - self.prompt_for_execution(command_string, "Running transfer_transition_domains_to_domains script") + self.prompt_for_execution( + command_string, "Running transfer_transition_domains_to_domains script" + ) return self.execute_command(command_string) - def run_send_invites_script(self, debug_on: bool, prompts_enabled: bool): """Runs the send_domain_invitations script""" # Create the command string... command_string = "./manage.py send_domain_invitations -s" # Execute the command string if prompts_enabled: - self.prompt_for_execution(command_string, "Running send_domain_invitations script") + self.prompt_for_execution( + command_string, "Running send_domain_invitations script" + ) return self.execute_command(command_string) + def run_migration_scripts(self, prompts_enabled: bool, options): + """Runs the following migration scripts (in order): + 1 - imports for trans domains + 2 - transfer to domain & domain invitation""" - def run_migration_scripts(self, - prompts_enabled: bool, - options): - """Runs the following migration scripts (in order): - 1 - imports for trans domains - 2 - transfer to domain & domain invitation""" - # Get arguments sep = options.get("sep") reset_table = options.get("resetTable") debug_on = options.get("debug") - debug_max_entries_to_parse = int( - options.get("limitParse") - ) + debug_max_entries_to_parse = int(options.get("limitParse")) # Grab filepath information from the arguments - file_location = options.get("loaderDirectory")+"/" + file_location = options.get("loaderDirectory") + "/" filenames = options.get("loaderFilenames").split() if len(filenames) < 3: filenames_as_string = "{}".format(", ".join(map(str, filenames))) - logger.info(f""" + logger.info( + f""" {TerminalColors.FAIL} --loaderFilenames expected 3 filenames to follow it, but only {len(filenames)} were given: @@ -334,7 +372,8 @@ class Command(BaseCommand): PLEASE MODIFY THE SCRIPT AND TRY RUNNING IT AGAIN ============= TERMINATING ============= {TerminalColors.ENDC} - """) + """ + ) sys.exit() domain_contacts_filename = filenames[0] contacts_filename = filenames[1] @@ -362,45 +401,49 @@ class Command(BaseCommand): ) # If the user rejected the filepath information - # as incorrect, prompt the user to provide + # as incorrect, prompt the user to provide # correct file inputs in their original command # prompt and exit this subroutine if not files_are_correct: - logger.info(f""" + logger.info( + f""" {TerminalColors.YELLOW} PLEASE Re-Run the script with the correct file location and filenames: EXAMPLE: docker compose run -T app ./manage.py test_domain_migration --runLoaders --loaderDirectory /app/tmp --loaderFilenames escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt - """) + """ # noqa - line length + ) return - - # Proceed executing the migration scripts - self.run_load_transition_domain_script(file_location, - domain_contacts_filename, - contacts_filename, - domain_statuses_filename, - sep, - reset_table, - debug_on, - prompts_enabled, - debug_max_entries_to_parse) - self.run_transfer_script(debug_on, prompts_enabled) + # Proceed executing the migration scripts + self.run_load_transition_domain_script( + file_location, + domain_contacts_filename, + contacts_filename, + domain_statuses_filename, + sep, + reset_table, + debug_on, + prompts_enabled, + debug_max_entries_to_parse, + ) + self.run_transfer_script(debug_on, prompts_enabled) def simulate_user_logins(self, debug_on): """Simulates logins for users (this will add Domain Information objects to our tables)""" - logger.info(f"" - f"{TerminalColors.OKCYAN}" - f"================== SIMULATING LOGINS ==================" - f"{TerminalColors.ENDC}") - + logger.info( + f"" + f"{TerminalColors.OKCYAN}" + f"================== SIMULATING LOGINS ==================" + f"{TerminalColors.ENDC}" + ) + command_string = "python ./manage.py test registrar.tests.test_transition_domain_migrations_wiuth_logins.TestLogins.test_user_logins" - - + # for invite in DomainInvitation.objects.all(): #TODO: limit to our stuff # #DEBUG: # TerminalHelper.print_conditional(debug_on, @@ -419,7 +462,6 @@ class Command(BaseCommand): # logger.info(f"""{TerminalColors.YELLOW}(Deleting temporary user object){TerminalColors.ENDC}""") # user.delete() - def handle( self, **options, @@ -430,7 +472,7 @@ class Command(BaseCommand): 2 - simulate logins 3 - send domain invitations (Emails should be sent to the appropriate users note that all moved domains should now be accessible - on django admin for an analyst) + on django admin for an analyst) 4 - analyze the data for transition domains and generate a report """ @@ -442,34 +484,34 @@ class Command(BaseCommand): # the terminal so the user knows what is # enabled. debug_on = options.get("debug") - prompts_enabled = debug_on #TODO: add as argument? + prompts_enabled = debug_on # TODO: add as argument? run_loaders_enabled = options.get("runLoaders") simulate_user_login_enabled = options.get("triggerLogins") TerminalHelper.print_conditional( - debug_on, - f"""{TerminalColors.OKCYAN} + debug_on, + f"""{TerminalColors.OKCYAN} ----------DEBUG MODE ON---------- Detailed print statements activated. {TerminalColors.ENDC} - """ - ) + """, + ) TerminalHelper.print_conditional( - run_loaders_enabled, - f"""{TerminalColors.OKCYAN} + run_loaders_enabled, + f"""{TerminalColors.OKCYAN} ----------RUNNING LOADERS ON---------- All migration scripts will be run before analyzing the data. {TerminalColors.ENDC} - """ - ) + """, + ) TerminalHelper.print_conditional( - run_loaders_enabled, - f"""{TerminalColors.OKCYAN} + run_loaders_enabled, + f"""{TerminalColors.OKCYAN} ----------TRIGGER LOGINS ON---------- Will be simulating user logins {TerminalColors.ENDC} - """ - ) + """, + ) # If a user decides to run all migration # scripts, they may or may not wish to # proceed with analysis of the data depending @@ -486,7 +528,7 @@ class Command(BaseCommand): if run_loaders_enabled: self.run_migration_scripts(options, prompts_enabled) prompt_continuation_of_analysis = True - + # STEP 2 -- SIMULATE LOGINS # Simulate user login for each user in domain # invitation if specified by user OR if running @@ -500,7 +542,7 @@ class Command(BaseCommand): if run_loaders_enabled: if prompts_enabled: simulate_user_login_enabled = TerminalHelper.query_yes_no( - f"""{TerminalColors.FAIL} + f"""{TerminalColors.FAIL} Proceed with simulating user logins? {TerminalColors.ENDC}""" ) @@ -508,14 +550,14 @@ class Command(BaseCommand): return self.simulate_user_logins(debug_on) prompt_continuation_of_analysis = True - + # STEP 3 -- SEND INVITES if prompts_enabled: proceed_with_sending_invites = TerminalHelper.query_yes_no( f"""{TerminalColors.FAIL} Proceed with sending user invites? {TerminalColors.ENDC}""" - ) + ) if not proceed_with_sending_invites: return self.run_send_invites_script(debug_on) diff --git a/src/registrar/management/commands/load_transition_domain.py b/src/registrar/management/commands/load_transition_domain.py index ee1e8c4ea..b2eefae54 100644 --- a/src/registrar/management/commands/load_transition_domain.py +++ b/src/registrar/management/commands/load_transition_domain.py @@ -12,7 +12,7 @@ from registrar.models import TransitionDomain from registrar.management.commands.utility.terminal_helper import ( TerminalColors, - TerminalHelper + TerminalHelper, ) logger = logging.getLogger(__name__) diff --git a/src/registrar/management/commands/test_domain_migration.py b/src/registrar/management/commands/test_domain_migration.py deleted file mode 100644 index 781de38e0..000000000 --- a/src/registrar/management/commands/test_domain_migration.py +++ /dev/null @@ -1,534 +0,0 @@ -"""Data migration: - 1 - generates a report of data integrity across all - transition domain related tables - 2 - allows users to run all migration scripts for - transition domain data -""" - -import logging -import argparse -import sys -import os - -from django.test import Client - -from django_fsm import TransitionNotAllowed # type: ignore - -from django.core.management import BaseCommand - -from registrar.models import ( - Domain, - DomainInformation, - DomainInvitation, - TransitionDomain, - User, -) - -from registrar.management.commands.utility.terminal_helper import ( - TerminalColors, - TerminalHelper -) - -logger = logging.getLogger(__name__) - -class Command(BaseCommand): - help = """ """ - - def add_arguments(self, parser): - """ - OPTIONAL ARGUMENTS: - --runLoaders - A boolean (default to true), which triggers running - all scripts (in sequence) for transition domain migrations - - --triggerLogins - A boolean (default to true), which triggers running - simulations of user logins for each user in domain invitation - - --loaderDirectory - The location of the files used for load_transition_domain migration script - EXAMPLE USAGE: - > --loaderDirectory /app/tmp - - --loaderFilenames - The files used for load_transition_domain migration script. - Must appear IN ORDER and separated by spaces: - EXAMPLE USAGE: - > --loaderFilenames domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt - where... - - domain_contacts_filename is the Data file with domain contact information - - contacts_filename is the Data file with contact information - - domain_statuses_filename is the Data file with domain status information - - --sep - Delimiter for the loaders to correctly parse the given text files. - (usually this can remain at default value of |) - - --debug - A boolean (default to true), which activates additional print statements - - --limitParse - Used by the loaders (load_transition_domain) to set the limit for the - number of data entries to insert. Set to 0 (or just don't use this - argument) to parse every entry. This was provided primarily for testing - purposes - - --resetTable - Used by the loaders to trigger a prompt for deleting all table entries. - Useful for testing purposes, but USE WITH CAUTION - """ - - parser.add_argument("--runLoaders", - help="Runs all scripts (in sequence) for transition domain migrations", - action=argparse.BooleanOptionalAction) - - parser.add_argument("--triggerLogins", - help="Simulates a user login for each user in domain invitation", - action=argparse.BooleanOptionalAction) - - # The following file arguments have default values for running in the sandbox - parser.add_argument( - "--loaderDirectory", - default="migrationData", - help="The location of the files used for load_transition_domain migration script" - ) - parser.add_argument( - "--loaderFilenames", - default="escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt", - help="""The files used for load_transition_domain migration script. - Must appear IN ORDER and separated by spaces: - domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt - - where... - - domain_contacts_filename is the Data file with domain contact information - - contacts_filename is the Data file with contact information - - domain_statuses_filename is the Data file with domain status information""" - ) - - parser.add_argument("--sep", default="|", help="Delimiter character for the loader files") - - parser.add_argument("--debug", action=argparse.BooleanOptionalAction) - - parser.add_argument( - "--limitParse", default=0, help="Sets max number of entries to load" - ) - - parser.add_argument( - "--resetTable", - help="Deletes all data in the TransitionDomain table", - action=argparse.BooleanOptionalAction, - ) - - - - def compare_tables(self, debug_on: bool): - """Does a diff between the transition_domain and the following tables: - domain, domain_information and the domain_invitation. - - Produces the following report (printed to the terminal): - #1 - Print any domains that exist in the transition_domain table - but not in their corresponding domain, domain information or - domain invitation tables. - #2 - Print which table this domain is missing from - #3- Check for duplicate entries in domain or - domain_information tables and print which are - duplicates and in which tables - """ - - logger.info( - f"""{TerminalColors.OKCYAN} - ============= BEGINNING ANALYSIS =============== - {TerminalColors.ENDC} - """ - ) - - #TODO: would filteredRelation be faster? - - missing_domains = [] - duplicate_domains = [] - missing_domain_informations = [] - missing_domain_invites = [] - for transition_domain in TransitionDomain.objects.all():# DEBUG: - transition_domain_name = transition_domain.domain_name - transition_domain_email = transition_domain.username - - TerminalHelper.print_conditional( - debug_on, - f"{TerminalColors.OKCYAN}Checking: {transition_domain_name} {TerminalColors.ENDC}", # noqa - ) - - # Check Domain table - matching_domains = Domain.objects.filter(name=transition_domain_name) - # Check Domain Information table - matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name) - # Check Domain Invitation table - matching_domain_invitations = DomainInvitation.objects.filter(email=transition_domain_email.lower(), - domain__name=transition_domain_name) - - if len(matching_domains) == 0: - TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain{TerminalColors.ENDC}""") - missing_domains.append(transition_domain_name) - elif len(matching_domains) > 1: - TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Duplicate Domain{TerminalColors.ENDC}""") - duplicate_domains.append(transition_domain_name) - if len(matching_domain_informations) == 0: - TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Information{TerminalColors.ENDC}""") - missing_domain_informations.append(transition_domain_name) - if len(matching_domain_invitations) == 0: - TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Invitation{TerminalColors.ENDC}""") - missing_domain_invites.append(transition_domain_name) - - total_missing_domains = len(missing_domains) - total_duplicate_domains = len(duplicate_domains) - total_missing_domain_informations = len(missing_domain_informations) - total_missing_domain_invitations = len(missing_domain_invites) - - missing_domains_as_string = "{}".format(", ".join(map(str, missing_domains))) - duplicate_domains_as_string = "{}".format(", ".join(map(str, duplicate_domains))) - missing_domain_informations_as_string = "{}".format(", ".join(map(str, missing_domain_informations))) - missing_domain_invites_as_string = "{}".format(", ".join(map(str, missing_domain_invites))) - - logger.info( - f"""{TerminalColors.OKGREEN} - ============= FINISHED ANALYSIS =============== - - {total_missing_domains} Missing Domains: - (These are transition domains that are missing from the Domain Table) - {TerminalColors.YELLOW}{missing_domains_as_string}{TerminalColors.OKGREEN} - - {total_duplicate_domains} Duplicate Domains: - (These are transition domains which have duplicate entries in the Domain Table) - {TerminalColors.YELLOW}{duplicate_domains_as_string}{TerminalColors.OKGREEN} - - {total_missing_domain_informations} Domain Information Entries missing: - (These are transition domains which have no entries in the Domain Information Table) - {TerminalColors.YELLOW}{missing_domain_informations_as_string}{TerminalColors.OKGREEN} - - {total_missing_domain_invitations} Domain Invitations missing: - (These are transition domains which have no entires in the Domain Invitation Table) - {TerminalColors.YELLOW}{missing_domain_invites_as_string}{TerminalColors.OKGREEN} - {TerminalColors.ENDC} - """ - ) - - def prompt_for_execution(self, command_string: str, prompt_title: str) -> bool: - """Prompts the user to inspect the given terminal command string - and asks if they wish to execute it. If the user responds (y), - execute the command""" - - # Allow the user to inspect the command string - # and ask if they wish to proceed - proceed_execution = TerminalHelper.query_yes_no( - f"""{TerminalColors.OKCYAN} - ===================================================== - {prompt_title} - ===================================================== - *** IMPORTANT: VERIFY THE FOLLOWING COMMAND LOOKS CORRECT *** - - {command_string} - {TerminalColors.FAIL} - Proceed? (Y = proceed, N = skip) - {TerminalColors.ENDC}""" - ) - - # If the user decided to proceed executing the command, - # run the command for loading transition domains. - # Otherwise, exit this subroutine. - if not proceed_execution: - sys.exit() - - self.execute_command(command_string) - - return True - - def execute_command(self, command_string:str): - """Executes the given command string""" - - logger.info(f"""{TerminalColors.OKCYAN} - ==== EXECUTING... ==== - {TerminalColors.ENDC}""") - os.system(f"{command_string}") - - def run_load_transition_domain_script(self, - file_location: str, - domain_contacts_filename: str, - contacts_filename: str, - domain_statuses_filename: str, - sep: str, - reset_table: bool, - debug_on: bool, - prompts_enabled: bool, - debug_max_entries_to_parse: int): - """Runs the load_transition_domain script""" - # Create the command string - command_string = "./manage.py load_transition_domain " - command_string += file_location+domain_contacts_filename + " " - command_string += file_location+contacts_filename + " " - command_string += file_location+domain_statuses_filename + " " - if sep is not None and sep != "|": - command_string += f"--sep {sep} " - if reset_table: - command_string += "--resetTable " - if debug_on: - command_string += "--debug " - if debug_max_entries_to_parse > 0: - command_string += f"--limitParse {debug_max_entries_to_parse} " - - # Execute the command string - if prompts_enabled: - self.prompt_for_execution(command_string, "Running load_transition_domain script") - return - self.execute_command(command_string) - - - def run_transfer_script(self, debug_on:bool, prompts_enabled: bool): - """Runs the transfer_transition_domains_to_domains script""" - # Create the command string - command_string = "./manage.py transfer_transition_domains_to_domains " - if debug_on: - command_string += "--debug " - # Execute the command string - if prompts_enabled: - self.prompt_for_execution(command_string, "Running transfer_transition_domains_to_domains script") - return - self.execute_command(command_string) - - - def run_send_invites_script(self, debug_on: bool, prompts_enabled: bool): - """Runs the send_domain_invitations script""" - # Create the command string... - command_string = "./manage.py send_domain_invitations -s" - # Execute the command string - if prompts_enabled: - self.prompt_for_execution(command_string, "Running send_domain_invitations script") - return - self.execute_command(command_string) - - - def run_migration_scripts(self, - prompts_enabled: bool, - options): - """Runs the following migration scripts (in order): - 1 - imports for trans domains - 2 - transfer to domain & domain invitation""" - - # Get arguments - sep = options.get("sep") - reset_table = options.get("resetTable") - debug_on = options.get("debug") - debug_max_entries_to_parse = int( - options.get("limitParse") - ) - - # Grab filepath information from the arguments - file_location = options.get("loaderDirectory")+"/" - filenames = options.get("loaderFilenames").split() - if len(filenames) < 3: - filenames_as_string = "{}".format(", ".join(map(str, filenames))) - logger.info(f""" - {TerminalColors.FAIL} - --loaderFilenames expected 3 filenames to follow it, - but only {len(filenames)} were given: - {filenames_as_string} - - PLEASE MODIFY THE SCRIPT AND TRY RUNNING IT AGAIN - ============= TERMINATING ============= - {TerminalColors.ENDC} - """) - sys.exit() - domain_contacts_filename = filenames[0] - contacts_filename = filenames[1] - domain_statuses_filename = filenames[2] - - if prompts_enabled: - # Allow the user to inspect the filepath - # data given in the arguments, and prompt - # the user to verify this info before proceeding - files_are_correct = TerminalHelper.query_yes_no( - f""" - {TerminalColors.OKCYAN} - *** IMPORTANT: VERIFY THE FOLLOWING *** - - The migration scripts are looking in directory.... - {file_location} - - ....for the following files: - - domain contacts: {domain_contacts_filename} - - contacts: {contacts_filename} - - domain statuses: {domain_statuses_filename} - - {TerminalColors.FAIL} - Does this look correct?{TerminalColors.ENDC}""" - ) - - # If the user rejected the filepath information - # as incorrect, prompt the user to provide - # correct file inputs in their original command - # prompt and exit this subroutine - if not files_are_correct: - logger.info(f""" - {TerminalColors.YELLOW} - PLEASE Re-Run the script with the correct file location and filenames: - - EXAMPLE: - docker compose run -T app ./manage.py test_domain_migration --runLoaders --loaderDirectory /app/tmp --loaderFilenames escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt - - """) - return - - # Proceed executing the migration scripts - self.run_load_transition_domain_script(file_location, - domain_contacts_filename, - contacts_filename, - domain_statuses_filename, - sep, - reset_table, - debug_on, - prompts_enabled, - debug_max_entries_to_parse) - self.run_transfer_script(debug_on, prompts_enabled) - - - def simulate_user_logins(self, debug_on): - """Simulates logins for users (this will add - Domain Information objects to our tables)""" - - logger.info(f"" - f"{TerminalColors.OKCYAN}" - f"================== SIMULATING LOGINS ==================" - f"{TerminalColors.ENDC}") - - - - # for invite in DomainInvitation.objects.all(): #TODO: limit to our stuff - # #DEBUG: - # TerminalHelper.print_conditional(debug_on, - # f"{TerminalColors.OKCYAN}" - # f"Processing invite: {invite}" - # f"{TerminalColors.ENDC}") - # # get a user with this email address - # user, user_created = User.objects.get_or_create(email=invite.email, username=invite.email) - # #DEBUG: - # TerminalHelper.print_conditional(user_created, - # f"""{TerminalColors.OKCYAN}No user found (creating temporary user object){TerminalColors.ENDC}""") - # TerminalHelper.print_conditional(debug_on, - # f"""{TerminalColors.OKCYAN}Executing first-time login for user: {user}{TerminalColors.ENDC}""") - # user.first_login() - # if user_created: - # logger.info(f"""{TerminalColors.YELLOW}(Deleting temporary user object){TerminalColors.ENDC}""") - # user.delete() - - - def handle( - self, - **options, - ): - """ - Does the following; - 1 - run loader scripts - 2 - simulate logins - 3 - send domain invitations (Emails should be sent to the appropriate users - note that all moved domains should now be accessible - on django admin for an analyst) - 4 - analyze the data for transition domains - and generate a report - """ - - # SETUP - # Grab all arguments relevant to - # orchestrating which parts of this script - # should execute. Print some indicators to - # the terminal so the user knows what is - # enabled. - debug_on = options.get("debug") - prompts_enabled = debug_on #TODO: add as argument? - run_loaders_enabled = options.get("runLoaders") - simulate_user_login_enabled = options.get("triggerLogins") - TerminalHelper.print_conditional( - debug_on, - f"""{TerminalColors.OKCYAN} - ----------DEBUG MODE ON---------- - Detailed print statements activated. - {TerminalColors.ENDC} - """ - ) - TerminalHelper.print_conditional( - run_loaders_enabled, - f"""{TerminalColors.OKCYAN} - ----------RUNNING LOADERS ON---------- - All migration scripts will be run before - analyzing the data. - {TerminalColors.ENDC} - """ - ) - TerminalHelper.print_conditional( - run_loaders_enabled, - f"""{TerminalColors.OKCYAN} - ----------TRIGGER LOGINS ON---------- - Will be simulating user logins - {TerminalColors.ENDC} - """ - ) - # If a user decides to run all migration - # scripts, they may or may not wish to - # proceed with analysis of the data depending - # on the results of the migration. - # Provide a breakpoint for them to decide - # whether to continue or not. - # The same will happen if simulating user - # logins (to allow users to run only that - # portion of the script if desired) - prompt_continuation_of_analysis = False - - # STEP 1 -- RUN LOADERS - # Run migration scripts if specified by user - if run_loaders_enabled: - self.run_migration_scripts(options, prompts_enabled) - prompt_continuation_of_analysis = True - - # STEP 2 -- SIMULATE LOGINS - # Simulate user login for each user in domain - # invitation if specified by user OR if running - # migration scripts. - # (NOTE: Although users can choose to run login - # simulations separately (for testing purposes), - # if we are running all migration scripts, we should - # automatically execute this as the final step - # to ensure Domain Information objects get added - # to the database.) - if run_loaders_enabled: - if prompts_enabled: - simulate_user_login_enabled = TerminalHelper.query_yes_no( - f"""{TerminalColors.FAIL} - Proceed with simulating user logins? - {TerminalColors.ENDC}""" - ) - if not simulate_user_login_enabled: - return - self.simulate_user_logins(debug_on) - prompt_continuation_of_analysis = True - - # STEP 3 -- SEND INVITES - if prompts_enabled: - proceed_with_sending_invites = TerminalHelper.query_yes_no( - f"""{TerminalColors.FAIL} - Proceed with sending user invites? - {TerminalColors.ENDC}""" - ) - if not proceed_with_sending_invites: - return - self.run_send_invites_script(debug_on) - prompt_continuation_of_analysis = True - - # STEP 4 -- ANALYZE TABLES & GENERATE REPORT - # Analyze tables for corrupt data... - if prompt_continuation_of_analysis & prompts_enabled: - # ^ (only prompt if we ran steps 1 and/or 2) - analyze_tables = TerminalHelper.query_yes_no( - f"""{TerminalColors.FAIL} - Proceed with table analysis? - {TerminalColors.ENDC}""" - ) - if not analyze_tables: - return - self.compare_tables(debug_on) diff --git a/src/registrar/management/commands/transfer_transition_domains_to_domains.py b/src/registrar/management/commands/transfer_transition_domains_to_domains.py index 9443f085f..056b8d447 100644 --- a/src/registrar/management/commands/transfer_transition_domains_to_domains.py +++ b/src/registrar/management/commands/transfer_transition_domains_to_domains.py @@ -12,7 +12,7 @@ from registrar.models import DomainInvitation from registrar.management.commands.utility.terminal_helper import ( TerminalColors, - TerminalHelper + TerminalHelper, ) logger = logging.getLogger(__name__) @@ -57,7 +57,6 @@ class Command(BaseCommand): """, ) - def update_domain_status( self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool ) -> bool: diff --git a/src/registrar/management/commands/utility/terminal_helper.py b/src/registrar/management/commands/utility/terminal_helper.py index 3f647f0b8..99064346d 100644 --- a/src/registrar/management/commands/utility/terminal_helper.py +++ b/src/registrar/management/commands/utility/terminal_helper.py @@ -2,6 +2,7 @@ import logging logger = logging.getLogger(__name__) + class TerminalColors: """Colors for terminal outputs (makes reading the logs WAY easier)""" @@ -18,8 +19,8 @@ class TerminalColors: UNDERLINE = "\033[4m" BackgroundLightYellow = "\033[103m" -class TerminalHelper: +class TerminalHelper: def query_yes_no(question: str, default="yes") -> bool: """Ask a yes/no question via raw_input() and return their answer. @@ -57,4 +58,4 @@ class TerminalHelper: terminal if print_condition is TRUE""" # DEBUG: if print_condition: - logger.info(print_statement) \ No newline at end of file + logger.info(print_statement) diff --git a/src/registrar/tests/test_transition_domain_migrations_with_logins.py b/src/registrar/tests/test_transition_domain_migrations_with_logins.py index da2c31877..4b723bc8a 100644 --- a/src/registrar/tests/test_transition_domain_migrations_with_logins.py +++ b/src/registrar/tests/test_transition_domain_migrations_with_logins.py @@ -1,26 +1,13 @@ from django.test import TestCase -from django.db.utils import IntegrityError -from unittest.mock import patch from registrar.models import ( - Contact, - DomainApplication, - DomainInformation, User, - Website, Domain, - DraftDomain, DomainInvitation, UserDomainRole, ) -import boto3_mocking # type: ignore -from .common import MockSESClient, less_console_noise, completed_application -from django_fsm import TransitionNotAllowed -boto3_mocking.clients.register_handler("sesv2", MockSESClient) - -@boto3_mocking.patching class TestLogins(TestCase): """Test the retrieval of invitations."""