Linter things

Removed old test_domain_migration.py file, and linted many items.
This commit is contained in:
zandercymatics 2023-10-25 09:06:36 -06:00
parent 9468876197
commit 53c74a766f
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
7 changed files with 190 additions and 692 deletions

View file

@ -1,6 +1,5 @@
"""Loads files from /tmp into our sandboxes""" """Loads files from /tmp into our sandboxes"""
import glob import glob
import csv
import logging import logging
import os import os
@ -21,17 +20,19 @@ class Command(BaseCommand):
default="txt", default="txt",
help="What file extensions to look for, like txt or gz", help="What file extensions to look for, like txt or gz",
) )
parser.add_argument("--directory", default="migrationdata", help="Desired directory") parser.add_argument(
"--directory", default="migrationdata", help="Desired directory"
)
def handle(self, **options): def handle(self, **options):
file_extension: str = options.get("file_extension").lstrip('.') file_extension: str = options.get("file_extension").lstrip(".")
directory = options.get("directory") directory = options.get("directory")
# file_extension is always coerced as str, Truthy is OK to use here. # file_extension is always coerced as str, Truthy is OK to use here.
if not file_extension or not isinstance(file_extension, str): if not file_extension or not isinstance(file_extension, str):
raise ValueError(f"Invalid file extension '{file_extension}'") raise ValueError(f"Invalid file extension '{file_extension}'")
matching_extensions = glob.glob(f'../tmp/*.{file_extension}') matching_extensions = glob.glob(f"../tmp/*.{file_extension}")
if not matching_extensions: if not matching_extensions:
logger.error(f"No files with the extension {file_extension} found") logger.error(f"No files with the extension {file_extension} found")
@ -39,25 +40,27 @@ class Command(BaseCommand):
filename = os.path.basename(src_file_path) filename = os.path.basename(src_file_path)
do_command = True do_command = True
exit_status: int exit_status: int
desired_file_path = f'{directory}/{filename}' desired_file_path = f"{directory}/{filename}"
if os.path.exists(desired_file_path): if os.path.exists(desired_file_path):
replace = input(f'{desired_file_path} already exists. Do you want to replace it? (y/n) ') # For linter
if replace.lower() != 'y': prompt = " Do you want to replace it? (y/n) "
replace = input(f"{desired_file_path} already exists. {prompt}")
if replace.lower() != "y":
do_command = False do_command = False
if do_command: if do_command:
copy_from = f"../tmp/{filename}" copy_from = f"../tmp/{filename}"
self.cat(copy_from, desired_file_path) self.cat(copy_from, desired_file_path)
exit_status = os.system(f'cat ../tmp/{filename} > {desired_file_path}') exit_status = os.system(f"cat ../tmp/{filename} > {desired_file_path}")
if exit_status == 0: if exit_status == 0:
logger.info(f"Successfully copied {filename}") logger.info(f"Successfully copied {filename}")
else: else:
logger.error(f"Failed to copy {filename}") logger.error(f"Failed to copy {filename}")
def cat(self, copy_from, copy_to): def cat(self, copy_from, copy_to):
"""Runs the cat command to """Runs the cat command to
copy_from a location to copy_to a location""" copy_from a location to copy_to a location"""
exit_status = os.system(f'cat {copy_from} > {copy_to}') exit_status = os.system(f"cat {copy_from} > {copy_to}")
return exit_status return exit_status

View file

