Auto stash before merge of "nl/981-test-domain-migration-script" and "origin/nl/981-test-domain-migration-script"

Unit test experiments...
This commit is contained in:
CocoByte 2023-10-25 18:38:39 -06:00
parent b62a9b4223
commit 1b5d7a6248
No known key found for this signature in database
GPG key ID: BBFAA2526384C97F
7 changed files with 302 additions and 155 deletions

View file

@ -101,11 +101,12 @@ class Command(BaseCommand):
return domain_status_dictionary
def get_user_emails_dict(
self, contacts_filename: str, sep
self,
contacts_filename: str, sep
) -> defaultdict[str, str]:
"""Creates mapping of userId -> emails"""
user_emails_dictionary = defaultdict(str)
logger.info("Reading domain-contacts data file %s", contacts_filename)
logger.info("Reading contacts data file %s", contacts_filename)
with open(contacts_filename, "r") as contacts_file:
for row in csv.reader(contacts_file, delimiter=sep):
user_id = row[0]

View file

@ -37,67 +37,71 @@ class Command(BaseCommand):
def add_arguments(self, parser):
"""
OPTIONAL ARGUMENTS:
--runLoaders
--runMigrations
A boolean (default to true), which triggers running
all scripts (in sequence) for transition domain migrations
--triggerLogins
A boolean (default to true), which triggers running
simulations of user logins for each user in domain invitation
--loaderDirectory
--migrationDirectory
The location of the files used for load_transition_domain migration script
EXAMPLE USAGE:
> --loaderDirectory /app/tmp
> --migrationDirectory /app/tmp
--loaderFilenames
--migrationFilenames
The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by spaces:
Must appear IN ORDER and comma-delimiteds:
EXAMPLE USAGE:
> --loaderFilenames domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt
> --migrationFilenames domain_contacts_filename.txt,contacts_filename.txt,domain_statuses_filename.txt
where...
- domain_contacts_filename is the Data file with domain contact information
- contacts_filename is the Data file with contact information
- domain_statuses_filename is the Data file with domain status information
--sep
Delimiter for the loaders to correctly parse the given text files.
Delimiter for the migration scripts to correctly parse the given text files.
(usually this can remain at default value of |)
--debug
A boolean (default to true), which activates additional print statements
--prompt
A boolean (default to true), which activates terminal prompts
that allows the user to step through each portion of this
script.
--limitParse
Used by the loaders (load_transition_domain) to set the limit for the
Used by the migration scripts (load_transition_domain) to set the limit for the
number of data entries to insert. Set to 0 (or just don't use this
argument) to parse every entry. This was provided primarily for testing
purposes
--resetTable
Used by the loaders to trigger a prompt for deleting all table entries.
Used by the migration scripts to trigger a prompt for deleting all table entries.
Useful for testing purposes, but USE WITH CAUTION
"""
parser.add_argument("--runLoaders",
parser.add_argument("--runMigrations",
help="Runs all scripts (in sequence) for transition domain migrations",
action=argparse.BooleanOptionalAction)
# --triggerLogins
# A boolean (default to true), which triggers running
# simulations of user logins for each user in domain invitation
parser.add_argument("--triggerLogins",
help="Simulates a user login for each user in domain invitation",
action=argparse.BooleanOptionalAction)
# The following file arguments have default values for running in the sandbox
parser.add_argument(
"--loaderDirectory",
"--migrationDirectory",
default="migrationData",
help="The location of the files used for load_transition_domain migration script"
)
parser.add_argument(
"--loaderFilenames",
"--migrationFilenames",
default="escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt",
help="""The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by spaces:
domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt
Must appear IN ORDER and separated by commas:
domain_contacts_filename.txt,contacts_filename.txt,domain_statuses_filename.txt
where...
- domain_contacts_filename is the Data file with domain contact information
@ -105,10 +109,12 @@ class Command(BaseCommand):
- domain_statuses_filename is the Data file with domain status information"""
)
parser.add_argument("--sep", default="|", help="Delimiter character for the loader files")
parser.add_argument("--sep", default="|", help="Delimiter character for the migration files")
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument("--prompt", action=argparse.BooleanOptionalAction)
parser.add_argument(
"--limitParse", default=0, help="Sets max number of entries to load"
)
@ -119,8 +125,6 @@ class Command(BaseCommand):
action=argparse.BooleanOptionalAction,
)
def compare_tables(self, debug_on: bool):
"""Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation.
@ -211,44 +215,6 @@ class Command(BaseCommand):
"""
)
def prompt_for_execution(self, command_string: str, prompt_title: str) -> bool:
"""Prompts the user to inspect the given terminal command string
and asks if they wish to execute it. If the user responds (y),
execute the command"""
# Allow the user to inspect the command string
# and ask if they wish to proceed
proceed_execution = TerminalHelper.query_yes_no(
f"""{TerminalColors.OKCYAN}
=====================================================
{prompt_title}
=====================================================
*** IMPORTANT: VERIFY THE FOLLOWING COMMAND LOOKS CORRECT ***
{command_string}
{TerminalColors.FAIL}
Proceed? (Y = proceed, N = skip)
{TerminalColors.ENDC}"""
)
# If the user decided to proceed executing the command,
# run the command for loading transition domains.
# Otherwise, exit this subroutine.
if not proceed_execution:
sys.exit()
self.execute_command(command_string)
return True
def execute_command(self, command_string:str):
"""Executes the given command string"""
logger.info(f"""{TerminalColors.OKCYAN}
==== EXECUTING... ====
{TerminalColors.ENDC}""")
os.system(f"{command_string}")
def run_load_transition_domain_script(self,
file_location: str,
domain_contacts_filename: str,
@ -276,9 +242,10 @@ class Command(BaseCommand):
# Execute the command string
if prompts_enabled:
self.prompt_for_execution(command_string, "Running load_transition_domain script")
system_exit_on_terminate = True
TerminalHelper.prompt_for_execution(system_exit_on_terminate, command_string, "Running load_transition_domain script")
return
self.execute_command(command_string)
TerminalHelper.execute_command(command_string)
def run_transfer_script(self, debug_on:bool, prompts_enabled: bool):
@ -289,9 +256,10 @@ class Command(BaseCommand):
command_string += "--debug "
# Execute the command string
if prompts_enabled:
self.prompt_for_execution(command_string, "Running transfer_transition_domains_to_domains script")
system_exit_on_terminate = True
TerminalHelper.prompt_for_execution(system_exit_on_terminate,command_string, "Running transfer_transition_domains_to_domains script")
return
self.execute_command(command_string)
TerminalHelper.execute_command(command_string)
def run_send_invites_script(self, debug_on: bool, prompts_enabled: bool):
@ -300,45 +268,26 @@ class Command(BaseCommand):
command_string = "./manage.py send_domain_invitations -s"
# Execute the command string
if prompts_enabled:
self.prompt_for_execution(command_string, "Running send_domain_invitations script")
system_exit_on_terminate = True
TerminalHelper.prompt_for_execution(system_exit_on_terminate,command_string, "Running send_domain_invitations script")
return
self.execute_command(command_string)
TerminalHelper.execute_command(command_string)
def run_migration_scripts(self,
prompts_enabled: bool,
options):
file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
prompts_enabled,
debug_max_entries_to_parse):
"""Runs the following migration scripts (in order):
1 - imports for trans domains
2 - transfer to domain & domain invitation"""
# Get arguments
sep = options.get("sep")
reset_table = options.get("resetTable")
debug_on = options.get("debug")
debug_max_entries_to_parse = int(
options.get("limitParse")
)
# Grab filepath information from the arguments
file_location = options.get("loaderDirectory")+"/"
filenames = options.get("loaderFilenames").split(",")
if len(filenames) < 3:
filenames_as_string = "{}".format(", ".join(map(str, filenames)))
logger.info(f"""
{TerminalColors.FAIL}
--loaderFilenames expected 3 filenames to follow it,
but only {len(filenames)} were given:
{filenames_as_string}
PLEASE MODIFY THE SCRIPT AND TRY RUNNING IT AGAIN
============= TERMINATING =============
{TerminalColors.ENDC}
""")
sys.exit()
domain_contacts_filename = filenames[0]
contacts_filename = filenames[1]
domain_statuses_filename = filenames[2]
if prompts_enabled:
# Allow the user to inspect the filepath
@ -371,9 +320,9 @@ class Command(BaseCommand):
PLEASE Re-Run the script with the correct file location and filenames:
EXAMPLE:
docker compose run -T app ./manage.py test_domain_migration --runLoaders --loaderDirectory /app/tmp --loaderFilenames escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt
docker compose run -T app ./manage.py test_domain_migration --runMigrations --migrationDirectory /app/tmp --migrationFilenames escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt
""")
""") # noqa
return
# Proceed executing the migration scripts
@ -398,9 +347,6 @@ class Command(BaseCommand):
f"================== SIMULATING LOGINS =================="
f"{TerminalColors.ENDC}")
# command_string = "python ./manage.py test registrar.tests.test_transition_domain_migrations_wiuth_logins.TestLogins.test_user_logins"
# for invite in DomainInvitation.objects.all(): #TODO: limit to our stuff
# #DEBUG:
# TerminalHelper.print_conditional(debug_on,
@ -426,7 +372,7 @@ class Command(BaseCommand):
):
"""
Does the following;
1 - run loader scripts
1 - run migration scripts
2 - simulate logins
3 - send domain invitations (Emails should be sent to the appropriate users
note that all moved domains should now be accessible
@ -441,10 +387,39 @@ class Command(BaseCommand):
# should execute. Print some indicators to
# the terminal so the user knows what is
# enabled.
# Get arguments
debug_on = options.get("debug")
prompts_enabled = debug_on #TODO: add as argument?
run_loaders_enabled = options.get("runLoaders")
simulate_user_login_enabled = options.get("triggerLogins")
prompts_enabled = options.get("prompt")
run_migrations_enabled = options.get("runMigrations")
# simulate_user_login_enabled = options.get("triggerLogins")
sep = options.get("sep")
reset_table = options.get("resetTable")
debug_max_entries_to_parse = int(
options.get("limitParse")
)
# Grab filepath information from the arguments
file_location = options.get("migrationDirectory")+"/"
filenames = options.get("migrationFilenames").split(",")
if len(filenames) < 3:
filenames_as_string = "{}".format(", ".join(map(str, filenames)))
logger.info(f"""
{TerminalColors.FAIL}
--migrationFilenames expected 3 filenames to follow it,
but only {len(filenames)} were given:
{filenames_as_string}
PLEASE MODIFY THE SCRIPT AND TRY RUNNING IT AGAIN
============= TERMINATING =============
{TerminalColors.ENDC}
""")
sys.exit()
domain_contacts_filename = filenames[0]
contacts_filename = filenames[1]
domain_statuses_filename = filenames[2]
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.OKCYAN}
@ -454,22 +429,23 @@ class Command(BaseCommand):
"""
)
TerminalHelper.print_conditional(
run_loaders_enabled,
run_migrations_enabled,
f"""{TerminalColors.OKCYAN}
----------RUNNING LOADERS ON----------
----------RUNNING MIGRATIONS ON----------
All migration scripts will be run before
analyzing the data.
{TerminalColors.ENDC}
"""
)
TerminalHelper.print_conditional(
run_loaders_enabled,
run_migrations_enabled,
f"""{TerminalColors.OKCYAN}
----------TRIGGER LOGINS ON----------
Will be simulating user logins
{TerminalColors.ENDC}
"""
)
# If a user decides to run all migration
# scripts, they may or may not wish to
# proceed with analysis of the data depending
@ -481,10 +457,18 @@ class Command(BaseCommand):
# portion of the script if desired)
prompt_continuation_of_analysis = False
# STEP 1 -- RUN LOADERS
# STEP 1 -- RUN MIGRATIONS
# Run migration scripts if specified by user
if run_loaders_enabled:
self.run_migration_scripts(prompts_enabled, options)
if run_migrations_enabled:
self.run_migration_scripts(file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
prompts_enabled,
debug_max_entries_to_parse)
prompt_continuation_of_analysis = True
# STEP 2 -- SIMULATE LOGINS
@ -498,7 +482,7 @@ class Command(BaseCommand):
# to ensure Domain Information objects get added
# to the database.)
# if run_loaders_enabled:
# if run_migrations_enabled:
# if prompts_enabled:
# simulate_user_login_enabled = TerminalHelper.query_yes_no(
# f"""{TerminalColors.FAIL}
@ -511,24 +495,26 @@ class Command(BaseCommand):
# prompt_continuation_of_analysis = True
# STEP 3 -- SEND INVITES
if prompts_enabled:
proceed_with_sending_invites = run_migrations_enabled
if prompts_enabled and run_migrations_enabled:
proceed_with_sending_invites = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with sending user invites?
(Y = proceed, N = skip)
{TerminalColors.ENDC}"""
)
if not proceed_with_sending_invites:
return
self.run_send_invites_script(debug_on)
if proceed_with_sending_invites:
self.run_send_invites_script(debug_on, prompts_enabled)
prompt_continuation_of_analysis = True
# STEP 4 -- ANALYZE TABLES & GENERATE REPORT
# Analyze tables for corrupt data...
if prompt_continuation_of_analysis & prompts_enabled:
if prompt_continuation_of_analysis and prompts_enabled:
# ^ (only prompt if we ran steps 1 and/or 2)
analyze_tables = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with table analysis?
(Y = proceed, N = exit)
{TerminalColors.ENDC}"""
)
if not analyze_tables:

