Ran black for linting

This commit is contained in:
zandercymatics 2023-10-30 08:07:27 -06:00
parent b236be7f43
commit a33efbe772
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
4 changed files with 285 additions and 229 deletions

View file

@ -100,8 +100,7 @@ class Command(BaseCommand):
return domain_status_dictionary
def get_user_emails_dict(
self,
contacts_filename: str, sep
self, contacts_filename: str, sep
) -> defaultdict[str, str]:
"""Creates mapping of userId -> emails"""
user_emails_dictionary = defaultdict(str)

View file

@ -27,11 +27,12 @@ from registrar.models import (
from registrar.management.commands.utility.terminal_helper import (
TerminalColors,
TerminalHelper
TerminalHelper,
)
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = """ """
@ -78,24 +79,28 @@ class Command(BaseCommand):
--resetTable
Used by the migration scripts to trigger a prompt for deleting all table entries.
Useful for testing purposes, but USE WITH CAUTION
"""
""" # noqa - line length, impacts readability
parser.add_argument("--runMigrations",
parser.add_argument(
"--runMigrations",
help="Runs all scripts (in sequence) for transition domain migrations",
action=argparse.BooleanOptionalAction)
action=argparse.BooleanOptionalAction,
)
# --triggerLogins
# A boolean (default to true), which triggers running
# simulations of user logins for each user in domain invitation
parser.add_argument("--triggerLogins",
parser.add_argument(
"--triggerLogins",
help="Simulates a user login for each user in domain invitation",
action=argparse.BooleanOptionalAction)
action=argparse.BooleanOptionalAction,
)
# The following file arguments have default values for running in the sandbox
parser.add_argument(
"--migrationDirectory",
default="migrationData",
help="The location of the files used for load_transition_domain migration script"
help="The location of the files used for load_transition_domain migration script",
)
parser.add_argument(
"--migrationFilenames",
@ -107,10 +112,12 @@ class Command(BaseCommand):
where...
- domain_contacts_filename is the Data file with domain contact information
- contacts_filename is the Data file with contact information
- domain_statuses_filename is the Data file with domain status information"""
- domain_statuses_filename is the Data file with domain status information""",
)
parser.add_argument("--sep", default="|", help="Delimiter character for the migration files")
parser.add_argument(
"--sep", default="|", help="Delimiter character for the migration files"
)
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
@ -147,13 +154,13 @@ class Command(BaseCommand):
"""
)
#TODO: would filteredRelation be faster?
# TODO: would filteredRelation be faster?
missing_domains = []
duplicate_domains = []
missing_domain_informations = []
missing_domain_invites = []
for transition_domain in TransitionDomain.objects.all():# DEBUG:
for transition_domain in TransitionDomain.objects.all(): # DEBUG:
transition_domain_name = transition_domain.domain_name
transition_domain_email = transition_domain.username
@ -165,22 +172,38 @@ class Command(BaseCommand):
# Check Domain table
matching_domains = Domain.objects.filter(name=transition_domain_name)
# Check Domain Information table
matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name)
matching_domain_informations = DomainInformation.objects.filter(
domain__name=transition_domain_name
)
# Check Domain Invitation table
matching_domain_invitations = DomainInvitation.objects.filter(email=transition_domain_email.lower(),
domain__name=transition_domain_name)
matching_domain_invitations = DomainInvitation.objects.filter(
email=transition_domain_email.lower(),
domain__name=transition_domain_name,
)
if len(matching_domains) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain{TerminalColors.ENDC}""")
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}Missing Domain{TerminalColors.ENDC}""",
)
missing_domains.append(transition_domain_name)
elif len(matching_domains) > 1:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Duplicate Domain{TerminalColors.ENDC}""")
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}Duplicate Domain{TerminalColors.ENDC}""",
)
duplicate_domains.append(transition_domain_name)
if len(matching_domain_informations) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Information{TerminalColors.ENDC}""")
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}Missing Domain Information{TerminalColors.ENDC}""",
)
missing_domain_informations.append(transition_domain_name)
if len(matching_domain_invitations) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Invitation{TerminalColors.ENDC}""")
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}Missing Domain Invitation{TerminalColors.ENDC}""",
)
missing_domain_invites.append(transition_domain_name)
total_missing_domains = len(missing_domains)
@ -189,9 +212,15 @@ class Command(BaseCommand):
total_missing_domain_invitations = len(missing_domain_invites)
missing_domains_as_string = "{}".format(", ".join(map(str, missing_domains)))
duplicate_domains_as_string = "{}".format(", ".join(map(str, duplicate_domains)))
missing_domain_informations_as_string = "{}".format(", ".join(map(str, missing_domain_informations)))
missing_domain_invites_as_string = "{}".format(", ".join(map(str, missing_domain_invites)))
duplicate_domains_as_string = "{}".format(
", ".join(map(str, duplicate_domains))
)
missing_domain_informations_as_string = "{}".format(
", ".join(map(str, missing_domain_informations))
)
missing_domain_invites_as_string = "{}".format(
", ".join(map(str, missing_domain_invites))
)
logger.info(
f"""{TerminalColors.OKGREEN}
@ -216,7 +245,8 @@ class Command(BaseCommand):
"""
)
def run_load_transition_domain_script(self,
def run_load_transition_domain_script(
self,
file_location: str,
domain_contacts_filename: str,
contacts_filename: str,
@ -225,8 +255,8 @@ class Command(BaseCommand):
reset_table: bool,
debug_on: bool,
prompts_enabled: bool,
debug_max_entries_to_parse: int):
debug_max_entries_to_parse: int,
):
"""Runs the load_transition_domain script"""
# Create the command string
command_script = "load_transition_domain"
@ -245,11 +275,14 @@ class Command(BaseCommand):
if debug_max_entries_to_parse > 0:
command_string += f"--limitParse {debug_max_entries_to_parse} "
# Execute the command string
if prompts_enabled:
system_exit_on_terminate = True
TerminalHelper.prompt_for_execution(system_exit_on_terminate, command_string, "Running load_transition_domain script")
TerminalHelper.prompt_for_execution(
system_exit_on_terminate,
command_string,
"Running load_transition_domain script",
)
# TODO: make this somehow run inside TerminalHelper prompt
call_command(
@ -257,16 +290,13 @@ class Command(BaseCommand):
f"{file_location+domain_contacts_filename}",
f"{file_location+contacts_filename}",
f"{file_location+domain_statuses_filename}",
sep = sep,
resetTable = reset_table,
debug = debug_on,
limitParse = debug_max_entries_to_parse
sep=sep,
resetTable=reset_table,
debug=debug_on,
limitParse=debug_max_entries_to_parse,
)
def run_transfer_script(self, debug_on:bool, prompts_enabled: bool):
def run_transfer_script(self, debug_on: bool, prompts_enabled: bool):
"""Runs the transfer_transition_domains_to_domains script"""
# Create the command string
command_script = "transfer_transition_domains_to_domains"
@ -276,12 +306,14 @@ class Command(BaseCommand):
# Execute the command string
if prompts_enabled:
system_exit_on_terminate = True
TerminalHelper.prompt_for_execution(system_exit_on_terminate,command_string, "Running transfer_transition_domains_to_domains script")
TerminalHelper.prompt_for_execution(
system_exit_on_terminate,
command_string,
"Running transfer_transition_domains_to_domains script",
)
# TODO: make this somehow run inside TerminalHelper prompt
call_command(
command_script
)
call_command(command_script)
def run_send_invites_script(self, debug_on: bool, prompts_enabled: bool):
"""Runs the send_domain_invitations script"""
@ -291,15 +323,17 @@ class Command(BaseCommand):
# Execute the command string
if prompts_enabled:
system_exit_on_terminate = True
TerminalHelper.prompt_for_execution(system_exit_on_terminate,command_string, "Running send_domain_invitations script")
# TODO: make this somehow run inside TerminalHelper prompt
call_command(
command_script,
send_emails=True
TerminalHelper.prompt_for_execution(
system_exit_on_terminate,
command_string,
"Running send_domain_invitations script",
)
def run_migration_scripts(self,
# TODO: make this somehow run inside TerminalHelper prompt
call_command(command_script, send_emails=True)
def run_migration_scripts(
self,
file_location,
domain_contacts_filename,
contacts_filename,
@ -308,7 +342,8 @@ class Command(BaseCommand):
reset_table,
debug_on,
prompts_enabled,
debug_max_entries_to_parse):
debug_max_entries_to_parse,
):
"""Runs the following migration scripts (in order):
1 - imports for trans domains
2 - transfer to domain & domain invitation"""
@ -339,18 +374,21 @@ class Command(BaseCommand):
# correct file inputs in their original command
# prompt and exit this subroutine
if not files_are_correct:
logger.info(f"""
logger.info(
f"""
{TerminalColors.YELLOW}
PLEASE Re-Run the script with the correct file location and filenames:
EXAMPLE:
docker compose run -T app ./manage.py test_domain_migration --runMigrations --migrationDirectory /app/tmp --migrationFilenames escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt
""") # noqa
"""
) # noqa
return
# Proceed executing the migration scripts
self.run_load_transition_domain_script(file_location,
self.run_load_transition_domain_script(
file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
@ -358,10 +396,10 @@ class Command(BaseCommand):
reset_table,
debug_on,
prompts_enabled,
debug_max_entries_to_parse)
debug_max_entries_to_parse,
)
self.run_transfer_script(debug_on, prompts_enabled)
def simulate_user_logins(self, debug_on):
"""Simulates logins for users (this will add
Domain Information objects to our tables)"""
@ -389,7 +427,6 @@ class Command(BaseCommand):
# logger.info(f"""{TerminalColors.YELLOW}(Deleting temporary user object){TerminalColors.ENDC}""")
# user.delete()
def handle(
self,
**options,
@ -412,13 +449,13 @@ class Command(BaseCommand):
# the terminal so the user knows what is
# enabled.
# Get arguments
debug_on = options.get("debug")
prompts_enabled = options.get("prompt")
run_migrations_enabled = options.get("runMigrations")
simulate_user_login_enabled = False # TODO: delete? Moving to unit test... options.get("triggerLogins")
simulate_user_login_enabled = (
False # TODO: delete? Moving to unit test... options.get("triggerLogins")
)
TerminalHelper.print_conditional(
debug_on,
@ -426,7 +463,7 @@ class Command(BaseCommand):
----------DEBUG MODE ON----------
Detailed print statements activated.
{TerminalColors.ENDC}
"""
""",
)
TerminalHelper.print_conditional(
run_migrations_enabled,
@ -435,7 +472,7 @@ class Command(BaseCommand):
All migration scripts will be run before
analyzing the data.
{TerminalColors.ENDC}
"""
""",
)
TerminalHelper.print_conditional(
run_migrations_enabled,
@ -443,7 +480,7 @@ class Command(BaseCommand):
----------TRIGGER LOGINS ON----------
Will be simulating user logins
{TerminalColors.ENDC}
"""
""",
)
# If a user decides to run all migration
@ -463,16 +500,15 @@ class Command(BaseCommand):
# grab arguments for running migrations
sep = options.get("sep")
reset_table = options.get("resetTable")
debug_max_entries_to_parse = int(
options.get("limitParse")
)
debug_max_entries_to_parse = int(options.get("limitParse"))
# Grab filepath information from the arguments
file_location = options.get("migrationDirectory")+"/"
file_location = options.get("migrationDirectory") + "/"
filenames = options.get("migrationFilenames").split(",")
if len(filenames) < 3:
filenames_as_string = "{}".format(", ".join(map(str, filenames)))
logger.info(f"""
logger.info(
f"""
{TerminalColors.FAIL}
--migrationFilenames expected 3 filenames to follow it,
but only {len(filenames)} were given:
@ -481,14 +517,16 @@ class Command(BaseCommand):
PLEASE MODIFY THE SCRIPT AND TRY RUNNING IT AGAIN
============= TERMINATING =============
{TerminalColors.ENDC}
""")
"""
)
sys.exit()
domain_contacts_filename = filenames[0]
contacts_filename = filenames[1]
domain_statuses_filename = filenames[2]
# Run migration scripts
self.run_migration_scripts(file_location,
self.run_migration_scripts(
file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
@ -496,7 +534,8 @@ class Command(BaseCommand):
reset_table,
debug_on,
prompts_enabled,
debug_max_entries_to_parse)
debug_max_entries_to_parse,
)
prompt_continuation_of_analysis = True
# STEP 2 -- SIMULATE LOGINS

View file

@ -62,10 +62,9 @@ class TerminalHelper:
if print_condition:
logger.info(print_statement)
def prompt_for_execution(system_exit_on_terminate: bool,
info_to_inspect: str,
prompt_title: str) -> bool:
def prompt_for_execution(
system_exit_on_terminate: bool, info_to_inspect: str, prompt_title: str
) -> bool:
"""Create to reduce code complexity.
Prompts the user to inspect the given string
and asks if they wish to execute it.

