Merge Nicolles branch

This commit is contained in:
zandercymatics 2023-10-27 13:45:26 -06:00
commit d70e5a2d77
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
11 changed files with 980 additions and 620 deletions

View file

@ -84,6 +84,10 @@ FILE 1: **escrow_domain_contacts.daily.gov.GOV.txt** -> has the map of domain na
FILE 2: **escrow_contacts.daily.gov.GOV.txt** -> has the mapping of contact id to contact email address (which is what we care about for sending domain invitations)
FILE 3: **escrow_domain_statuses.daily.gov.GOV.txt** -> has the map of domains and their statuses
We need to run a few scripts to parse these files into our domain tables.
We can do this both locally and in a sandbox.
## OPTION 1: SANDBOX
## Load migration data onto a production or sandbox environment
**WARNING:** All files uploaded in this manner are temporary, i.e. they will be deleted when the app is restaged.
Do not use this method to store data you want to keep around permanently.
@ -151,7 +155,18 @@ From this directory, run the following command:
```
NOTE: This will look for all files in /tmp with the .txt extension, but this can
be changed if you are dealing with different extensions.
be changed if you are dealing with different extensions. For instance, a .tar.gz could be expressed
as `--file_extension tar.gz`.
If you are using a tar.gz file, you will need to perform one additional step to extract it.
Run the following command from the same directory:
```shell
tar -xvf migrationdata/{FILE_NAME}.tar.gz -C migrationdata/ --strip-components=1
```
*FILE_NAME* - Name of the desired file, ex: exportdata
#### Manual method
If the `cat_files_into_getgov.py` script isn't working, follow these steps instead.
@ -169,6 +184,7 @@ Run the following script to transfer the existing data on our .txt files to our
./manage.py load_transition_domain migrationdata/escrow_domain_contacts.daily.gov.GOV.txt migrationdata/escrow_contacts.daily.gov.GOV.txt migrationdata/escrow_domain_statuses.daily.gov.GOV.txt
```
## OPTION 2: LOCAL
## Load migration data onto our local environments
Transferring this data from these files into our domain tables happens in two steps;

View file

@ -1,6 +1,5 @@
"""Loads files from /tmp into our sandboxes"""
import glob
import csv
import logging
import os
@ -44,9 +43,9 @@ class Command(BaseCommand):
desired_file_path = f"{directory}/{filename}"
if os.path.exists(desired_file_path):
replace = input(
f"{desired_file_path} already exists. Do you want to replace it? (y/n) "
)
# For linter
prompt = " Do you want to replace it? (y/n) "
replace = input(f"{desired_file_path} already exists. {prompt}")
if replace.lower() != "y":
do_command = False
@ -58,8 +57,10 @@ class Command(BaseCommand):
if exit_status == 0:
logger.info(f"Successfully copied {filename}")
else:
logger.info(f"Failed to copy {filename}")
logger.error(f"Failed to copy {filename}")
def cat(self, copy_from, copy_to):
"""Runs the cat command to
copy_from a location to copy_to a location"""
exit_status = os.system(f"cat {copy_from} > {copy_to}")
return exit_status

View file

