file renames

Signed-off-by: CocoByte <nicolle.leclair@gmail.com>
This commit is contained in:
CocoByte 2023-10-25 11:26:26 -06:00
parent 53c74a766f
commit b62a9b4223
No known key found for this signature in database
GPG key ID: BBFAA2526384C97F
2 changed files with 146 additions and 179 deletions

View file

@ -10,6 +10,10 @@ import argparse
import sys
import os
from django.test import Client
from django_fsm import TransitionNotAllowed # type: ignore
from django.core.management import BaseCommand
from registrar.models import (
@ -17,26 +21,19 @@ from registrar.models import (
DomainInformation,
DomainInvitation,
TransitionDomain,
User,
)
from registrar.management.commands.utility.terminal_helper import (
TerminalColors,
TerminalHelper,
TerminalHelper
)
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = """ """
# Specifies which files the loaderFilenames commands should target
default_filenames = [
"escrow_domain_contacts.daily.gov.GOV.txt",
"escrow_contacts.daily.gov.GOV.txt",
"escrow_domain_statuses.daily.gov.GOV.txt",
]
def add_arguments(self, parser):
"""
OPTIONAL ARGUMENTS:
@ -54,8 +51,8 @@ class Command(BaseCommand):
> --loaderDirectory /app/tmp
--loaderFilenames
The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by spaces:
The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by spaces:
EXAMPLE USAGE:
> --loaderFilenames domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt
where...
@ -77,49 +74,38 @@ class Command(BaseCommand):
purposes
--resetTable
Used by the loaders to trigger a prompt for deleting all table entries.
Used by the loaders to trigger a prompt for deleting all table entries.
Useful for testing purposes, but USE WITH CAUTION
""" # noqa - line length
"""
parser.add_argument(
"--runLoaders",
parser.add_argument("--runLoaders",
help="Runs all scripts (in sequence) for transition domain migrations",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"--triggerLogins",
action=argparse.BooleanOptionalAction)
parser.add_argument("--triggerLogins",
help="Simulates a user login for each user in domain invitation",
action=argparse.BooleanOptionalAction,
)
action=argparse.BooleanOptionalAction)
# The following file arguments have default values for running in the sandbox
# For linter
script = "load_transition_domain migration script"
parser.add_argument(
"--loaderDirectory",
default="migrationdata",
help=f"The location of the files used for the {script}",
default="migrationData",
help="The location of the files used for load_transition_domain migration script"
)
parser.add_argument(
"--loaderFilenames",
# For linter.
# Join all of the default filenames with a space.
# The resulting list will look like this: file1.txt file2.txt file3.txt
default=" ".join(self.default_filenames),
help="""The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by spaces:
default="escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt",
help="""The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by spaces:
domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt
where...
- domain_contacts_filename is the Data file with domain contact information
- contacts_filename is the Data file with contact information
- domain_statuses_filename is the Data file with domain status information""", # noqa - linter length
- domain_statuses_filename is the Data file with domain status information"""
)
parser.add_argument(
"--sep", default="|", help="Delimiter character for the loader files"
)
parser.add_argument("--sep", default="|", help="Delimiter character for the loader files")
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
@ -133,20 +119,22 @@ class Command(BaseCommand):
action=argparse.BooleanOptionalAction,
)
def compare_tables(self, debug_on: bool):
"""Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation.
def compare_tables(self, debug_on: bool):
"""Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation.
Produces the following report (printed to the terminal):
#1 - Print any domains that exist in the transition_domain table
but not in their corresponding domain, domain information or
domain invitation tables.
#2 - Print which table this domain is missing from
#3- Check for duplicate entries in domain or
domain_information tables and print which are
#3- Check for duplicate entries in domain or
domain_information tables and print which are
duplicates and in which tables
"""
"""
logger.info(
f"""{TerminalColors.OKCYAN}
============= BEGINNING ANALYSIS ===============
@ -154,13 +142,13 @@ class Command(BaseCommand):
"""
)
# TODO: would filteredRelation be faster?
#TODO: would filteredRelation be faster?
missing_domains = []
duplicate_domains = []
missing_domain_informations = []
missing_domain_invites = []
for transition_domain in TransitionDomain.objects.all(): # DEBUG:
for transition_domain in TransitionDomain.objects.all():# DEBUG:
transition_domain_name = transition_domain.domain_name
transition_domain_email = transition_domain.username
@ -172,38 +160,22 @@ class Command(BaseCommand):
# Check Domain table
matching_domains = Domain.objects.filter(name=transition_domain_name)
# Check Domain Information table
matching_domain_informations = DomainInformation.objects.filter(
domain__name=transition_domain_name
)
matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name)
# Check Domain Invitation table
matching_domain_invitations = DomainInvitation.objects.filter(
email=transition_domain_email.lower(),
domain__name=transition_domain_name,
)
matching_domain_invitations = DomainInvitation.objects.filter(email=transition_domain_email.lower(),
domain__name=transition_domain_name)
if len(matching_domains) == 0:
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}Missing Domain{TerminalColors.ENDC}""", # noqa - line length
)
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain{TerminalColors.ENDC}""")
missing_domains.append(transition_domain_name)
elif len(matching_domains) > 1:
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}Duplicate Domain{TerminalColors.ENDC}""", # noqa - line length
)
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Duplicate Domain{TerminalColors.ENDC}""")
duplicate_domains.append(transition_domain_name)
if len(matching_domain_informations) == 0:
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}Missing Domain Information{TerminalColors.ENDC}""", # noqa - line length
)
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Information{TerminalColors.ENDC}""")
missing_domain_informations.append(transition_domain_name)
if len(matching_domain_invitations) == 0:
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}Missing Domain Invitation{TerminalColors.ENDC}""", # noqa - line length
)
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Invitation{TerminalColors.ENDC}""")
missing_domain_invites.append(transition_domain_name)
total_missing_domains = len(missing_domains)
@ -212,16 +184,10 @@ class Command(BaseCommand):
total_missing_domain_invitations = len(missing_domain_invites)
missing_domains_as_string = "{}".format(", ".join(map(str, missing_domains)))
duplicate_domains_as_string = "{}".format(
", ".join(map(str, duplicate_domains))
)
missing_domain_informations_as_string = "{}".format(
", ".join(map(str, missing_domain_informations))
)
missing_domain_invites_as_string = "{}".format(
", ".join(map(str, missing_domain_invites))
)
duplicate_domains_as_string = "{}".format(", ".join(map(str, duplicate_domains)))
missing_domain_informations_as_string = "{}".format(", ".join(map(str, missing_domain_informations)))
missing_domain_invites_as_string = "{}".format(", ".join(map(str, missing_domain_invites)))
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ANALYSIS ===============
@ -242,9 +208,9 @@ class Command(BaseCommand):
(These are transition domains which have no entires in the Domain Invitation Table)
{TerminalColors.YELLOW}{missing_domain_invites_as_string}{TerminalColors.OKGREEN}
{TerminalColors.ENDC}
""" # noqa - line length
"""
)
def prompt_for_execution(self, command_string: str, prompt_title: str) -> bool:
"""Prompts the user to inspect the given terminal command string
and asks if they wish to execute it. If the user responds (y),
@ -270,39 +236,35 @@ class Command(BaseCommand):
# Otherwise, exit this subroutine.
if not proceed_execution:
sys.exit()
self.execute_command(command_string)
return True
def execute_command(self, command_string: str):
def execute_command(self, command_string:str):
"""Executes the given command string"""
logger.info(
f"""{TerminalColors.OKCYAN}
logger.info(f"""{TerminalColors.OKCYAN}
==== EXECUTING... ====
{TerminalColors.ENDC}"""
)
{TerminalColors.ENDC}""")
os.system(f"{command_string}")
def run_load_transition_domain_script(
self,
file_location: str,
domain_contacts_filename: str,
contacts_filename: str,
domain_statuses_filename: str,
sep: str,
reset_table: bool,
debug_on: bool,
prompts_enabled: bool,
debug_max_entries_to_parse: int,
):
def run_load_transition_domain_script(self,
file_location: str,
domain_contacts_filename: str,
contacts_filename: str,
domain_statuses_filename: str,
sep: str,
reset_table: bool,
debug_on: bool,
prompts_enabled: bool,
debug_max_entries_to_parse: int):
"""Runs the load_transition_domain script"""
# Create the command string
command_string = "./manage.py load_transition_domain "
command_string += file_location + domain_contacts_filename + " "
command_string += file_location + contacts_filename + " "
command_string += file_location + domain_statuses_filename + " "
command_string += file_location+domain_contacts_filename + " "
command_string += file_location+contacts_filename + " "
command_string += file_location+domain_statuses_filename + " "
if sep is not None and sep != "|":
command_string += f"--sep {sep} "
if reset_table:
@ -314,13 +276,12 @@ class Command(BaseCommand):
# Execute the command string
if prompts_enabled:
self.prompt_for_execution(
command_string, "Running load_transition_domain script"
)
self.prompt_for_execution(command_string, "Running load_transition_domain script")
return
self.execute_command(command_string)
def run_transfer_script(self, debug_on: bool, prompts_enabled: bool):
def run_transfer_script(self, debug_on:bool, prompts_enabled: bool):
"""Runs the transfer_transition_domains_to_domains script"""
# Create the command string
command_string = "./manage.py transfer_transition_domains_to_domains "
@ -328,42 +289,43 @@ class Command(BaseCommand):
command_string += "--debug "
# Execute the command string
if prompts_enabled:
self.prompt_for_execution(
command_string, "Running transfer_transition_domains_to_domains script"
)
self.prompt_for_execution(command_string, "Running transfer_transition_domains_to_domains script")
return
self.execute_command(command_string)
def run_send_invites_script(self, debug_on: bool, prompts_enabled: bool):
"""Runs the send_domain_invitations script"""
# Create the command string...
command_string = "./manage.py send_domain_invitations -s"
# Execute the command string
if prompts_enabled:
self.prompt_for_execution(
command_string, "Running send_domain_invitations script"
)
self.prompt_for_execution(command_string, "Running send_domain_invitations script")
return
self.execute_command(command_string)
def run_migration_scripts(self, prompts_enabled: bool, options):
"""Runs the following migration scripts (in order):
1 - imports for trans domains
2 - transfer to domain & domain invitation"""
def run_migration_scripts(self,
prompts_enabled: bool,
options):
"""Runs the following migration scripts (in order):
1 - imports for trans domains
2 - transfer to domain & domain invitation"""
# Get arguments
sep = options.get("sep")
reset_table = options.get("resetTable")
debug_on = options.get("debug")
debug_max_entries_to_parse = int(options.get("limitParse"))
debug_max_entries_to_parse = int(
options.get("limitParse")
)
# Grab filepath information from the arguments
file_location = options.get("loaderDirectory") + "/"
filenames = options.get("loaderFilenames").split()
file_location = options.get("loaderDirectory")+"/"
filenames = options.get("loaderFilenames").split(",")
if len(filenames) < 3:
filenames_as_string = "{}".format(", ".join(map(str, filenames)))
logger.info(
f"""
logger.info(f"""
{TerminalColors.FAIL}
--loaderFilenames expected 3 filenames to follow it,
but only {len(filenames)} were given:
@ -372,8 +334,7 @@ class Command(BaseCommand):
PLEASE MODIFY THE SCRIPT AND TRY RUNNING IT AGAIN
============= TERMINATING =============
{TerminalColors.ENDC}
"""
)
""")
sys.exit()
domain_contacts_filename = filenames[0]
contacts_filename = filenames[1]
@ -401,49 +362,45 @@ class Command(BaseCommand):
)
# If the user rejected the filepath information
# as incorrect, prompt the user to provide
# as incorrect, prompt the user to provide
# correct file inputs in their original command
# prompt and exit this subroutine
if not files_are_correct:
logger.info(
f"""
logger.info(f"""
{TerminalColors.YELLOW}
PLEASE Re-Run the script with the correct file location and filenames:
EXAMPLE:
docker compose run -T app ./manage.py test_domain_migration --runLoaders --loaderDirectory /app/tmp --loaderFilenames escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt
""" # noqa - line length
)
""")
return
# Proceed executing the migration scripts
self.run_load_transition_domain_script(
file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
prompts_enabled,
debug_max_entries_to_parse,
)
self.run_load_transition_domain_script(file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
prompts_enabled,
debug_max_entries_to_parse)
self.run_transfer_script(debug_on, prompts_enabled)
def simulate_user_logins(self, debug_on):
"""Simulates logins for users (this will add
Domain Information objects to our tables)"""
logger.info(
f""
f"{TerminalColors.OKCYAN}"
f"================== SIMULATING LOGINS =================="
f"{TerminalColors.ENDC}"
)
command_string = "python ./manage.py test registrar.tests.test_transition_domain_migrations_wiuth_logins.TestLogins.test_user_logins"
logger.info(f""
f"{TerminalColors.OKCYAN}"
f"================== SIMULATING LOGINS =================="
f"{TerminalColors.ENDC}")
# command_string = "python ./manage.py test registrar.tests.test_transition_domain_migrations_wiuth_logins.TestLogins.test_user_logins"
# for invite in DomainInvitation.objects.all(): #TODO: limit to our stuff
# #DEBUG:
# TerminalHelper.print_conditional(debug_on,
@ -462,6 +419,7 @@ class Command(BaseCommand):
# logger.info(f"""{TerminalColors.YELLOW}(Deleting temporary user object){TerminalColors.ENDC}""")
# user.delete()
def handle(
self,
**options,
@ -472,7 +430,7 @@ class Command(BaseCommand):
2 - simulate logins
3 - send domain invitations (Emails should be sent to the appropriate users
note that all moved domains should now be accessible
on django admin for an analyst)
on django admin for an analyst)
4 - analyze the data for transition domains
and generate a report
"""
@ -484,34 +442,34 @@ class Command(BaseCommand):
# the terminal so the user knows what is
# enabled.
debug_on = options.get("debug")
prompts_enabled = debug_on # TODO: add as argument?
prompts_enabled = debug_on #TODO: add as argument?
run_loaders_enabled = options.get("runLoaders")
simulate_user_login_enabled = options.get("triggerLogins")
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.OKCYAN}
debug_on,
f"""{TerminalColors.OKCYAN}
----------DEBUG MODE ON----------
Detailed print statements activated.
{TerminalColors.ENDC}
""",
)
"""
)
TerminalHelper.print_conditional(
run_loaders_enabled,
f"""{TerminalColors.OKCYAN}
run_loaders_enabled,
f"""{TerminalColors.OKCYAN}
----------RUNNING LOADERS ON----------
All migration scripts will be run before
analyzing the data.
{TerminalColors.ENDC}
""",
)
"""
)
TerminalHelper.print_conditional(
run_loaders_enabled,
f"""{TerminalColors.OKCYAN}
run_loaders_enabled,
f"""{TerminalColors.OKCYAN}
----------TRIGGER LOGINS ON----------
Will be simulating user logins
{TerminalColors.ENDC}
""",
)
"""
)
# If a user decides to run all migration
# scripts, they may or may not wish to
# proceed with analysis of the data depending
@ -526,9 +484,9 @@ class Command(BaseCommand):
# STEP 1 -- RUN LOADERS
# Run migration scripts if specified by user
if run_loaders_enabled:
self.run_migration_scripts(options, prompts_enabled)
self.run_migration_scripts(prompts_enabled, options)
prompt_continuation_of_analysis = True
# STEP 2 -- SIMULATE LOGINS
# Simulate user login for each user in domain
# invitation if specified by user OR if running
@ -539,25 +497,26 @@ class Command(BaseCommand):
# automatically execute this as the final step
# to ensure Domain Information objects get added
# to the database.)
if run_loaders_enabled:
if prompts_enabled:
simulate_user_login_enabled = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with simulating user logins?
{TerminalColors.ENDC}"""
)
if not simulate_user_login_enabled:
return
self.simulate_user_logins(debug_on)
prompt_continuation_of_analysis = True
# if run_loaders_enabled:
# if prompts_enabled:
# simulate_user_login_enabled = TerminalHelper.query_yes_no(
# f"""{TerminalColors.FAIL}
# Proceed with simulating user logins?
# {TerminalColors.ENDC}"""
# )
# if not simulate_user_login_enabled:
# return
# self.simulate_user_logins(debug_on)
# prompt_continuation_of_analysis = True
# STEP 3 -- SEND INVITES
if prompts_enabled:
proceed_with_sending_invites = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with sending user invites?
{TerminalColors.ENDC}"""
)
)
if not proceed_with_sending_invites:
return
self.run_send_invites_script(debug_on)

View file

@ -23,6 +23,14 @@ class TestLogins(TestCase):
# clean out the roles each time
UserDomainRole.objects.all().delete()
def test_migration_functions(self):
""" Run the master migration script using local test data """
""" (analyze the tables just like the migration script does, but add assert statements) """
#TODO: finish me!
self.assertTrue(True)
def test_user_logins(self):
"""A new user's first_login callback retrieves their invitations."""
self.user.first_login()