View file

@ -10,9 +10,12 @@ from registrar.models import (
UserDomainRole,
)
from registrar.management.commands.master_domain_migrations import Command as master_migration_command
from registrar.management.commands.master_domain_migrations import (
Command as master_migration_command,
)
from django.core.management import call_command
class TestLogins(TestCase):
""" """
@ -53,12 +56,15 @@ class TestLogins(TestCase):
"master_domain_migrations",
runMigrations=True,
migrationDirectory=f"{self.test_data_file_location}",
migrationFilenames=(f"{self.test_domain_contact_filename},"
migrationFilenames=(
f"{self.test_domain_contact_filename},"
f"{self.test_contact_filename},"
f"{self.test_domain_status_filename}"),
f"{self.test_domain_status_filename}"
),
)
def compare_tables(self,
def compare_tables(
self,
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
@ -66,7 +72,8 @@ class TestLogins(TestCase):
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
expected_missing_domain_invitations):
expected_missing_domain_invitations,
):
"""Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation.
Verifies that the data loaded correctly."""
@ -75,17 +82,21 @@ class TestLogins(TestCase):
duplicate_domains = []
missing_domain_informations = []
missing_domain_invites = []
for transition_domain in TransitionDomain.objects.all():# DEBUG:
for transition_domain in TransitionDomain.objects.all(): # DEBUG:
transition_domain_name = transition_domain.domain_name
transition_domain_email = transition_domain.username
# Check Domain table
matching_domains = Domain.objects.filter(name=transition_domain_name)
# Check Domain Information table
matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name)
matching_domain_informations = DomainInformation.objects.filter(
domain__name=transition_domain_name
)
# Check Domain Invitation table
matching_domain_invitations = DomainInvitation.objects.filter(email=transition_domain_email.lower(),
domain__name=transition_domain_name)
matching_domain_invitations = DomainInvitation.objects.filter(
email=transition_domain_email.lower(),
domain__name=transition_domain_name,
)
if len(matching_domains) == 0:
missing_domains.append(transition_domain_name)
@ -106,7 +117,8 @@ class TestLogins(TestCase):
total_domain_informations = len(DomainInformation.objects.all())
total_domain_invitations = len(DomainInvitation.objects.all())
print(f"""
print(
f"""
total_missing_domains = {len(missing_domains)}
total_duplicate_domains = {len(duplicate_domains)}
total_missing_domain_informations = {len(missing_domain_informations)}
@ -116,25 +128,29 @@ class TestLogins(TestCase):
total_domains = {len(Domain.objects.all())}
total_domain_informations = {len(DomainInformation.objects.all())}
total_domain_invitations = {len(DomainInvitation.objects.all())}
""")
"""
)
self.assertTrue(total_missing_domains == expected_missing_domains)
self.assertTrue(total_duplicate_domains == expected_duplicate_domains)
self.assertTrue(total_missing_domain_informations == expected_missing_domain_informations)
self.assertTrue(total_missing_domain_invitations == expected_missing_domain_invitations)
self.assertTrue(
total_missing_domain_informations == expected_missing_domain_informations
)
self.assertTrue(
total_missing_domain_invitations == expected_missing_domain_invitations
)
self.assertTrue(total_transition_domains == expected_total_transition_domains)
self.assertTrue(total_domains == expected_total_domains)
self.assertTrue(total_domain_informations == expected_total_domain_informations)
self.assertTrue(total_domain_invitations == expected_total_domain_invitations)
def test_master_migration_functions(self):
""" Run the full master migration script using local test data.
"""Run the full master migration script using local test data.
NOTE: This is more of an integration test and so far does not
follow best practice of limiting the number of assertions per test.
But for now, this will double-check that the script
works as intended. """
works as intended."""
self.run_master_script()
@ -166,7 +182,8 @@ class TestLogins(TestCase):
expected_missing_domain_informations = 8
# we expect 1 missing invite from anomaly.gov (an injected error)
expected_missing_domain_invitations = 1
self.compare_tables(expected_total_transition_domains,
self.compare_tables(
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,
@ -177,7 +194,6 @@ class TestLogins(TestCase):
)
def test_load_transition_domain(self):
self.run_load_domains()
# STEP 2: (analyze the tables just like the migration script does, but add assert statements)
@ -190,7 +206,8 @@ class TestLogins(TestCase):
expected_duplicate_domains = 0
expected_missing_domain_informations = 8
expected_missing_domain_invitations = 8
self.compare_tables(expected_total_transition_domains,
self.compare_tables(
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,
@ -201,7 +218,6 @@ class TestLogins(TestCase):
)
def test_transfer_transition_domains_to_domains(self):
# TODO: setup manually instead of calling other script
self.run_load_domains()
self.run_transfer_domains()
@ -216,7 +232,8 @@ class TestLogins(TestCase):
expected_duplicate_domains = 0
expected_missing_domain_informations = 8
expected_missing_domain_invitations = 1
self.compare_tables(expected_total_transition_domains,
self.compare_tables(
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,
@ -226,7 +243,6 @@ class TestLogins(TestCase):
expected_missing_domain_invitations,
)
def test_logins(self):
# TODO: setup manually instead of calling other scripts
self.run_load_domains()
@ -235,7 +251,9 @@ class TestLogins(TestCase):
# Simluate Logins
for invite in DomainInvitation.objects.all():
# get a user with this email address
user, user_created = User.objects.get_or_create(email=invite.email, username=invite.email)
user, user_created = User.objects.get_or_create(
email=invite.email, username=invite.email
)
user.first_login()
# Analyze the tables
@ -248,7 +266,8 @@ class TestLogins(TestCase):
expected_duplicate_domains = 0
expected_missing_domain_informations = 1
expected_missing_domain_invitations = 1
self.compare_tables(expected_total_transition_domains,
self.compare_tables(
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,