@ -10,10 +10,6 @@ import argparse
import sys import sys
import os import os
from django.test import Client
from django_fsm import TransitionNotAllowed # type: ignore
from django.core.management import BaseCommand from django.core.management import BaseCommand
from registrar.models import ( from registrar.models import (
@ -21,19 +17,26 @@ from registrar.models import (
DomainInformation, DomainInformation,
DomainInvitation, DomainInvitation,
TransitionDomain, TransitionDomain,
User,
) )
from registrar.management.commands.utility.terminal_helper import ( from registrar.management.commands.utility.terminal_helper import (
TerminalColors, TerminalColors,
TerminalHelper TerminalHelper,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Command(BaseCommand): class Command(BaseCommand):
help = """ """ help = """ """
# Specifies which files the loaderFilenames commands should target
default_filenames = [
"escrow_domain_contacts.daily.gov.GOV.txt",
"escrow_contacts.daily.gov.GOV.txt",
"escrow_domain_statuses.daily.gov.GOV.txt",
]
def add_arguments(self, parser): def add_arguments(self, parser):
""" """
OPTIONAL ARGUMENTS: OPTIONAL ARGUMENTS:
@ -51,8 +54,8 @@ class Command(BaseCommand):
> --loaderDirectory /app/tmp > --loaderDirectory /app/tmp
--loaderFilenames --loaderFilenames
The files used for load_transition_domain migration script. The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by spaces: Must appear IN ORDER and separated by spaces:
EXAMPLE USAGE: EXAMPLE USAGE:
> --loaderFilenames domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt > --loaderFilenames domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt
where... where...
@ -74,38 +77,49 @@ class Command(BaseCommand):
purposes purposes
--resetTable --resetTable
Used by the loaders to trigger a prompt for deleting all table entries. Used by the loaders to trigger a prompt for deleting all table entries.
Useful for testing purposes, but USE WITH CAUTION Useful for testing purposes, but USE WITH CAUTION
""" """ # noqa - line length
parser.add_argument("--runLoaders", parser.add_argument(
"--runLoaders",
help="Runs all scripts (in sequence) for transition domain migrations", help="Runs all scripts (in sequence) for transition domain migrations",
action=argparse.BooleanOptionalAction) action=argparse.BooleanOptionalAction,
)
parser.add_argument("--triggerLogins",
parser.add_argument(
"--triggerLogins",
help="Simulates a user login for each user in domain invitation", help="Simulates a user login for each user in domain invitation",
action=argparse.BooleanOptionalAction) action=argparse.BooleanOptionalAction,
)
# The following file arguments have default values for running in the sandbox # The following file arguments have default values for running in the sandbox
# For linter
script = "load_transition_domain migration script"
parser.add_argument( parser.add_argument(
"--loaderDirectory", "--loaderDirectory",
default="migrationData", default="migrationdata",
help="The location of the files used for load_transition_domain migration script" help=f"The location of the files used for the {script}",
) )
parser.add_argument( parser.add_argument(
"--loaderFilenames", "--loaderFilenames",
default="escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt", # For linter.
help="""The files used for load_transition_domain migration script. # Join all of the default filenames with a space.
Must appear IN ORDER and separated by spaces: # The resulting list will look like this: file1.txt file2.txt file3.txt
default=" ".join(self.default_filenames),
help="""The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by spaces:
domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt
where... where...
- domain_contacts_filename is the Data file with domain contact information - domain_contacts_filename is the Data file with domain contact information
- contacts_filename is the Data file with contact information - contacts_filename is the Data file with contact information
- domain_statuses_filename is the Data file with domain status information""" - domain_statuses_filename is the Data file with domain status information""", # noqa - linter length
) )
parser.add_argument("--sep", default="|", help="Delimiter character for the loader files") parser.add_argument(
"--sep", default="|", help="Delimiter character for the loader files"
)
parser.add_argument("--debug", action=argparse.BooleanOptionalAction) parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
@ -119,22 +133,20 @@ class Command(BaseCommand):
action=argparse.BooleanOptionalAction, action=argparse.BooleanOptionalAction,
) )
def compare_tables(self, debug_on: bool): def compare_tables(self, debug_on: bool):
"""Does a diff between the transition_domain and the following tables: """Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation. domain, domain_information and the domain_invitation.
Produces the following report (printed to the terminal): Produces the following report (printed to the terminal):
#1 - Print any domains that exist in the transition_domain table #1 - Print any domains that exist in the transition_domain table
but not in their corresponding domain, domain information or but not in their corresponding domain, domain information or
domain invitation tables. domain invitation tables.
#2 - Print which table this domain is missing from #2 - Print which table this domain is missing from
#3- Check for duplicate entries in domain or #3- Check for duplicate entries in domain or
domain_information tables and print which are domain_information tables and print which are
duplicates and in which tables duplicates and in which tables
""" """
logger.info( logger.info(
f"""{TerminalColors.OKCYAN} f"""{TerminalColors.OKCYAN}
============= BEGINNING ANALYSIS =============== ============= BEGINNING ANALYSIS ===============
@ -142,13 +154,13 @@ class Command(BaseCommand):
""" """
) )
#TODO: would filteredRelation be faster? # TODO: would filteredRelation be faster?
missing_domains = [] missing_domains = []
duplicate_domains = [] duplicate_domains = []
missing_domain_informations = [] missing_domain_informations = []
missing_domain_invites = [] missing_domain_invites = []
for transition_domain in TransitionDomain.objects.all():# DEBUG: for transition_domain in TransitionDomain.objects.all(): # DEBUG:
transition_domain_name = transition_domain.domain_name transition_domain_name = transition_domain.domain_name
transition_domain_email = transition_domain.username transition_domain_email = transition_domain.username
@ -160,22 +172,38 @@ class Command(BaseCommand):
# Check Domain table # Check Domain table
matching_domains = Domain.objects.filter(name=transition_domain_name) matching_domains = Domain.objects.filter(name=transition_domain_name)
# Check Domain Information table # Check Domain Information table
matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name) matching_domain_informations = DomainInformation.objects.filter(
domain__name=transition_domain_name
)
# Check Domain Invitation table # Check Domain Invitation table
matching_domain_invitations = DomainInvitation.objects.filter(email=transition_domain_email.lower(), matching_domain_invitations = DomainInvitation.objects.filter(
domain__name=transition_domain_name) email=transition_domain_email.lower(),
domain__name=transition_domain_name,
)
if len(matching_domains) == 0: if len(matching_domains) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain{TerminalColors.ENDC}""") TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}Missing Domain{TerminalColors.ENDC}""", # noqa - line length
)
missing_domains.append(transition_domain_name) missing_domains.append(transition_domain_name)
elif len(matching_domains) > 1: elif len(matching_domains) > 1:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Duplicate Domain{TerminalColors.ENDC}""") TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}Duplicate Domain{TerminalColors.ENDC}""", # noqa - line length
)
duplicate_domains.append(transition_domain_name) duplicate_domains.append(transition_domain_name)
if len(matching_domain_informations) == 0: if len(matching_domain_informations) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Information{TerminalColors.ENDC}""") TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}Missing Domain Information{TerminalColors.ENDC}""", # noqa - line length
)
missing_domain_informations.append(transition_domain_name) missing_domain_informations.append(transition_domain_name)
if len(matching_domain_invitations) == 0: if len(matching_domain_invitations) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Invitation{TerminalColors.ENDC}""") TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}Missing Domain Invitation{TerminalColors.ENDC}""", # noqa - line length
)
missing_domain_invites.append(transition_domain_name) missing_domain_invites.append(transition_domain_name)
total_missing_domains = len(missing_domains) total_missing_domains = len(missing_domains)
@ -184,10 +212,16 @@ class Command(BaseCommand):
total_missing_domain_invitations = len(missing_domain_invites) total_missing_domain_invitations = len(missing_domain_invites)
missing_domains_as_string = "{}".format(", ".join(map(str, missing_domains))) missing_domains_as_string = "{}".format(", ".join(map(str, missing_domains)))
duplicate_domains_as_string = "{}".format(", ".join(map(str, duplicate_domains))) duplicate_domains_as_string = "{}".format(
missing_domain_informations_as_string = "{}".format(", ".join(map(str, missing_domain_informations))) ", ".join(map(str, duplicate_domains))
missing_domain_invites_as_string = "{}".format(", ".join(map(str, missing_domain_invites))) )
missing_domain_informations_as_string = "{}".format(
", ".join(map(str, missing_domain_informations))
)
missing_domain_invites_as_string = "{}".format(
", ".join(map(str, missing_domain_invites))
)
logger.info( logger.info(
f"""{TerminalColors.OKGREEN} f"""{TerminalColors.OKGREEN}
============= FINISHED ANALYSIS =============== ============= FINISHED ANALYSIS ===============
@ -208,9 +242,9 @@ class Command(BaseCommand):
(These are transition domains which have no entires in the Domain Invitation Table) (These are transition domains which have no entires in the Domain Invitation Table)
{TerminalColors.YELLOW}{missing_domain_invites_as_string}{TerminalColors.OKGREEN} {TerminalColors.YELLOW}{missing_domain_invites_as_string}{TerminalColors.OKGREEN}
{TerminalColors.ENDC} {TerminalColors.ENDC}
""" """ # noqa - line length
) )
def prompt_for_execution(self, command_string: str, prompt_title: str) -> bool: def prompt_for_execution(self, command_string: str, prompt_title: str) -> bool:
"""Prompts the user to inspect the given terminal command string """Prompts the user to inspect the given terminal command string
and asks if they wish to execute it. If the user responds (y), and asks if they wish to execute it. If the user responds (y),
@ -236,35 +270,39 @@ class Command(BaseCommand):
# Otherwise, exit this subroutine. # Otherwise, exit this subroutine.
if not proceed_execution: if not proceed_execution:
sys.exit() sys.exit()
self.execute_command(command_string) self.execute_command(command_string)
return True return True
def execute_command(self, command_string:str): def execute_command(self, command_string: str):
"""Executes the given command string""" """Executes the given command string"""
logger.info(f"""{TerminalColors.OKCYAN} logger.info(
f"""{TerminalColors.OKCYAN}
==== EXECUTING... ==== ==== EXECUTING... ====
{TerminalColors.ENDC}""") {TerminalColors.ENDC}"""
)
os.system(f"{command_string}") os.system(f"{command_string}")
def run_load_transition_domain_script(self, def run_load_transition_domain_script(
file_location: str, self,
domain_contacts_filename: str, file_location: str,
contacts_filename: str, domain_contacts_filename: str,
domain_statuses_filename: str, contacts_filename: str,
sep: str, domain_statuses_filename: str,
reset_table: bool, sep: str,
debug_on: bool, reset_table: bool,
prompts_enabled: bool, debug_on: bool,
debug_max_entries_to_parse: int): prompts_enabled: bool,
debug_max_entries_to_parse: int,
):
"""Runs the load_transition_domain script""" """Runs the load_transition_domain script"""
# Create the command string # Create the command string
command_string = "./manage.py load_transition_domain " command_string = "./manage.py load_transition_domain "
command_string += file_location+domain_contacts_filename + " " command_string += file_location + domain_contacts_filename + " "
command_string += file_location+contacts_filename + " " command_string += file_location + contacts_filename + " "
command_string += file_location+domain_statuses_filename + " " command_string += file_location + domain_statuses_filename + " "
if sep is not None and sep != "|": if sep is not None and sep != "|":
command_string += f"--sep {sep} " command_string += f"--sep {sep} "
if reset_table: if reset_table:
@ -276,12 +314,13 @@ class Command(BaseCommand):
# Execute the command string # Execute the command string
if prompts_enabled: if prompts_enabled:
self.prompt_for_execution(command_string, "Running load_transition_domain script") self.prompt_for_execution(
command_string, "Running load_transition_domain script"
)
return return
self.execute_command(command_string) self.execute_command(command_string)
def run_transfer_script(self, debug_on: bool, prompts_enabled: bool):
def run_transfer_script(self, debug_on:bool, prompts_enabled: bool):
"""Runs the transfer_transition_domains_to_domains script""" """Runs the transfer_transition_domains_to_domains script"""
# Create the command string # Create the command string
command_string = "./manage.py transfer_transition_domains_to_domains " command_string = "./manage.py transfer_transition_domains_to_domains "
@ -289,43 +328,42 @@ class Command(BaseCommand):
command_string += "--debug " command_string += "--debug "
# Execute the command string # Execute the command string
if prompts_enabled: if prompts_enabled:
self.prompt_for_execution(command_string, "Running transfer_transition_domains_to_domains script") self.prompt_for_execution(
command_string, "Running transfer_transition_domains_to_domains script"
)
return return
self.execute_command(command_string) self.execute_command(command_string)
def run_send_invites_script(self, debug_on: bool, prompts_enabled: bool): def run_send_invites_script(self, debug_on: bool, prompts_enabled: bool):
"""Runs the send_domain_invitations script""" """Runs the send_domain_invitations script"""
# Create the command string... # Create the command string...
command_string = "./manage.py send_domain_invitations -s" command_string = "./manage.py send_domain_invitations -s"
# Execute the command string # Execute the command string
if prompts_enabled: if prompts_enabled:
self.prompt_for_execution(command_string, "Running send_domain_invitations script") self.prompt_for_execution(
command_string, "Running send_domain_invitations script"
)
return return
self.execute_command(command_string) self.execute_command(command_string)
def run_migration_scripts(self, prompts_enabled: bool, options):
"""Runs the following migration scripts (in order):
1 - imports for trans domains
2 - transfer to domain & domain invitation"""
def run_migration_scripts(self,
prompts_enabled: bool,
options):
"""Runs the following migration scripts (in order):
1 - imports for trans domains
2 - transfer to domain & domain invitation"""
# Get arguments # Get arguments
sep = options.get("sep") sep = options.get("sep")
reset_table = options.get("resetTable") reset_table = options.get("resetTable")
debug_on = options.get("debug") debug_on = options.get("debug")
debug_max_entries_to_parse = int( debug_max_entries_to_parse = int(options.get("limitParse"))
options.get("limitParse")
)
# Grab filepath information from the arguments # Grab filepath information from the arguments
file_location = options.get("loaderDirectory")+"/" file_location = options.get("loaderDirectory") + "/"
filenames = options.get("loaderFilenames").split() filenames = options.get("loaderFilenames").split()
if len(filenames) < 3: if len(filenames) < 3:
filenames_as_string = "{}".format(", ".join(map(str, filenames))) filenames_as_string = "{}".format(", ".join(map(str, filenames)))
logger.info(f""" logger.info(
f"""
{TerminalColors.FAIL} {TerminalColors.FAIL}
--loaderFilenames expected 3 filenames to follow it, --loaderFilenames expected 3 filenames to follow it,
but only {len(filenames)} were given: but only {len(filenames)} were given:
@ -334,7 +372,8 @@ class Command(BaseCommand):
PLEASE MODIFY THE SCRIPT AND TRY RUNNING IT AGAIN PLEASE MODIFY THE SCRIPT AND TRY RUNNING IT AGAIN
============= TERMINATING ============= ============= TERMINATING =============
{TerminalColors.ENDC} {TerminalColors.ENDC}
""") """
)
sys.exit() sys.exit()
domain_contacts_filename = filenames[0] domain_contacts_filename = filenames[0]
contacts_filename = filenames[1] contacts_filename = filenames[1]
@ -362,45 +401,49 @@ class Command(BaseCommand):
) )
# If the user rejected the filepath information # If the user rejected the filepath information
# as incorrect, prompt the user to provide # as incorrect, prompt the user to provide
# correct file inputs in their original command # correct file inputs in their original command
# prompt and exit this subroutine # prompt and exit this subroutine
if not files_are_correct: if not files_are_correct:
logger.info(f""" logger.info(
f"""
{TerminalColors.YELLOW} {TerminalColors.YELLOW}
PLEASE Re-Run the script with the correct file location and filenames: PLEASE Re-Run the script with the correct file location and filenames:
EXAMPLE: EXAMPLE:
docker compose run -T app ./manage.py test_domain_migration --runLoaders --loaderDirectory /app/tmp --loaderFilenames escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt docker compose run -T app ./manage.py test_domain_migration --runLoaders --loaderDirectory /app/tmp --loaderFilenames escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt
""") """ # noqa - line length
)
return return
# Proceed executing the migration scripts
self.run_load_transition_domain_script(file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
prompts_enabled,
debug_max_entries_to_parse)
self.run_transfer_script(debug_on, prompts_enabled)
# Proceed executing the migration scripts
self.run_load_transition_domain_script(
file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
prompts_enabled,
debug_max_entries_to_parse,
)
self.run_transfer_script(debug_on, prompts_enabled)
def simulate_user_logins(self, debug_on): def simulate_user_logins(self, debug_on):
"""Simulates logins for users (this will add """Simulates logins for users (this will add
Domain Information objects to our tables)""" Domain Information objects to our tables)"""
logger.info(f"" logger.info(
f"{TerminalColors.OKCYAN}" f""
f"================== SIMULATING LOGINS ==================" f"{TerminalColors.OKCYAN}"
f"{TerminalColors.ENDC}") f"================== SIMULATING LOGINS =================="
f"{TerminalColors.ENDC}"
)
command_string = "python ./manage.py test registrar.tests.test_transition_domain_migrations_wiuth_logins.TestLogins.test_user_logins" command_string = "python ./manage.py test registrar.tests.test_transition_domain_migrations_wiuth_logins.TestLogins.test_user_logins"
# for invite in DomainInvitation.objects.all(): #TODO: limit to our stuff # for invite in DomainInvitation.objects.all(): #TODO: limit to our stuff
# #DEBUG: # #DEBUG:
# TerminalHelper.print_conditional(debug_on, # TerminalHelper.print_conditional(debug_on,
@ -419,7 +462,6 @@ class Command(BaseCommand):
# logger.info(f"""{TerminalColors.YELLOW}(Deleting temporary user object){TerminalColors.ENDC}""") # logger.info(f"""{TerminalColors.YELLOW}(Deleting temporary user object){TerminalColors.ENDC}""")
# user.delete() # user.delete()
def handle( def handle(
self, self,
**options, **options,
@ -430,7 +472,7 @@ class Command(BaseCommand):
2 - simulate logins 2 - simulate logins
3 - send domain invitations (Emails should be sent to the appropriate users 3 - send domain invitations (Emails should be sent to the appropriate users
note that all moved domains should now be accessible note that all moved domains should now be accessible
on django admin for an analyst) on django admin for an analyst)
4 - analyze the data for transition domains 4 - analyze the data for transition domains
and generate a report and generate a report
""" """
@ -442,34 +484,34 @@ class Command(BaseCommand):
# the terminal so the user knows what is # the terminal so the user knows what is
# enabled. # enabled.
debug_on = options.get("debug") debug_on = options.get("debug")
prompts_enabled = debug_on #TODO: add as argument? prompts_enabled = debug_on # TODO: add as argument?
run_loaders_enabled = options.get("runLoaders") run_loaders_enabled = options.get("runLoaders")
simulate_user_login_enabled = options.get("triggerLogins") simulate_user_login_enabled = options.get("triggerLogins")
TerminalHelper.print_conditional( TerminalHelper.print_conditional(
debug_on, debug_on,
f"""{TerminalColors.OKCYAN} f"""{TerminalColors.OKCYAN}
----------DEBUG MODE ON---------- ----------DEBUG MODE ON----------
Detailed print statements activated. Detailed print statements activated.
{TerminalColors.ENDC} {TerminalColors.ENDC}
""" """,
) )
TerminalHelper.print_conditional( TerminalHelper.print_conditional(
run_loaders_enabled, run_loaders_enabled,
f"""{TerminalColors.OKCYAN} f"""{TerminalColors.OKCYAN}
----------RUNNING LOADERS ON---------- ----------RUNNING LOADERS ON----------
All migration scripts will be run before All migration scripts will be run before
analyzing the data. analyzing the data.
{TerminalColors.ENDC} {TerminalColors.ENDC}
""" """,
) )
TerminalHelper.print_conditional( TerminalHelper.print_conditional(
run_loaders_enabled, run_loaders_enabled,
f"""{TerminalColors.OKCYAN} f"""{TerminalColors.OKCYAN}
----------TRIGGER LOGINS ON---------- ----------TRIGGER LOGINS ON----------
Will be simulating user logins Will be simulating user logins
{TerminalColors.ENDC} {TerminalColors.ENDC}
""" """,
) )
# If a user decides to run all migration # If a user decides to run all migration
# scripts, they may or may not wish to # scripts, they may or may not wish to
# proceed with analysis of the data depending # proceed with analysis of the data depending
@ -486,7 +528,7 @@ class Command(BaseCommand):
if run_loaders_enabled: if run_loaders_enabled:
self.run_migration_scripts(options, prompts_enabled) self.run_migration_scripts(options, prompts_enabled)
prompt_continuation_of_analysis = True prompt_continuation_of_analysis = True
# STEP 2 -- SIMULATE LOGINS # STEP 2 -- SIMULATE LOGINS
# Simulate user login for each user in domain # Simulate user login for each user in domain
# invitation if specified by user OR if running # invitation if specified by user OR if running
@ -500,7 +542,7 @@ class Command(BaseCommand):
if run_loaders_enabled: if run_loaders_enabled:
if prompts_enabled: if prompts_enabled:
simulate_user_login_enabled = TerminalHelper.query_yes_no( simulate_user_login_enabled = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL} f"""{TerminalColors.FAIL}
Proceed with simulating user logins? Proceed with simulating user logins?
{TerminalColors.ENDC}""" {TerminalColors.ENDC}"""
) )
@ -508,14 +550,14 @@ class Command(BaseCommand):
return return
self.simulate_user_logins(debug_on) self.simulate_user_logins(debug_on)
prompt_continuation_of_analysis = True prompt_continuation_of_analysis = True
# STEP 3 -- SEND INVITES # STEP 3 -- SEND INVITES
if prompts_enabled: if prompts_enabled:
proceed_with_sending_invites = TerminalHelper.query_yes_no( proceed_with_sending_invites = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL} f"""{TerminalColors.FAIL}
Proceed with sending user invites? Proceed with sending user invites?
{TerminalColors.ENDC}""" {TerminalColors.ENDC}"""
) )
if not proceed_with_sending_invites: if not proceed_with_sending_invites:
return return
self.run_send_invites_script(debug_on) self.run_send_invites_script(debug_on)