@ -9,56 +9,14 @@ from django.core.management import BaseCommand
from registrar.models import TransitionDomain
from registrar.management.commands.utility.terminal_helper import (
TerminalColors,
TerminalHelper,
)
logger = logging.getLogger(__name__)
class termColors:
"""Colors for terminal outputs
(makes reading the logs WAY easier)"""
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
YELLOW = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
BackgroundLightYellow = "\033[103m"
def query_yes_no(question: str, default="yes") -> bool:
"""Ask a yes/no question via raw_input() and return their answer.
"question" is a string that is presented to the user.
"default" is the presumed answer if the user just hits <Enter>.
It must be "yes" (the default), "no" or None (meaning
an answer is required of the user).
The "answer" return value is True for "yes" or False for "no".
"""
valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("invalid default answer: '%s'" % default)
while True:
logger.info(question + prompt)
choice = input().lower()
if default is not None and choice == "":
return valid[default]
elif choice in valid:
return valid[choice]
else:
logger.info("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n")
class Command(BaseCommand):
help = """Loads data for domains that are in transition
(populates transition_domain model objects)."""
@ -110,20 +68,20 @@ class Command(BaseCommand):
or --limitParse are in use"""
if debug_on:
logger.info(
f"""{termColors.OKCYAN}
f"""{TerminalColors.OKCYAN}
----------DEBUG MODE ON----------
Detailed print statements activated.
{termColors.ENDC}
{TerminalColors.ENDC}
"""
)
if debug_max_entries_to_parse > 0:
logger.info(
f"""{termColors.OKCYAN}
f"""{TerminalColors.OKCYAN}
----------LIMITER ON----------
Parsing of entries will be limited to
{debug_max_entries_to_parse} lines per file.")
Detailed print statements activated.
{termColors.ENDC}
{TerminalColors.ENDC}
"""
)
@ -142,11 +100,12 @@ class Command(BaseCommand):
return domain_status_dictionary
def get_user_emails_dict(
self, contacts_filename: str, sep
self,
contacts_filename: str, sep
) -> defaultdict[str, str]:
"""Creates mapping of userId -> emails"""
user_emails_dictionary = defaultdict(str)
logger.info("Reading domain-contacts data file %s", contacts_filename)
logger.info("Reading contacts data file %s", contacts_filename)
with open(contacts_filename, "r") as contacts_file:
for row in csv.reader(contacts_file, delimiter=sep):
user_id = row[0]
@ -195,7 +154,7 @@ class Command(BaseCommand):
", ".join(map(str, duplicate_domain_user_combos))
)
logger.warning(
f"{termColors.YELLOW} No e-mails found for users: {users_without_email_as_string}" # noqa
f"{TerminalColors.YELLOW} No e-mails found for users: {users_without_email_as_string}" # noqa
)
if total_duplicate_pairs > 0 or total_duplicate_domains > 0:
duplicate_pairs_as_string = "{}".format(
@ -205,7 +164,7 @@ class Command(BaseCommand):
", ".join(map(str, duplicate_domains))
)
logger.warning(
f"""{termColors.YELLOW}
f"""{TerminalColors.YELLOW}
----DUPLICATES FOUND-----
@ -218,7 +177,7 @@ class Command(BaseCommand):
the supplied data files;
{duplicate_domains_as_string}
{termColors.ENDC}"""
{TerminalColors.ENDC}"""
)
def print_summary_status_findings(
@ -237,7 +196,7 @@ class Command(BaseCommand):
", ".join(map(str, domains_without_status))
)
logger.warning(
f"""{termColors.YELLOW}
f"""{TerminalColors.YELLOW}
--------------------------------------------
Found {total_domains_without_status} domains
@ -245,7 +204,7 @@ class Command(BaseCommand):
---------------------------------------------
{domains_without_status_as_string}
{termColors.ENDC}"""
{TerminalColors.ENDC}"""
)
if total_outlier_statuses > 0:
@ -253,7 +212,7 @@ class Command(BaseCommand):
", ".join(map(str, outlier_statuses))
) # noqa
logger.warning(
f"""{termColors.YELLOW}
f"""{TerminalColors.YELLOW}
--------------------------------------------
Found {total_outlier_statuses} unaccounted
@ -264,36 +223,27 @@ class Command(BaseCommand):
(defaulted to Ready):
{domains_without_status_as_string}
{termColors.ENDC}"""
{TerminalColors.ENDC}"""
)
def print_debug(self, print_condition: bool, print_statement: str):
"""This function reduces complexity of debug statements
in other functions.
It uses the logger to write the given print_statement to the
terminal if print_condition is TRUE"""
# DEBUG:
if print_condition:
logger.info(print_statement)
def prompt_table_reset(self):
"""Brings up a prompt in the terminal asking
if the user wishes to delete data in the
TransitionDomain table. If the user confirms,
deletes all the data in the TransitionDomain table"""
confirm_reset = query_yes_no(
confirm_reset = TerminalHelper.query_yes_no(
f"""
{termColors.FAIL}
{TerminalColors.FAIL}
WARNING: Resetting the table will permanently delete all
the data!
Are you sure you want to continue?{termColors.ENDC}"""
Are you sure you want to continue?{TerminalColors.ENDC}"""
)
if confirm_reset:
logger.info(
f"""{termColors.YELLOW}
f"""{TerminalColors.YELLOW}
----------Clearing Table Data----------
(please wait)
{termColors.ENDC}"""
{TerminalColors.ENDC}"""
)
TransitionDomain.objects.all().delete()
@ -429,18 +379,18 @@ class Command(BaseCommand):
)
if existing_domain is not None:
# DEBUG:
self.print_debug(
TerminalHelper.print_conditional(
debug_on,
f"{termColors.YELLOW} DUPLICATE file entries found for domain: {new_entry_domain_name} {termColors.ENDC}", # noqa
f"{TerminalColors.YELLOW} DUPLICATE file entries found for domain: {new_entry_domain_name} {TerminalColors.ENDC}", # noqa
)
if new_entry_domain_name not in duplicate_domains:
duplicate_domains.append(new_entry_domain_name)
if existing_domain_user_pair is not None:
# DEBUG:
self.print_debug(
TerminalHelper.print_conditional(
debug_on,
f"""{termColors.YELLOW} DUPLICATE file entries found for domain - user {termColors.BackgroundLightYellow} PAIR {termColors.ENDC}{termColors.YELLOW}:
{new_entry_domain_name} - {new_entry_email} {termColors.ENDC}""", # noqa
f"""{TerminalColors.YELLOW} DUPLICATE file entries found for domain - user {TerminalColors.BackgroundLightYellow} PAIR {TerminalColors.ENDC}{TerminalColors.YELLOW}:
{new_entry_domain_name} - {new_entry_email} {TerminalColors.ENDC}""", # noqa
)
if existing_domain_user_pair not in duplicate_domain_user_combos:
duplicate_domain_user_combos.append(existing_domain_user_pair)
@ -457,20 +407,20 @@ class Command(BaseCommand):
if existing_entry.status != new_entry_status:
# DEBUG:
self.print_debug(
TerminalHelper.print_conditional(
debug_on,
f"{termColors.OKCYAN}"
f"{TerminalColors.OKCYAN}"
f"Updating entry: {existing_entry}"
f"Status: {existing_entry.status} > {new_entry_status}" # noqa
f"Email Sent: {existing_entry.email_sent} > {new_entry_emailSent}" # noqa
f"{termColors.ENDC}",
f"{TerminalColors.ENDC}",
)
existing_entry.status = new_entry_status
existing_entry.email_sent = new_entry_emailSent
existing_entry.save()
except TransitionDomain.MultipleObjectsReturned:
logger.info(
f"{termColors.FAIL}"
f"{TerminalColors.FAIL}"
f"!!! ERROR: duplicate entries exist in the"
f"transtion_domain table for domain:"
f"{new_entry_domain_name}"
@ -489,9 +439,9 @@ class Command(BaseCommand):
total_new_entries += 1
# DEBUG:
self.print_debug(
TerminalHelper.print_conditional(
debug_on,
f"{termColors.OKCYAN} Adding entry {total_new_entries}: {new_entry} {termColors.ENDC}", # noqa
f"{TerminalColors.OKCYAN} Adding entry {total_new_entries}: {new_entry} {TerminalColors.ENDC}", # noqa
)
# Check Parse limit and exit loop if needed
@ -500,20 +450,20 @@ class Command(BaseCommand):
and debug_max_entries_to_parse != 0
):
logger.info(
f"{termColors.YELLOW}"
f"{TerminalColors.YELLOW}"
f"----PARSE LIMIT REACHED. HALTING PARSER.----"
f"{termColors.ENDC}"
f"{TerminalColors.ENDC}"
)
break
TransitionDomain.objects.bulk_create(to_create)
logger.info(
f"""{termColors.OKGREEN}
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Created {total_new_entries} transition domain entries,
updated {total_updated_domain_entries} transition domain entries
{termColors.ENDC}
{TerminalColors.ENDC}
"""
)

View file

@ -0,0 +1,550 @@
"""Data migration:
1 - generates a report of data integrity across all
transition domain related tables
2 - allows users to run all migration scripts for
transition domain data
"""
import logging
import argparse
import sys
import os
from django.test import Client
from django_fsm import TransitionNotAllowed # type: ignore
from django.core.management import BaseCommand
from django.core.management import call_command
from registrar.models import (
Domain,
DomainInformation,
DomainInvitation,
TransitionDomain,
User,
)
from registrar.management.commands.utility.terminal_helper import (
TerminalColors,
TerminalHelper
)
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = """ """
def add_arguments(self, parser):
"""
OPTIONAL ARGUMENTS:
--runMigrations
A boolean (default to true), which triggers running
all scripts (in sequence) for transition domain migrations
--migrationDirectory
The location of the files used for load_transition_domain migration script
EXAMPLE USAGE:
> --migrationDirectory /app/tmp
--migrationFilenames
The files used for load_transition_domain migration script.
Must appear IN ORDER and comma-delimiteds:
EXAMPLE USAGE:
> --migrationFilenames domain_contacts_filename.txt,contacts_filename.txt,domain_statuses_filename.txt
where...
- domain_contacts_filename is the Data file with domain contact information
- contacts_filename is the Data file with contact information
- domain_statuses_filename is the Data file with domain status information
--sep
Delimiter for the migration scripts to correctly parse the given text files.
(usually this can remain at default value of |)
--debug
A boolean (default to true), which activates additional print statements
--prompt
A boolean (default to true), which activates terminal prompts
that allows the user to step through each portion of this
script.
--limitParse
Used by the migration scripts (load_transition_domain) to set the limit for the
number of data entries to insert. Set to 0 (or just don't use this
argument) to parse every entry. This was provided primarily for testing
purposes
--resetTable
Used by the migration scripts to trigger a prompt for deleting all table entries.
Useful for testing purposes, but USE WITH CAUTION
"""
parser.add_argument("--runMigrations",
help="Runs all scripts (in sequence) for transition domain migrations",
action=argparse.BooleanOptionalAction)
# --triggerLogins
# A boolean (default to true), which triggers running
# simulations of user logins for each user in domain invitation
parser.add_argument("--triggerLogins",
help="Simulates a user login for each user in domain invitation",
action=argparse.BooleanOptionalAction)
# The following file arguments have default values for running in the sandbox
parser.add_argument(
"--migrationDirectory",
default="migrationData",
help="The location of the files used for load_transition_domain migration script"
)
parser.add_argument(
"--migrationFilenames",
default="escrow_domain_contacts.daily.gov.GOV.txt,escrow_contacts.daily.gov.GOV.txt,escrow_domain_statuses.daily.gov.GOV.txt",
help="""The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by commas:
domain_contacts_filename.txt,contacts_filename.txt,domain_statuses_filename.txt
where...
- domain_contacts_filename is the Data file with domain contact information
- contacts_filename is the Data file with contact information
- domain_statuses_filename is the Data file with domain status information"""
)
parser.add_argument("--sep", default="|", help="Delimiter character for the migration files")
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument("--prompt", action=argparse.BooleanOptionalAction)
parser.add_argument(
"--limitParse", default=0, help="Sets max number of entries to load"
)
parser.add_argument(
"--resetTable",
help="Deletes all data in the TransitionDomain table",
action=argparse.BooleanOptionalAction,
)
def compare_tables(self, debug_on: bool):
"""Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation.
Produces the following report (printed to the terminal):
#1 - Print any domains that exist in the transition_domain table
but not in their corresponding domain, domain information or
domain invitation tables.
#2 - Print which table this domain is missing from
#3- Check for duplicate entries in domain or
domain_information tables and print which are
duplicates and in which tables
"""
logger.info(
f"""{TerminalColors.OKCYAN}
============= BEGINNING ANALYSIS ===============
{TerminalColors.ENDC}
"""
)
#TODO: would filteredRelation be faster?
missing_domains = []
duplicate_domains = []
missing_domain_informations = []
missing_domain_invites = []
for transition_domain in TransitionDomain.objects.all():# DEBUG:
transition_domain_name = transition_domain.domain_name
transition_domain_email = transition_domain.username
TerminalHelper.print_conditional(
debug_on,
f"{TerminalColors.OKCYAN}Checking: {transition_domain_name} {TerminalColors.ENDC}", # noqa
)
# Check Domain table
matching_domains = Domain.objects.filter(name=transition_domain_name)
# Check Domain Information table
matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name)
# Check Domain Invitation table
matching_domain_invitations = DomainInvitation.objects.filter(email=transition_domain_email.lower(),
domain__name=transition_domain_name)
if len(matching_domains) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain{TerminalColors.ENDC}""")
missing_domains.append(transition_domain_name)
elif len(matching_domains) > 1:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Duplicate Domain{TerminalColors.ENDC}""")
duplicate_domains.append(transition_domain_name)
if len(matching_domain_informations) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Information{TerminalColors.ENDC}""")
missing_domain_informations.append(transition_domain_name)
if len(matching_domain_invitations) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Invitation{TerminalColors.ENDC}""")
missing_domain_invites.append(transition_domain_name)
total_missing_domains = len(missing_domains)
total_duplicate_domains = len(duplicate_domains)
total_missing_domain_informations = len(missing_domain_informations)
total_missing_domain_invitations = len(missing_domain_invites)
missing_domains_as_string = "{}".format(", ".join(map(str, missing_domains)))
duplicate_domains_as_string = "{}".format(", ".join(map(str, duplicate_domains)))
missing_domain_informations_as_string = "{}".format(", ".join(map(str, missing_domain_informations)))
missing_domain_invites_as_string = "{}".format(", ".join(map(str, missing_domain_invites)))
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ANALYSIS ===============
{total_missing_domains} Missing Domains:
(These are transition domains that are missing from the Domain Table)
{TerminalColors.YELLOW}{missing_domains_as_string}{TerminalColors.OKGREEN}
{total_duplicate_domains} Duplicate Domains:
(These are transition domains which have duplicate entries in the Domain Table)
{TerminalColors.YELLOW}{duplicate_domains_as_string}{TerminalColors.OKGREEN}
{total_missing_domain_informations} Domain Information Entries missing:
(These are transition domains which have no entries in the Domain Information Table)
{TerminalColors.YELLOW}{missing_domain_informations_as_string}{TerminalColors.OKGREEN}
{total_missing_domain_invitations} Domain Invitations missing:
(These are transition domains which have no entires in the Domain Invitation Table)
{TerminalColors.YELLOW}{missing_domain_invites_as_string}{TerminalColors.OKGREEN}
{TerminalColors.ENDC}
"""
)
def run_load_transition_domain_script(self,
file_location: str,
domain_contacts_filename: str,
contacts_filename: str,
domain_statuses_filename: str,
sep: str,
reset_table: bool,
debug_on: bool,
prompts_enabled: bool,
debug_max_entries_to_parse: int):
"""Runs the load_transition_domain script"""
# Create the command string
command_script = "load_transition_domain"
command_string = (
f"./manage.py {command_script} "
f"{file_location+domain_contacts_filename} "
f"{file_location+contacts_filename} "
f"{file_location+domain_statuses_filename} "
)
if sep is not None and sep != "|":
command_string += f"--sep {sep} "
if reset_table:
command_string += "--resetTable "
if debug_on:
command_string += "--debug "
if debug_max_entries_to_parse > 0:
command_string += f"--limitParse {debug_max_entries_to_parse} "
# Execute the command string
if prompts_enabled:
system_exit_on_terminate = True
TerminalHelper.prompt_for_execution(system_exit_on_terminate, command_string, "Running load_transition_domain script")
# TODO: make this somehow run inside TerminalHelper prompt
call_command(
command_script,
f"{file_location+domain_contacts_filename}",
f"{file_location+contacts_filename}",
f"{file_location+domain_statuses_filename}",
sep = sep,
resetTable = reset_table,
debug = debug_on,
limitParse = debug_max_entries_to_parse
)
def run_transfer_script(self, debug_on:bool, prompts_enabled: bool):
"""Runs the transfer_transition_domains_to_domains script"""
# Create the command string
command_script = "transfer_transition_domains_to_domains"
command_string = f"./manage.py {command_script}"
if debug_on:
command_string += "--debug "
# Execute the command string
if prompts_enabled:
system_exit_on_terminate = True
TerminalHelper.prompt_for_execution(system_exit_on_terminate,command_string, "Running transfer_transition_domains_to_domains script")
# TODO: make this somehow run inside TerminalHelper prompt
call_command(
command_script
)
def run_send_invites_script(self, debug_on: bool, prompts_enabled: bool):
"""Runs the send_domain_invitations script"""
# Create the command string...
command_script = "send_domain_invitations"
command_string = f"./manage.py {command_script} -s"
# Execute the command string
if prompts_enabled:
system_exit_on_terminate = True
TerminalHelper.prompt_for_execution(system_exit_on_terminate,command_string, "Running send_domain_invitations script")
# TODO: make this somehow run inside TerminalHelper prompt
call_command(
command_script,
send_emails=True
)
def run_migration_scripts(self,
file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
prompts_enabled,
debug_max_entries_to_parse):
"""Runs the following migration scripts (in order):
1 - imports for trans domains
2 - transfer to domain & domain invitation"""
if prompts_enabled:
# Allow the user to inspect the filepath
# data given in the arguments, and prompt
# the user to verify this info before proceeding
files_are_correct = TerminalHelper.query_yes_no(
f"""
{TerminalColors.OKCYAN}
*** IMPORTANT: VERIFY THE FOLLOWING ***
The migration scripts are looking in directory....
{file_location}
....for the following files:
- domain contacts: {domain_contacts_filename}
- contacts: {contacts_filename}
- domain statuses: {domain_statuses_filename}
{TerminalColors.FAIL}
Does this look correct?{TerminalColors.ENDC}"""
)
# If the user rejected the filepath information
# as incorrect, prompt the user to provide
# correct file inputs in their original command
# prompt and exit this subroutine
if not files_are_correct:
logger.info(f"""
{TerminalColors.YELLOW}
PLEASE Re-Run the script with the correct file location and filenames:
EXAMPLE:
docker compose run -T app ./manage.py test_domain_migration --runMigrations --migrationDirectory /app/tmp --migrationFilenames escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt
""") # noqa
return
# Proceed executing the migration scripts
self.run_load_transition_domain_script(file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
prompts_enabled,
debug_max_entries_to_parse)
self.run_transfer_script(debug_on, prompts_enabled)
def simulate_user_logins(self, debug_on):
"""Simulates logins for users (this will add
Domain Information objects to our tables)"""
# logger.info(f""
# f"{TerminalColors.OKCYAN}"
# f"================== SIMULATING LOGINS =================="
# f"{TerminalColors.ENDC}")
# for invite in DomainInvitation.objects.all(): #TODO: move to unit test
# #DEBUG:
# TerminalHelper.print_conditional(debug_on,
# f"{TerminalColors.OKCYAN}"
# f"Processing invite: {invite}"
# f"{TerminalColors.ENDC}")
# # get a user with this email address
# user, user_created = User.objects.get_or_create(email=invite.email, username=invite.email)
# #DEBUG:
# TerminalHelper.print_conditional(user_created,
# f"""{TerminalColors.OKCYAN}No user found (creating temporary user object){TerminalColors.ENDC}""")
# TerminalHelper.print_conditional(debug_on,
# f"""{TerminalColors.OKCYAN}Executing first-time login for user: {user}{TerminalColors.ENDC}""")
# user.first_login()
# if user_created:
# logger.info(f"""{TerminalColors.YELLOW}(Deleting temporary user object){TerminalColors.ENDC}""")
# user.delete()
def handle(
self,
**options,
):
"""
Does the following;
1 - run migration scripts
2 - simulate logins
3 - send domain invitations (Emails should be sent to the appropriate users
note that all moved domains should now be accessible
on django admin for an analyst)
4 - analyze the data for transition domains
and generate a report
"""
# SETUP
# Grab all arguments relevant to
# orchestrating which parts of this script
# should execute. Print some indicators to
# the terminal so the user knows what is
# enabled.
# Get arguments
debug_on = options.get("debug")
prompts_enabled = options.get("prompt")
run_migrations_enabled = options.get("runMigrations")
simulate_user_login_enabled = False # TODO: delete? Moving to unit test... options.get("triggerLogins")
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.OKCYAN}
----------DEBUG MODE ON----------
Detailed print statements activated.
{TerminalColors.ENDC}
"""
)
TerminalHelper.print_conditional(
run_migrations_enabled,
f"""{TerminalColors.OKCYAN}
----------RUNNING MIGRATIONS ON----------
All migration scripts will be run before
analyzing the data.
{TerminalColors.ENDC}
"""
)
TerminalHelper.print_conditional(
run_migrations_enabled,
f"""{TerminalColors.OKCYAN}
----------TRIGGER LOGINS ON----------
Will be simulating user logins
{TerminalColors.ENDC}
"""
)
# If a user decides to run all migration
# scripts, they may or may not wish to
# proceed with analysis of the data depending
# on the results of the migration.
# Provide a breakpoint for them to decide
# whether to continue or not.
# The same will happen if simulating user
# logins (to allow users to run only that
# portion of the script if desired)
prompt_continuation_of_analysis = False
# STEP 1 -- RUN MIGRATIONS
# Run migration scripts if specified by user
if run_migrations_enabled:
# grab arguments for running migrations
sep = options.get("sep")
reset_table = options.get("resetTable")
debug_max_entries_to_parse = int(
options.get("limitParse")
)
# Grab filepath information from the arguments
file_location = options.get("migrationDirectory")+"/"
filenames = options.get("migrationFilenames").split(",")
if len(filenames) < 3:
filenames_as_string = "{}".format(", ".join(map(str, filenames)))
logger.info(f"""
{TerminalColors.FAIL}
--migrationFilenames expected 3 filenames to follow it,
but only {len(filenames)} were given:
{filenames_as_string}
PLEASE MODIFY THE SCRIPT AND TRY RUNNING IT AGAIN
============= TERMINATING =============
{TerminalColors.ENDC}
""")
sys.exit()
domain_contacts_filename = filenames[0]
contacts_filename = filenames[1]
domain_statuses_filename = filenames[2]
# Run migration scripts
self.run_migration_scripts(file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
prompts_enabled,
debug_max_entries_to_parse)
prompt_continuation_of_analysis = True
# STEP 2 -- SIMULATE LOGINS
# Simulate user login for each user in domain
# invitation if specified by user OR if running
# migration scripts.
# (NOTE: Although users can choose to run login
# simulations separately (for testing purposes),
# if we are running all migration scripts, we should
# automatically execute this as the final step
# to ensure Domain Information objects get added
# to the database.)
if run_migrations_enabled and simulate_user_login_enabled:
if prompts_enabled:
simulate_user_login_enabled = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with simulating user logins?
{TerminalColors.ENDC}"""
)
if not simulate_user_login_enabled:
return
self.simulate_user_logins(debug_on)
prompt_continuation_of_analysis = True
# STEP 3 -- SEND INVITES
proceed_with_sending_invites = run_migrations_enabled
if prompts_enabled and run_migrations_enabled:
proceed_with_sending_invites = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with sending user invites for all transition domains?
(Y = proceed, N = skip)
{TerminalColors.ENDC}"""
)
if proceed_with_sending_invites:
self.run_send_invites_script(debug_on, prompts_enabled)
prompt_continuation_of_analysis = True
# STEP 4 -- ANALYZE TABLES & GENERATE REPORT
# Analyze tables for corrupt data...
if prompt_continuation_of_analysis and prompts_enabled:
# ^ (only prompt if we ran steps 1 and/or 2)
analyze_tables = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with table analysis?
(Y = proceed, N = exit)
{TerminalColors.ENDC}"""
)
if not analyze_tables:
return
self.compare_tables(debug_on)

View file

@ -1,456 +0,0 @@
import logging
import argparse
import os
from django.test import Client
from django_fsm import TransitionNotAllowed # type: ignore
from django.core.management import BaseCommand
from django.contrib.auth import get_user_model
from registrar.models import TransitionDomain
from registrar.models import Domain
from registrar.models import DomainInvitation
from registrar.models.domain_information import DomainInformation
from registrar.management.commands.utility.terminal_helper import TerminalColors
from registrar.management.commands.utility.terminal_helper import TerminalHelper
from registrar.management.commands.load_transition_domain import (
Command as load_transition_domain_command,
)
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = """ """
def add_arguments(self, parser):
"""
OPTIONAL ARGUMENTS:
--debug
A boolean (default to true), which activates additional print statements
"""
parser.add_argument(
"--runLoaders",
help="Runs all scripts (in sequence) for transition domain migrations",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"--triggerLogins",
help="Simulates a user login for each user in domain invitation",
action=argparse.BooleanOptionalAction,
)
# The following file arguments have default values for running in the sandbox
parser.add_argument(
"--loaderDirectory",
default="migrationData",
help="The location of the files used for load_transition_domain migration script",
)
parser.add_argument(
"--loaderFilenames",
default="escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt",
help="""The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by spaces:
domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt
where...
- domain_contacts_filename is the Data file with domain contact information
- contacts_filename is the Data file with contact information
- domain_statuses_filename is the Data file with domain status information""",
)
# parser.add_argument(
# "domain_contacts_filename",
# default="escrow_domain_contacts.daily.gov.GOV.txt",
# help="Data file with domain contact information"
# )
# parser.add_argument(
# "contacts_filename",
# default="escrow_contacts.daily.gov.GOV.txt",
# help="Data file with contact information",
# )
# parser.add_argument(
# "domain_statuses_filename",
# default="escrow_domain_statuses.daily.gov.GOV.txt",
# help="Data file with domain status information"
# )
parser.add_argument(
"--sep", default="|", help="Delimiter character for the loader files"
)
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument(
"--limitParse", default=0, help="Sets max number of entries to load"
)
parser.add_argument(
"--resetTable",
help="Deletes all data in the TransitionDomain table",
action=argparse.BooleanOptionalAction,
)
def print_debug_mode_statements(self, debug_on: bool):
"""Prints additional terminal statements to indicate if --debug
or --limitParse are in use"""
self.print_debug(
debug_on,
f"""{TerminalColors.OKCYAN}
----------DEBUG MODE ON----------
Detailed print statements activated.
{TerminalColors.ENDC}
""",
)
def print_debug(self, print_condition: bool, print_statement: str):
"""This function reduces complexity of debug statements
in other functions.
It uses the logger to write the given print_statement to the
terminal if print_condition is TRUE"""
# DEBUG:
if print_condition:
logger.info(print_statement)
def compare_tables(self, debug_on):
logger.info(
f"""{TerminalColors.OKCYAN}
============= BEGINNING ANALYSIS ===============
{TerminalColors.ENDC}
"""
)
# TODO: would filteredRelation be faster?
for transition_domain in TransitionDomain.objects.all(): # DEBUG:
transition_domain_name = transition_domain.domain_name
transition_domain_email = transition_domain.username
self.print_debug(
debug_on,
f"{TerminalColors.OKCYAN}Checking: {transition_domain_name} {TerminalColors.ENDC}", # noqa
)
missing_domains = []
duplicate_domains = []
missing_domain_informations = []
missing_domain_invites = []
# Check Domain table
matching_domains = Domain.objects.filter(name=transition_domain_name)
# Check Domain Information table
matching_domain_informations = DomainInformation.objects.filter(
domain__name=transition_domain_name
)
# Check Domain Invitation table
matching_domain_invitations = DomainInvitation.objects.filter(
email=transition_domain_email.lower(),
domain__name=transition_domain_name,
)
if len(matching_domains) == 0:
missing_domains.append(transition_domain_name)
elif len(matching_domains) > 1:
duplicate_domains.append(transition_domain_name)
if len(matching_domain_informations) == 0:
missing_domain_informations.append(transition_domain_name)
if len(matching_domain_invitations) == 0:
missing_domain_invites.append(transition_domain_name)
total_missing_domains = len(missing_domains)
total_duplicate_domains = len(duplicate_domains)
total_missing_domain_informations = len(missing_domain_informations)
total_missing_domain_invitations = len(missing_domain_invites)
missing_domains_as_string = "{}".format(", ".join(map(str, missing_domains)))
duplicate_domains_as_string = "{}".format(
", ".join(map(str, duplicate_domains))
)
missing_domain_informations_as_string = "{}".format(
", ".join(map(str, missing_domain_informations))
)
missing_domain_invites_as_string = "{}".format(
", ".join(map(str, missing_domain_invites))
)
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ANALYSIS ===============
{total_missing_domains} Missing Domains:
(These are transition domains that are missing from the Domain Table)
{TerminalColors.YELLOW}{missing_domains_as_string}{TerminalColors.OKGREEN}
{total_duplicate_domains} Duplicate Domains:
(These are transition domains which have duplicate entries in the Domain Table)
{TerminalColors.YELLOW}{duplicate_domains_as_string}{TerminalColors.OKGREEN}
{total_missing_domain_informations} Domains Information Entries missing:
(These are transition domains which have no entries in the Domain Information Table)
{TerminalColors.YELLOW}{missing_domain_informations_as_string}{TerminalColors.OKGREEN}
{total_missing_domain_invitations} Domain Invitations missing:
(These are transition domains which have no entires in the Domain Invitation Table)
{TerminalColors.YELLOW}{missing_domain_invites_as_string}{TerminalColors.OKGREEN}
{TerminalColors.ENDC}
"""
)
def run_load_transition_domain_script(
self,
file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
debug_max_entries_to_parse,
):
load_transition_domain_command_string = "./manage.py load_transition_domain "
load_transition_domain_command_string += (
file_location + domain_contacts_filename + " "
)
load_transition_domain_command_string += file_location + contacts_filename + " "
load_transition_domain_command_string += (
file_location + domain_statuses_filename + " "
)
if sep is not None and sep != "|":
load_transition_domain_command_string += f"--sep {sep} "
if reset_table:
load_transition_domain_command_string += "--resetTable "
if debug_on:
load_transition_domain_command_string += "--debug "
if debug_max_entries_to_parse > 0:
load_transition_domain_command_string += (
f"--limitParse {debug_max_entries_to_parse} "
)
proceed_load_transition_domain = TerminalHelper.query_yes_no(
f"""{TerminalColors.OKCYAN}
=====================================
Running load_transition_domain script
=====================================
{load_transition_domain_command_string}
{TerminalColors.FAIL}
Proceed?
{TerminalColors.ENDC}"""
)
if not proceed_load_transition_domain:
return
logger.info(
f"""{TerminalColors.OKCYAN}
==== EXECUTING... ====
{TerminalColors.ENDC}"""
)
os.system(f"{load_transition_domain_command_string}")
def run_transfer_script(self, debug_on):
command_string = "./manage.py transfer_transition_domains_to_domains "
if debug_on:
command_string += "--debug "
proceed_load_transition_domain = TerminalHelper.query_yes_no(
f"""{TerminalColors.OKCYAN}
=====================================================
Running transfer_transition_domains_to_domains script
=====================================================
{command_string}
{TerminalColors.FAIL}
Proceed?
{TerminalColors.ENDC}"""
)
if not proceed_load_transition_domain:
return
logger.info(
f"""{TerminalColors.OKCYAN}
==== EXECUTING... ====
{TerminalColors.ENDC}"""
)
os.system(f"{command_string}")
def run_migration_scripts(self, options):
file_location = options.get("loaderDirectory") + "/"
filenames = options.get("loaderFilenames").split()
if len(filenames) < 3:
filenames_as_string = "{}".format(", ".join(map(str, filenames)))
logger.info(
f"""
{TerminalColors.FAIL}
--loaderFilenames expected 3 filenames to follow it,
but only {len(filenames)} were given:
{filenames_as_string}
PLEASE MODIFY THE SCRIPT AND TRY RUNNING IT AGAIN
============= TERMINATING =============
{TerminalColors.ENDC}
"""
)
return
domain_contacts_filename = filenames[0]
contacts_filename = filenames[1]
domain_statuses_filename = filenames[2]
files_are_correct = TerminalHelper.query_yes_no(
f"""
{TerminalColors.OKCYAN}
*** IMPORTANT: VERIFY THE FOLLOWING ***
The migration scripts are looking in directory....
{file_location}
....for the following files:
- domain contacts: {domain_contacts_filename}
- contacts: {contacts_filename}
- domain statuses: {domain_statuses_filename}y
{TerminalColors.FAIL}
Does this look correct?{TerminalColors.ENDC}"""
)
if not files_are_correct:
# prompt the user to provide correct file inputs
logger.info(
f"""
{TerminalColors.YELLOW}
PLEASE Re-Run the script with the correct file location and filenames:
EXAMPLE:
docker compose run -T app ./manage.py test_domain_migration --runLoaders --loaderDirectory /app/tmp --loaderFilenames escrow_domain_contacts.daily.gov.GOV.txt escrow_contacts.daily.gov.GOV.txt escrow_domain_statuses.daily.gov.GOV.txt
"""
)
return
# Get --sep argument
sep = options.get("sep")
# Get --resetTable argument
reset_table = options.get("resetTable")
# Get --debug argument
debug_on = options.get("debug")
# Get --limitParse argument
debug_max_entries_to_parse = int(
options.get("limitParse")
) # set to 0 to parse all entries
self.run_load_transition_domain_script(
file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
debug_max_entries_to_parse,
)
self.run_transfer_script(debug_on)
def simulate_user_logins(self, debug_on):
logger.info(
f"""{TerminalColors.OKCYAN}
==================
SIMULATING LOGINS
==================
{TerminalColors.ENDC}
"""
)
for invite in DomainInvitation.objects.all():
# DEBUG:
TerminalHelper.print_debug(
debug_on,
f"""{TerminalColors.OKCYAN}Processing invite: {invite}{TerminalColors.ENDC}""",
)
# get a user with this email address
User = get_user_model()
try:
user = User.objects.get(email=invite.email)
# DEBUG:
TerminalHelper.print_debug(
debug_on,
f"""{TerminalColors.OKCYAN}Logging in user: {user}{TerminalColors.ENDC}""",
)
Client.force_login(user)
except User.DoesNotExist:
# TODO: how should we handle this?
logger.warn(
f"""{TerminalColors.FAIL}No user found {invite.email}{TerminalColors.ENDC}"""
)
def handle(
self,
**options,
):
"""
Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation.
Produces the following report (printed to the terminal):
#1 - Print any domains that exist in the transition_domain table
but not in their corresponding domain, domain information or
domain invitation tables.
#2 - Print which table this domain is missing from
#3- Check for duplicate entries in domain or
domain_information tables and print which are
duplicates and in which tables
(ONLY RUNS with full script option)
- Emails should be sent to the appropriate users
note that all moved domains should now be accessible
on django admin for an analyst
OPTIONS:
-- (run all other scripts:
1 - imports for trans domains
2 - transfer to domain & domain invitation
3 - send domain invite)
** Triggers table reset **
"""
# Get --debug argument
debug_on = options.get("debug")
# Get --runLoaders argument
run_loaders_on = options.get("runLoaders")
# Get --triggerLogins argument
simulate_user_login_enabled = options.get("triggerLogins")
prompt_continuation_of_analysis = False
# Run migration scripts if specified by user...
if run_loaders_on:
self.run_migration_scripts(options)
prompt_continuation_of_analysis = True
# Simulate user login for each user in domain invitation if sepcified by user
if simulate_user_login_enabled:
self.simulate_user_logins(debug_on)
prompt_continuation_of_analysis = True
analyze_tables = True
if prompt_continuation_of_analysis:
analyze_tables = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with table analysis?
{TerminalColors.ENDC}"""
)
# Analyze tables for corrupt data...
if analyze_tables:
self.compare_tables(debug_on)

View file

@ -10,25 +10,14 @@ from registrar.models import TransitionDomain
from registrar.models import Domain
from registrar.models import DomainInvitation
from registrar.management.commands.utility.terminal_helper import (
TerminalColors,
TerminalHelper,
)
logger = logging.getLogger(__name__)
class termColors:
"""Colors for terminal outputs
(makes reading the logs WAY easier)"""
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
YELLOW = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
BackgroundLightYellow = "\033[103m"
class Command(BaseCommand):
help = """Load data from transition domain tables
into main domain tables. Also create domain invitation
@ -49,34 +38,25 @@ class Command(BaseCommand):
):
"""Prints additional terminal statements to indicate if --debug
or --limitParse are in use"""
self.print_debug(
TerminalHelper.print_conditional(
debug_on,
f"""{termColors.OKCYAN}
f"""{TerminalColors.OKCYAN}
----------DEBUG MODE ON----------
Detailed print statements activated.
{termColors.ENDC}
{TerminalColors.ENDC}
""",
)
self.print_debug(
TerminalHelper.print_conditional(
debug_max_entries_to_parse > 0,
f"""{termColors.OKCYAN}
f"""{TerminalColors.OKCYAN}
----------LIMITER ON----------
Parsing of entries will be limited to
{debug_max_entries_to_parse} lines per file.")
Detailed print statements activated.
{termColors.ENDC}
{TerminalColors.ENDC}
""",
)
def print_debug(self, print_condition: bool, print_statement: str):
"""This function reduces complexity of debug statements
in other functions.
It uses the logger to write the given print_statement to the
terminal if print_condition is TRUE"""
# DEBUG:
if print_condition:
logger.info(print_statement)
def update_domain_status(
self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool
) -> bool:
@ -96,13 +76,13 @@ class Command(BaseCommand):
target_domain.save()
# DEBUG:
self.print_debug(
TerminalHelper.print_conditional(
debug_on,
f"""{termColors.YELLOW}
f"""{TerminalColors.YELLOW}
>> Updated {target_domain.name} state from
'{existing_status}' to '{target_domain.state}'
(no domain invitation entry added)
{termColors.ENDC}""",
{TerminalColors.ENDC}""",
)
return True
return False
@ -123,22 +103,22 @@ class Command(BaseCommand):
total_domain_invitation_entries = len(domain_invitations_to_create)
logger.info(
f"""{termColors.OKGREEN}
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Created {total_new_entries} transition domain entries,
Updated {total_updated_domain_entries} transition domain entries
Created {total_new_entries} domain entries,
Updated {total_updated_domain_entries} domain entries
Created {total_domain_invitation_entries} domain invitation entries
(NOTE: no invitations are SENT in this script)
{termColors.ENDC}
{TerminalColors.ENDC}
"""
)
if len(skipped_domain_entries) > 0:
logger.info(
f"""{termColors.FAIL}
f"""{TerminalColors.FAIL}
============= SKIPPED DOMAINS (ERRORS) ===============
{skipped_domain_entries}
{termColors.ENDC}
{TerminalColors.ENDC}
"""
)
@ -151,25 +131,25 @@ class Command(BaseCommand):
skipped_domain_invitations.remove(domain_invite.domain)
if len(skipped_domain_invitations) > 0:
logger.info(
f"""{termColors.FAIL}
f"""{TerminalColors.FAIL}
============= SKIPPED DOMAIN INVITATIONS (ERRORS) ===============
{skipped_domain_invitations}
{termColors.ENDC}
{TerminalColors.ENDC}
"""
)
# DEBUG:
self.print_debug(
TerminalHelper.print_conditional(
debug_on,
f"""{termColors.YELLOW}
f"""{TerminalColors.YELLOW}
======= DEBUG OUTPUT =======
Created Domains:
{domains_to_create}
Updated Domains:
{updated_domain_entries}
{termColors.ENDC}
{TerminalColors.ENDC}
""",
)
@ -184,7 +164,7 @@ class Command(BaseCommand):
if associated_domain is None:
logger.warning(
f"""
{termColors.FAIL}
{TerminalColors.FAIL}
!!! ERROR: Domain cannot be null for a
Domain Invitation object!
@ -241,11 +221,11 @@ class Command(BaseCommand):
total_rows_parsed = 0
logger.info(
f"""{termColors.OKGREEN}
f"""{TerminalColors.OKGREEN}
==========================
Beginning Data Transfer
==========================
{termColors.ENDC}"""
{TerminalColors.ENDC}"""
)
for transition_domain in TransitionDomain.objects.all():
@ -254,11 +234,11 @@ class Command(BaseCommand):
transition_domain_email = transition_domain.username
# DEBUG:
self.print_debug(
TerminalHelper.print_conditional(
debug_on,
f"""{termColors.OKCYAN}
f"""{TerminalColors.OKCYAN}
Processing Transition Domain: {transition_domain_name}, {transition_domain_status}, {transition_domain_email}
{termColors.ENDC}""", # noqa
{TerminalColors.ENDC}""", # noqa
)
new_domain_invitation = None
@ -269,11 +249,11 @@ class Command(BaseCommand):
# get the existing domain
domain_to_update = Domain.objects.get(name=transition_domain_name)
# DEBUG:
self.print_debug(
TerminalHelper.print_conditional(
debug_on,
f"""{termColors.YELLOW}
f"""{TerminalColors.YELLOW}
> Found existing entry in Domain table for: {transition_domain_name}, {domain_to_update.state}
{termColors.ENDC}""", # noqa
{TerminalColors.ENDC}""", # noqa
)
# for existing entry, update the status to
@ -300,7 +280,7 @@ class Command(BaseCommand):
# immediate attention.
logger.warning(
f"""
{termColors.FAIL}
{TerminalColors.FAIL}
!!! ERROR: duplicate entries already exist in the
Domain table for the following domain:
{transition_domain_name}
@ -316,7 +296,7 @@ class Command(BaseCommand):
except TransitionNotAllowed as err:
skipped_domain_entries.append(transition_domain_name)
logger.warning(
f"""{termColors.FAIL}
f"""{TerminalColors.FAIL}
Unable to change state for {transition_domain_name}
RECOMMENDATION:
@ -343,15 +323,15 @@ class Command(BaseCommand):
None,
)
if existing_domain_in_to_create is not None:
self.print_debug(
TerminalHelper.print_conditional(
debug_on,
f"""{termColors.YELLOW}
f"""{TerminalColors.YELLOW}
Duplicate Detected: {transition_domain_name}.
Cannot add duplicate entry for another username.
Violates Unique Key constraint.
Checking for unique user e-mail for Domain Invitations...
{termColors.ENDC}""",
{TerminalColors.ENDC}""",
)
new_domain_invitation = self.try_add_domain_invitation(
transition_domain_email, existing_domain_in_to_create
@ -363,9 +343,9 @@ class Command(BaseCommand):
)
domains_to_create.append(new_domain)
# DEBUG:
self.print_debug(
TerminalHelper.print_conditional(
debug_on,
f"{termColors.OKCYAN} Adding domain: {new_domain} {termColors.ENDC}", # noqa
f"{TerminalColors.OKCYAN} Adding domain: {new_domain} {TerminalColors.ENDC}", # noqa
)
new_domain_invitation = self.try_add_domain_invitation(
transition_domain_email, new_domain
@ -373,14 +353,14 @@ class Command(BaseCommand):
if new_domain_invitation is None:
logger.info(
f"{termColors.YELLOW} ! No new e-mail detected !" # noqa
f"(SKIPPED ADDING DOMAIN INVITATION){termColors.ENDC}"
f"{TerminalColors.YELLOW} ! No new e-mail detected !" # noqa
f"(SKIPPED ADDING DOMAIN INVITATION){TerminalColors.ENDC}"
)
else:
# DEBUG:
self.print_debug(
TerminalHelper.print_conditional(
debug_on,
f"{termColors.OKCYAN} Adding domain invitation: {new_domain_invitation} {termColors.ENDC}", # noqa
f"{TerminalColors.OKCYAN} Adding domain invitation: {new_domain_invitation} {TerminalColors.ENDC}", # noqa
)
domain_invitations_to_create.append(new_domain_invitation)
@ -390,9 +370,9 @@ class Command(BaseCommand):
and total_rows_parsed >= debug_max_entries_to_parse
):
logger.info(
f"""{termColors.YELLOW}
f"""{TerminalColors.YELLOW}
----PARSE LIMIT REACHED. HALTING PARSER.----
{termColors.ENDC}
{TerminalColors.ENDC}
"""
)
break

View file

@ -1,4 +1,6 @@
import logging
import os
import sys
logger = logging.getLogger(__name__)
@ -12,6 +14,7 @@ class TerminalColors:
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
YELLOW = "\033[93m"
MAGENTA = "\033[35m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
@ -50,7 +53,7 @@ class TerminalHelper:
else:
logger.info("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n")
def print_debug(print_condition: bool, print_statement: str):
def print_conditional(print_condition: bool, print_statement: str):
"""This function reduces complexity of debug statements
in other functions.
It uses the logger to write the given print_statement to the
@ -58,3 +61,41 @@ class TerminalHelper:
# DEBUG:
if print_condition:
logger.info(print_statement)
def prompt_for_execution(system_exit_on_terminate: bool,
info_to_inspect: str,
prompt_title: str) -> bool:
"""Create to reduce code complexity.
Prompts the user to inspect the given string
and asks if they wish to execute it.
Returns true if the user responds (y),
Returns false if the user responds (n)"""
action_description_for_selecting_no = "skip"
if system_exit_on_terminate:
action_description_for_selecting_no = "exit"
# Allow the user to inspect the command string
# and ask if they wish to proceed
proceed_execution = TerminalHelper.query_yes_no(
f"""{TerminalColors.OKCYAN}
=====================================================
{prompt_title}
=====================================================
*** IMPORTANT: VERIFY THE FOLLOWING LOOKS CORRECT ***
{info_to_inspect}
{TerminalColors.FAIL}
Proceed? (Y = proceed, N = {action_description_for_selecting_no})
{TerminalColors.ENDC}"""
)
# If the user decided to proceed return true.
# Otherwise, either return false or exit this subroutine.
if not proceed_execution:
if system_exit_on_terminate:
sys.exit()
return False
return True

View file

@ -0,0 +1,7 @@
TESTUSER|52563_CONTACT_GOV-VRSN|919-000-0000||918-000-0000||testuser@gmail.com|GSA|VERISIGN|ctldbatch|2021-06-30T17:58:09Z|VERISIGN|ctldbatch|2021-06-30T18:18:09Z|
RJD1|52545_CONTACT_GOV-VRSN|919-000-0000||918-000-0000||agustina.wyman7@test.com|GSA|VERISIGN|ctldbatch|2021-06-29T18:53:09Z|VERISIGN|ctldbatch|2021-06-29T18:58:08Z|
JAKING|52555_CONTACT_GOV-VRSN|919-000-0000||918-000-0000||susy.martin4@test.com|GSA|VERISIGN|ctldbatch|2021-06-30T15:23:10Z|VERISIGN|ctldbatch|2021-06-30T15:38:10Z|
JBOONE|52556_CONTACT_GOV-VRSN|919-000-0000||918-000-0000||stephania.winters4@test.com|GSA|VERISIGN|ctldbatch|2021-06-30T15:23:10Z|VERISIGN|ctldbatch|2021-06-30T18:28:09Z|
MKELLEY|52557_CONTACT_GOV-VRSN|919-000-0000||918-000-0000||alexandra.bobbitt5@test.com|GSA|VERISIGN|ctldbatch|2021-06-30T15:23:10Z|VERISIGN|ctldbatch|2021-08-02T22:13:09Z|
CWILSON|52562_CONTACT_GOV-VRSN|919-000-0000||918-000-0000||jospeh.mcdowell3@test.com|GSA|VERISIGN|ctldbatch|2021-06-30T17:58:09Z|VERISIGN|ctldbatch|2021-06-30T18:33:09Z|
LMCCADE|52563_CONTACT_GOV-VRSN|919-000-0000||918-000-0000||reginald.ratcliff4@test.com|GSA|VERISIGN|ctldbatch|2021-06-30T17:58:09Z|VERISIGN|ctldbatch|2021-06-30T18:18:09Z|

View file

@ -0,0 +1,8 @@
Anomaly.gov|ANOMALY|tech
TestDomain.gov|TESTUSER|admin
NEHRP.GOV|RJD1|admin
NEHRP.GOV|JAKING|tech
NEHRP.GOV|JBOONE|billing
NELSONCOUNTY-VA.GOV|MKELLEY|admin
NELSONCOUNTY-VA.GOV|CWILSON|billing
NELSONCOUNTY-VA.GOV|LMCCADE|tech

View file

@ -0,0 +1,4 @@
Anomaly.gov|muahaha|
TestDomain.gov|ok|
NEHRP.GOV|serverHold|
NELSONCOUNTY-VA.GOV|Hold|

View file

@ -0,0 +1,259 @@
from unittest.mock import patch
from django.test import TestCase
from registrar.models import (
User,
Domain,
DomainInvitation,
TransitionDomain,
DomainInformation,
UserDomainRole,
)
from registrar.management.commands.master_domain_migrations import Command as master_migration_command
from django.core.management import call_command
class TestLogins(TestCase):
""" """
def setUp(self):
""" """
# self.load_transition_domain_script = "load_transition_domain",
# self.transfer_script = "transfer_transition_domains_to_domains",
# self.master_script = "load_transition_domain",
self.test_data_file_location = "/app/registrar/tests/data"
self.test_domain_contact_filename = "test_domain_contacts.txt"
self.test_contact_filename = "test_contacts.txt"
self.test_domain_status_filename = "test_domain_statuses.txt"
def tearDown(self):
super().tearDown()
TransitionDomain.objects.all().delete()
Domain.objects.all().delete()
DomainInvitation.objects.all().delete()
DomainInformation.objects.all().delete()
User.objects.all().delete()
UserDomainRole.objects.all().delete()
def run_load_domains(self):
call_command(
"load_transition_domain",
f"{self.test_data_file_location}/{self.test_domain_contact_filename}",
f"{self.test_data_file_location}/{self.test_contact_filename}",
f"{self.test_data_file_location}/{self.test_domain_status_filename}",
)
def run_transfer_domains(self):
call_command("transfer_transition_domains_to_domains")
def run_master_script(self):
command = call_command(
"master_domain_migrations",
runMigrations=True,
migrationDirectory=f"{self.test_data_file_location}",
migrationFilenames=(f"{self.test_domain_contact_filename},"
f"{self.test_contact_filename},"
f"{self.test_domain_status_filename}"),
)
def compare_tables(self,
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
expected_missing_domain_invitations):
"""Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation.
Verifies that the data loaded correctly."""
missing_domains = []
duplicate_domains = []
missing_domain_informations = []
missing_domain_invites = []
for transition_domain in TransitionDomain.objects.all():# DEBUG:
transition_domain_name = transition_domain.domain_name
transition_domain_email = transition_domain.username
# Check Domain table
matching_domains = Domain.objects.filter(name=transition_domain_name)
# Check Domain Information table
matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name)
# Check Domain Invitation table
matching_domain_invitations = DomainInvitation.objects.filter(email=transition_domain_email.lower(),
domain__name=transition_domain_name)
if len(matching_domains) == 0:
missing_domains.append(transition_domain_name)
elif len(matching_domains) > 1:
duplicate_domains.append(transition_domain_name)
if len(matching_domain_informations) == 0:
missing_domain_informations.append(transition_domain_name)
if len(matching_domain_invitations) == 0:
missing_domain_invites.append(transition_domain_name)
total_missing_domains = len(missing_domains)
total_duplicate_domains = len(duplicate_domains)
total_missing_domain_informations = len(missing_domain_informations)
total_missing_domain_invitations = len(missing_domain_invites)
total_transition_domains = len(TransitionDomain.objects.all())
total_domains = len(Domain.objects.all())
total_domain_informations = len(DomainInformation.objects.all())
total_domain_invitations = len(DomainInvitation.objects.all())
print(f"""
total_missing_domains = {len(missing_domains)}
total_duplicate_domains = {len(duplicate_domains)}
total_missing_domain_informations = {len(missing_domain_informations)}
total_missing_domain_invitations = {len(missing_domain_invites)}
total_transition_domains = {len(TransitionDomain.objects.all())}
total_domains = {len(Domain.objects.all())}
total_domain_informations = {len(DomainInformation.objects.all())}
total_domain_invitations = {len(DomainInvitation.objects.all())}
""")
self.assertTrue(total_missing_domains == expected_missing_domains)
self.assertTrue(total_duplicate_domains == expected_duplicate_domains)
self.assertTrue(total_missing_domain_informations == expected_missing_domain_informations)
self.assertTrue(total_missing_domain_invitations == expected_missing_domain_invitations)
self.assertTrue(total_transition_domains == expected_total_transition_domains)
self.assertTrue(total_domains == expected_total_domains)
self.assertTrue(total_domain_informations == expected_total_domain_informations)
self.assertTrue(total_domain_invitations == expected_total_domain_invitations)
def test_master_migration_functions(self):
""" Run the full master migration script using local test data.
NOTE: This is more of an integration test and so far does not
follow best practice of limiting the number of assertions per test.
But for now, this will double-check that the script
works as intended. """
self.run_master_script()
# # TODO: instead of patching....there has got to be a way of making sure subsequent commands use the django database
# # Patch subroutines for migrations
# def side_effect():
# self.run_load_domains()
# self.run_transfer_domains()
# patcher = patch("registrar.management.commands.master_domain_migrations.Command.run_migration_scripts")
# mocked_get = patcher.start()
# mocked_get.side_effect = side_effect
# # Patch subroutines for sending invitations
# def side_effect():
# # TODO: what should happen here?
# return
# patcher = patch("registrar.management.commands.master_domain_migrations.Command.run_send_invites_script")
# mocked_get = patcher.start()
# mocked_get.side_effect = side_effect
# STEP 2: (analyze the tables just like the migration script does, but add assert statements)
expected_total_transition_domains = 8
expected_total_domains = 4
expected_total_domain_informations = 0
expected_total_domain_invitations = 7
expected_missing_domains = 0
expected_duplicate_domains = 0
# we expect 8 missing domain invites since the migration does not auto-login new users
expected_missing_domain_informations = 8
# we expect 1 missing invite from anomaly.gov (an injected error)
expected_missing_domain_invitations = 1
self.compare_tables(expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
expected_missing_domain_invitations,
)
def test_load_transition_domain(self):
self.run_load_domains()
# STEP 2: (analyze the tables just like the migration script does, but add assert statements)
expected_total_transition_domains = 8
expected_total_domains = 0
expected_total_domain_informations = 0
expected_total_domain_invitations = 0
expected_missing_domains = 8
expected_duplicate_domains = 0
expected_missing_domain_informations = 8
expected_missing_domain_invitations = 8
self.compare_tables(expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
expected_missing_domain_invitations,
)
def test_transfer_transition_domains_to_domains(self):
# TODO: setup manually instead of calling other script
self.run_load_domains()
self.run_transfer_domains()
# Analyze the tables
expected_total_transition_domains = 8
expected_total_domains = 4
expected_total_domain_informations = 0
expected_total_domain_invitations = 7
expected_missing_domains = 0
expected_duplicate_domains = 0
expected_missing_domain_informations = 8
expected_missing_domain_invitations = 1
self.compare_tables(expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
expected_missing_domain_invitations,
)
def test_logins(self):
# TODO: setup manually instead of calling other scripts
self.run_load_domains()
self.run_transfer_domains()
# Simluate Logins
for invite in DomainInvitation.objects.all():
# get a user with this email address
user, user_created = User.objects.get_or_create(email=invite.email, username=invite.email)
user.first_login()
# Analyze the tables
expected_total_transition_domains = 8
expected_total_domains = 4
expected_total_domain_informations = 3
expected_total_domain_invitations = 7
expected_missing_domains = 0
expected_duplicate_domains = 0
expected_missing_domain_informations = 1
expected_missing_domain_invitations = 1
self.compare_tables(expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
expected_missing_domain_invitations,
)