View file

@ -105,8 +105,8 @@ class Command(BaseCommand):
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Created {total_new_entries} transition domain entries,
Updated {total_updated_domain_entries} transition domain entries
Created {total_new_entries} domain entries,
Updated {total_updated_domain_entries} domain entries
Created {total_domain_invitation_entries} domain invitation entries
(NOTE: no invitations are SENT in this script)

View file

@ -1,4 +1,6 @@
import logging
import os
import sys
logger = logging.getLogger(__name__)
@ -59,3 +61,48 @@ class TerminalHelper:
# DEBUG:
if print_condition:
logger.info(print_statement)
def execute_command(command_string:str):
"""Executes the given command string"""
logger.info(f"""{TerminalColors.OKCYAN}
==== EXECUTING... ====
{TerminalColors.ENDC}""")
os.system(f"{command_string}")
def prompt_for_execution(system_exit_on_terminate: bool, command_string: str, prompt_title: str) -> bool:
"""Prompts the user to inspect the given terminal command string
and asks if they wish to execute it. If the user responds (y),
execute the command"""
action_description_for_selecting_no = "skip"
if system_exit_on_terminate:
action_description_for_selecting_no = "exit"
# Allow the user to inspect the command string
# and ask if they wish to proceed
proceed_execution = TerminalHelper.query_yes_no(
f"""{TerminalColors.OKCYAN}
=====================================================
{prompt_title}
=====================================================
*** IMPORTANT: VERIFY THE FOLLOWING COMMAND LOOKS CORRECT ***
{command_string}
{TerminalColors.FAIL}
Proceed? (Y = proceed, N = {action_description_for_selecting_no})
{TerminalColors.ENDC}"""
)
# If the user decided to proceed executing the command,
# run the command for loading transition domains.
# Otherwise, exit this subroutine.
if not proceed_execution:
if system_exit_on_terminate:
sys.exit()
return False
TerminalHelper.execute_command(command_string)
return True