View file

@ -12,7 +12,7 @@ from registrar.models import TransitionDomain
from registrar.management.commands.utility.terminal_helper import ( from registrar.management.commands.utility.terminal_helper import (
TerminalColors, TerminalColors,
TerminalHelper TerminalHelper,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View file

@ -1,534 +0,0 @@
"""Data migration:
1 - generates a report of data integrity across all
transition domain related tables
2 - allows users to run all migration scripts for
transition domain data
"""
import logging
import argparse
import sys
import os
from django.test import Client
from django_fsm import TransitionNotAllowed # type: ignore
from django.core.management import BaseCommand
from registrar.models import (
Domain,
DomainInformation,
DomainInvitation,
TransitionDomain,
User,
)
from registrar.management.commands.utility.terminal_helper import (
TerminalColors,
TerminalHelper
)
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = """ """
def add_arguments(self, parser):
"""
OPTIONAL ARGUMENTS:
--runLoaders
A boolean (default to true), which triggers running
all scripts (in sequence) for transition domain migrations
--triggerLogins
A boolean (default to true), which triggers running
simulations of user logins for each user in domain invitation
--loaderDirectory
The location of the files used for load_transition_domain migration script
EXAMPLE USAGE:
> --loaderDirectory /app/tmp
--loaderFilenames
The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by spaces:
EXAMPLE USAGE:
> --loaderFilenames domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt
where...
- domain_contacts_filename is the Data file with domain contact information
- contacts_filename is the Data file with contact information
- domain_statuses_filename is the Data file with domain status information
--sep
Delimiter for the loaders to correctly parse the given text files.
(usually this can remain at default value of |)
--debug
A boolean (default to true), which activates additional print statements
--limitParse
Used by the loaders (load_transition_domain) to set the limit for the
number of data entries to insert. Set to 0 (or just don't use this
argument) to parse every entry. This was provided primarily for testing
purposes
--resetTable
Used by the loaders to trigger a prompt for deleting all table entries.
Useful for testing purposes, but USE WITH CAUTION
"""
parser.add_argument("--runLoaders",
help="Runs all scripts (in sequence) for transition domain migrations",
action=argparse.BooleanOptionalAction)
parser.add_argument("--triggerLogins",
help="Simulates a user login for each user in domain invitation",
action=argparse.BooleanOptionalAction)
# The following file arguments have default values for running in the sandbox
parser.add_argument(
"--loaderDirectory",
default="migrationData",
help="The location of the files used for load_transition_domain migration script"
)
parser.add_argument(
"--loaderFilenames",
default="escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt",
help="""The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by spaces:
domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt
where...
- domain_contacts_filename is the Data file with domain contact information
- contacts_filename is the Data file with contact information
- domain_statuses_filename is the Data file with domain status information"""
)
parser.add_argument("--sep", default="|", help="Delimiter character for the loader files")
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument(
"--limitParse", default=0, help="Sets max number of entries to load"
)
parser.add_argument(
"--resetTable",
help="Deletes all data in the TransitionDomain table",
action=argparse.BooleanOptionalAction,
)
def compare_tables(self, debug_on: bool):
"""Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation.
Produces the following report (printed to the terminal):
#1 - Print any domains that exist in the transition_domain table
but not in their corresponding domain, domain information or
domain invitation tables.
#2 - Print which table this domain is missing from
#3- Check for duplicate entries in domain or
domain_information tables and print which are
duplicates and in which tables
"""
logger.info(
f"""{TerminalColors.OKCYAN}
============= BEGINNING ANALYSIS ===============
{TerminalColors.ENDC}
"""
)
#TODO: would filteredRelation be faster?
missing_domains = []
duplicate_domains = []
missing_domain_informations = []
missing_domain_invites = []
for transition_domain in TransitionDomain.objects.all():# DEBUG:
transition_domain_name = transition_domain.domain_name
transition_domain_email = transition_domain.username
TerminalHelper.print_conditional(
debug_on,
f"{TerminalColors.OKCYAN}Checking: {transition_domain_name} {TerminalColors.ENDC}", # noqa
)
# Check Domain table
matching_domains = Domain.objects.filter(name=transition_domain_name)
# Check Domain Information table
matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name)
# Check Domain Invitation table
matching_domain_invitations = DomainInvitation.objects.filter(email=transition_domain_email.lower(),
domain__name=transition_domain_name)
if len(matching_domains) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain{TerminalColors.ENDC}""")
missing_domains.append(transition_domain_name)
elif len(matching_domains) > 1:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Duplicate Domain{TerminalColors.ENDC}""")
duplicate_domains.append(transition_domain_name)
if len(matching_domain_informations) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Information{TerminalColors.ENDC}""")
missing_domain_informations.append(transition_domain_name)
if len(matching_domain_invitations) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Invitation{TerminalColors.ENDC}""")
missing_domain_invites.append(transition_domain_name)
total_missing_domains = len(missing_domains)
total_duplicate_domains = len(duplicate_domains)
total_missing_domain_informations = len(missing_domain_informations)
total_missing_domain_invitations = len(missing_domain_invites)
missing_domains_as_string = "{}".format(", ".join(map(str, missing_domains)))
duplicate_domains_as_string = "{}".format(", ".join(map(str, duplicate_domains)))
missing_domain_informations_as_string = "{}".format(", ".join(map(str, missing_domain_informations)))
missing_domain_invites_as_string = "{}".format(", ".join(map(str, missing_domain_invites)))
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ANALYSIS ===============
{total_missing_domains} Missing Domains:
(These are transition domains that are missing from the Domain Table)
{TerminalColors.YELLOW}{missing_domains_as_string}{TerminalColors.OKGREEN}
{total_duplicate_domains} Duplicate Domains:
(These are transition domains which have duplicate entries in the Domain Table)
{TerminalColors.YELLOW}{duplicate_domains_as_string}{TerminalColors.OKGREEN}
{total_missing_domain_informations} Domain Information Entries missing:
(These are transition domains which have no entries in the Domain Information Table)
{TerminalColors.YELLOW}{missing_domain_informations_as_string}{TerminalColors.OKGREEN}
{total_missing_domain_invitations} Domain Invitations missing:
(These are transition domains which have no entires in the Domain Invitation Table)
{TerminalColors.YELLOW}{missing_domain_invites_as_string}{TerminalColors.OKGREEN}
{TerminalColors.ENDC}
"""
)
def prompt_for_execution(self, command_string: str, prompt_title: str) -> bool:
"""Prompts the user to inspect the given terminal command string
and asks if they wish to execute it. If the user responds (y),
execute the command"""
# Allow the user to inspect the command string
# and ask if they wish to proceed
proceed_execution = TerminalHelper.query_yes_no(
f"""{TerminalColors.OKCYAN}
=====================================================
{prompt_title}
=====================================================
*** IMPORTANT: VERIFY THE FOLLOWING COMMAND LOOKS CORRECT ***
{command_string}
{TerminalColors.FAIL}
Proceed? (Y = proceed, N = skip)
{TerminalColors.ENDC}"""
)
# If the user decided to proceed executing the command,
# run the command for loading transition domains.
# Otherwise, exit this subroutine.
if not proceed_execution:
sys.exit()
self.execute_command(command_string)
return True
def execute_command(self, command_string:str):
"""Executes the given command string"""
logger.info(f"""{TerminalColors.OKCYAN}
==== EXECUTING... ====
{TerminalColors.ENDC}""")
os.system(f"{command_string}")
def run_load_transition_domain_script(self,
file_location: str,
domain_contacts_filename: str,
contacts_filename: str,
domain_statuses_filename: str,
sep: str,
reset_table: bool,
debug_on: bool,
prompts_enabled: bool,
debug_max_entries_to_parse: int):
"""Runs the load_transition_domain script"""
# Create the command string
command_string = "./manage.py load_transition_domain "
command_string += file_location+domain_contacts_filename + " "
command_string += file_location+contacts_filename + " "
command_string += file_location+domain_statuses_filename + " "
if sep is not None and sep != "|":
command_string += f"--sep {sep} "
if reset_table:
command_string += "--resetTable "
if debug_on:
command_string += "--debug "
if debug_max_entries_to_parse > 0:
command_string += f"--limitParse {debug_max_entries_to_parse} "
# Execute the command string
if prompts_enabled:
self.prompt_for_execution(command_string, "Running load_transition_domain script")
return
self.execute_command(command_string)
def run_transfer_script(self, debug_on:bool, prompts_enabled: bool):
"""Runs the transfer_transition_domains_to_domains script"""
# Create the command string
command_string = "./manage.py transfer_transition_domains_to_domains "
if debug_on:
command_string += "--debug "
# Execute the command string
if prompts_enabled:
self.prompt_for_execution(command_string, "Running transfer_transition_domains_to_domains script")
return
self.execute_command(command_string)
def run_send_invites_script(self, debug_on: bool, prompts_enabled: bool):
"""Runs the send_domain_invitations script"""
# Create the command string...
command_string = "./manage.py send_domain_invitations -s"
# Execute the command string
if prompts_enabled:
self.prompt_for_execution(command_string, "Running send_domain_invitations script")
return
self.execute_command(command_string)
def run_migration_scripts(self,
prompts_enabled: bool,
options):
"""Runs the following migration scripts (in order):
1 - imports for trans domains
2 - transfer to domain & domain invitation"""
# Get arguments
sep = options.get("sep")
reset_table = options.get("resetTable")
debug_on = options.get("debug")
debug_max_entries_to_parse = int(
options.get("limitParse")
)
# Grab filepath information from the arguments
file_location = options.get("loaderDirectory")+"/"
filenames = options.get("loaderFilenames").split()
if len(filenames) < 3:
filenames_as_string = "{}".format(", ".join(map(str, filenames)))
logger.info(f"""
{TerminalColors.FAIL}
--loaderFilenames expected 3 filenames to follow it,
but only {len(filenames)} were given:
{filenames_as_string}
PLEASE MODIFY THE SCRIPT AND TRY RUNNING IT AGAIN
============= TERMINATING =============
{TerminalColors.ENDC}
""")
sys.exit()
domain_contacts_filename = filenames[0]
contacts_filename = filenames[1]
domain_statuses_filename = filenames[2]
if prompts_enabled:
# Allow the user to inspect the filepath
# data given in the arguments, and prompt
# the user to verify this info before proceeding
files_are_correct = TerminalHelper.query_yes_no(
f"""
{TerminalColors.OKCYAN}
*** IMPORTANT: VERIFY THE FOLLOWING ***
The migration scripts are looking in directory....
{file_location}
....for the following files:
- domain contacts: {domain_contacts_filename}
- contacts: {contacts_filename}
- domain statuses: {domain_statuses_filename}
{TerminalColors.FAIL}
Does this look correct?{TerminalColors.ENDC}"""
)
# If the user rejected the filepath information
# as incorrect, prompt the user to provide
# correct file inputs in their original command
# prompt and exit this subroutine
if not files_are_correct:
logger.info(f"""
{TerminalColors.YELLOW}
PLEASE Re-Run the script with the correct file location and filenames:
EXAMPLE:
docker compose run -T app ./manage.py test_domain_migration --runLoaders --loaderDirectory /app/tmp --loaderFilenames escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt
""")
return
# Proceed executing the migration scripts
self.run_load_transition_domain_script(file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
prompts_enabled,
debug_max_entries_to_parse)
self.run_transfer_script(debug_on, prompts_enabled)
def simulate_user_logins(self, debug_on):
"""Simulates logins for users (this will add
Domain Information objects to our tables)"""
logger.info(f""
f"{TerminalColors.OKCYAN}"
f"================== SIMULATING LOGINS =================="
f"{TerminalColors.ENDC}")
# for invite in DomainInvitation.objects.all(): #TODO: limit to our stuff
# #DEBUG:
# TerminalHelper.print_conditional(debug_on,
# f"{TerminalColors.OKCYAN}"
# f"Processing invite: {invite}"
# f"{TerminalColors.ENDC}")
# # get a user with this email address
# user, user_created = User.objects.get_or_create(email=invite.email, username=invite.email)
# #DEBUG:
# TerminalHelper.print_conditional(user_created,
# f"""{TerminalColors.OKCYAN}No user found (creating temporary user object){TerminalColors.ENDC}""")
# TerminalHelper.print_conditional(debug_on,
# f"""{TerminalColors.OKCYAN}Executing first-time login for user: {user}{TerminalColors.ENDC}""")
# user.first_login()
# if user_created:
# logger.info(f"""{TerminalColors.YELLOW}(Deleting temporary user object){TerminalColors.ENDC}""")
# user.delete()
def handle(
self,
**options,
):
"""
Does the following;
1 - run loader scripts
2 - simulate logins
3 - send domain invitations (Emails should be sent to the appropriate users
note that all moved domains should now be accessible
on django admin for an analyst)
4 - analyze the data for transition domains
and generate a report
"""
# SETUP
# Grab all arguments relevant to
# orchestrating which parts of this script
# should execute. Print some indicators to
# the terminal so the user knows what is
# enabled.
debug_on = options.get("debug")
prompts_enabled = debug_on #TODO: add as argument?
run_loaders_enabled = options.get("runLoaders")
simulate_user_login_enabled = options.get("triggerLogins")
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.OKCYAN}
----------DEBUG MODE ON----------
Detailed print statements activated.
{TerminalColors.ENDC}
"""
)
TerminalHelper.print_conditional(
run_loaders_enabled,
f"""{TerminalColors.OKCYAN}
----------RUNNING LOADERS ON----------
All migration scripts will be run before
analyzing the data.
{TerminalColors.ENDC}
"""
)
TerminalHelper.print_conditional(
run_loaders_enabled,
f"""{TerminalColors.OKCYAN}
----------TRIGGER LOGINS ON----------
Will be simulating user logins
{TerminalColors.ENDC}
"""
)
# If a user decides to run all migration
# scripts, they may or may not wish to
# proceed with analysis of the data depending
# on the results of the migration.
# Provide a breakpoint for them to decide
# whether to continue or not.
# The same will happen if simulating user
# logins (to allow users to run only that
# portion of the script if desired)
prompt_continuation_of_analysis = False
# STEP 1 -- RUN LOADERS
# Run migration scripts if specified by user
if run_loaders_enabled:
self.run_migration_scripts(options, prompts_enabled)
prompt_continuation_of_analysis = True
# STEP 2 -- SIMULATE LOGINS
# Simulate user login for each user in domain
# invitation if specified by user OR if running
# migration scripts.
# (NOTE: Although users can choose to run login
# simulations separately (for testing purposes),
# if we are running all migration scripts, we should
# automatically execute this as the final step
# to ensure Domain Information objects get added
# to the database.)
if run_loaders_enabled:
if prompts_enabled:
simulate_user_login_enabled = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with simulating user logins?
{TerminalColors.ENDC}"""
)
if not simulate_user_login_enabled:
return
self.simulate_user_logins(debug_on)
prompt_continuation_of_analysis = True
# STEP 3 -- SEND INVITES
if prompts_enabled:
proceed_with_sending_invites = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with sending user invites?
{TerminalColors.ENDC}"""
)
if not proceed_with_sending_invites:
return
self.run_send_invites_script(debug_on)
prompt_continuation_of_analysis = True
# STEP 4 -- ANALYZE TABLES & GENERATE REPORT
# Analyze tables for corrupt data...
if prompt_continuation_of_analysis & prompts_enabled:
# ^ (only prompt if we ran steps 1 and/or 2)
analyze_tables = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with table analysis?
{TerminalColors.ENDC}"""
)
if not analyze_tables:
return
self.compare_tables(debug_on)

View file

@ -12,7 +12,7 @@ from registrar.models import DomainInvitation
from registrar.management.commands.utility.terminal_helper import ( from registrar.management.commands.utility.terminal_helper import (
TerminalColors, TerminalColors,
TerminalHelper TerminalHelper,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -57,7 +57,6 @@ class Command(BaseCommand):
""", """,
) )
def update_domain_status( def update_domain_status(
self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool
) -> bool: ) -> bool:

View file

@ -2,6 +2,7 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class TerminalColors: class TerminalColors:
"""Colors for terminal outputs """Colors for terminal outputs
(makes reading the logs WAY easier)""" (makes reading the logs WAY easier)"""
@ -18,8 +19,8 @@ class TerminalColors:
UNDERLINE = "\033[4m" UNDERLINE = "\033[4m"
BackgroundLightYellow = "\033[103m" BackgroundLightYellow = "\033[103m"
class TerminalHelper:
class TerminalHelper:
def query_yes_no(question: str, default="yes") -> bool: def query_yes_no(question: str, default="yes") -> bool:
"""Ask a yes/no question via raw_input() and return their answer. """Ask a yes/no question via raw_input() and return their answer.
@ -57,4 +58,4 @@ class TerminalHelper:
terminal if print_condition is TRUE""" terminal if print_condition is TRUE"""
# DEBUG: # DEBUG:
if print_condition: if print_condition:
logger.info(print_statement) logger.info(print_statement)

View file

@ -1,26 +1,13 @@
from django.test import TestCase from django.test import TestCase
from django.db.utils import IntegrityError
from unittest.mock import patch
from registrar.models import ( from registrar.models import (
Contact,
DomainApplication,
DomainInformation,
User, User,
Website,
Domain, Domain,
DraftDomain,
DomainInvitation, DomainInvitation,
UserDomainRole, UserDomainRole,
) )
import boto3_mocking # type: ignore
from .common import MockSESClient, less_console_noise, completed_application
from django_fsm import TransitionNotAllowed
boto3_mocking.clients.register_handler("sesv2", MockSESClient)
@boto3_mocking.patching
class TestLogins(TestCase): class TestLogins(TestCase):
"""Test the retrieval of invitations.""" """Test the retrieval of invitations."""