View file

@ -6,15 +6,3 @@ NEHRP.GOV|JBOONE|billing
NELSONCOUNTY-VA.GOV|MKELLEY|admin
NELSONCOUNTY-VA.GOV|CWILSON|billing
NELSONCOUNTY-VA.GOV|LMCCADE|tech
NELSONVILLENY.GOV|MBOWMAN|tech
NELSONVILLENY.GOV|PMINNERS|billing
NELSONVILLENY.GOV|CWINWARD|admin
NEMI.GOV|DJS1|tech
NEMI.GOV|EREAD|admin
NEMI.GOV|CHOPKINS|billing
NEPA.GOV|BPAUWELS|admin
NEPA.GOV|CRIBEIRO|billing
NEPA.GOV|DKAUFMAN1|tech
NERSC.GOV|TONEILL|admin
NERSC.GOV|JGERSTLE|billing
NERSC.GOV|RSTROMSNESS|tech

View file

@ -2,5 +2,3 @@ Anomaly.gov|muahaha|
TestDomain.gov|ok|
NEHRP.GOV|serverHold|
NELSONCOUNTY-VA.GOV|Hold|
NEMI.GOV|clientHold|
NERSC.GOV|ok|

View file

@ -4,34 +4,161 @@ from registrar.models import (
User,
Domain,
DomainInvitation,
TransitionDomain,
DomainInformation,
UserDomainRole,
)
from registrar.management.commands.master_domain_migrations import Command as master_migration_command
from registrar.management.commands.utility.terminal_helper import (
TerminalHelper,
)
class TestLogins(TestCase):
"""Test the retrieval of invitations."""
"""Test ......"""
def setUp(self):
self.domain, _ = Domain.objects.get_or_create(name="igorville.gov")
self.email = "mayor@igorville.gov"
self.invitation, _ = DomainInvitation.objects.get_or_create(
email=self.email, domain=self.domain
""" """
# self.user, _ = User.objects.get_or_create(email=self.email)
# # clean out the roles each time
# UserDomainRole.objects.all().delete()
def tearDown(self):
super().tearDown()
TransitionDomain.objects.all().delete()
Domain.objects.all().delete()
DomainInvitation.objects.all().delete()
DomainInformation.objects.all().delete()
def compare_tables(self,
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
expected_missing_domain_invitations):
"""Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation.
Verifies that the data loaded correctly."""
missing_domains = []
duplicate_domains = []
missing_domain_informations = []
missing_domain_invites = []
for transition_domain in TransitionDomain.objects.all():# DEBUG:
transition_domain_name = transition_domain.domain_name
transition_domain_email = transition_domain.username
# Check Domain table
matching_domains = Domain.objects.filter(name=transition_domain_name)
# Check Domain Information table
matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name)
# Check Domain Invitation table
matching_domain_invitations = DomainInvitation.objects.filter(email=transition_domain_email.lower(),
domain__name=transition_domain_name)
if len(matching_domains) == 0:
missing_domains.append(transition_domain_name)
elif len(matching_domains) > 1:
duplicate_domains.append(transition_domain_name)
if len(matching_domain_informations) == 0:
missing_domain_informations.append(transition_domain_name)
if len(matching_domain_invitations) == 0:
missing_domain_invites.append(transition_domain_name)
total_missing_domains = len(missing_domains)
total_duplicate_domains = len(duplicate_domains)
total_missing_domain_informations = len(missing_domain_informations)
total_missing_domain_invitations = len(missing_domain_invites)
total_transition_domains = len(TransitionDomain.objects.all())
total_domains = len(Domain.objects.all())
total_domain_informations = len(DomainInformation.objects.all())
total_domain_invitations = len(DomainInvitation.objects.all())
print(f"""
total_missing_domains = {len(missing_domains)}
total_duplicate_domains = {len(duplicate_domains)}
total_missing_domain_informations = {len(missing_domain_informations)}
total_missing_domain_invitations = {len(missing_domain_invites)}
total_transition_domains = {len(TransitionDomain.objects.all())}
total_domains = {len(Domain.objects.all())}
total_domain_informations = {len(DomainInformation.objects.all())}
total_domain_invitations = {len(DomainInvitation.objects.all())}
""")
self.assertTrue(total_missing_domains == expected_missing_domains)
self.assertTrue(total_duplicate_domains == expected_duplicate_domains)
self.assertTrue(total_missing_domain_informations == expected_missing_domain_informations)
self.assertTrue(total_missing_domain_invitations == expected_missing_domain_invitations)
self.assertTrue(total_transition_domains == expected_total_transition_domains)
self.assertTrue(total_domains == expected_total_domains)
self.assertTrue(total_domain_informations == expected_total_domain_informations)
self.assertTrue(total_domain_invitations == expected_total_domain_invitations)
def test_master_migration_functions(self):
""" Run the full master migration script using local test data.
NOTE: This is more of an integration test and so far does not
follow best practice of limiting the number of assertions per test.
But for now, this will double-check that the script
works as intended. """
migration_directory = "/app/registrar/tests/data/"
contacts_filename = "test_contacts.txt"
domain_contacts_filename = "test_domain_contacts.txt"
domain_statuses_filename = "test_domain_statuses.txt"
# STEP 1: Run the master migration script using local test data
master_migration_command.run_load_transition_domain_script(master_migration_command(),
migration_directory,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
"|",
False,
False,
False,
0)
# run_master_script_command = "./manage.py master_domain_migrations"
# run_master_script_command += " --runMigrations"
# run_master_script_command += " --migrationDirectory /app/registrar/tests/data"
# run_master_script_command += " --migrationFilenames test_contacts.txt,test_domain_contacts.txt,test_domain_statuses.txt"
# TerminalHelper.execute_command(run_master_script_command)
# STEP 2: (analyze the tables just like the migration script does, but add assert statements)
expected_total_transition_domains = 8
expected_total_domains = 4
expected_total_domain_informations = 0
expected_total_domain_invitations = 7
expected_missing_domains = 0
expected_duplicate_domains = 0
# we expect 8 missing domain invites since the migration does not auto-login new users
expected_missing_domain_informations = 8
# we expect 1 missing invite from anomaly.gov (an injected error)
expected_missing_domain_invitations = 1
self.compare_tables(expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
expected_missing_domain_invitations,
)
self.user, _ = User.objects.get_or_create(email=self.email)
# clean out the roles each time
UserDomainRole.objects.all().delete()
def test_migration_functions(self):
""" Run the master migration script using local test data """
""" (analyze the tables just like the migration script does, but add assert statements) """
#TODO: finish me!
self.assertTrue(True)
def test_load_transition_domains():
""" """
def test_user_logins(self):
"""A new user's first_login callback retrieves their invitations."""
self.user.first_login()
self.assertTrue(UserDomainRole.objects.get(user=self.user, domain=self.domain))
# self.user.first_login()
# self.assertTrue(UserDomainRole.objects.get(user=self.user, domain=self.domain))