added comments and typehints. Refactored a little.

This commit is contained in:
CocoByte 2023-10-24 15:13:56 -06:00
parent 852f22a57a
commit 4f6b5e2b06
No known key found for this signature in database
GPG key ID: BBFAA2526384C97F
4 changed files with 371 additions and 328 deletions

View file

@ -9,56 +9,15 @@ from django.core.management import BaseCommand
from registrar.models import TransitionDomain from registrar.models import TransitionDomain
from registrar.management.commands.utility.terminal_helper import (
TerminalColors,
TerminalHelper
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class termColors:
"""Colors for terminal outputs
(makes reading the logs WAY easier)"""
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
YELLOW = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
BackgroundLightYellow = "\033[103m"
def query_yes_no(question: str, default="yes") -> bool:
"""Ask a yes/no question via raw_input() and return their answer.
"question" is a string that is presented to the user.
"default" is the presumed answer if the user just hits <Enter>.
It must be "yes" (the default), "no" or None (meaning
an answer is required of the user).
The "answer" return value is True for "yes" or False for "no".
"""
valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("invalid default answer: '%s'" % default)
while True:
logger.info(question + prompt)
choice = input().lower()
if default is not None and choice == "":
return valid[default]
elif choice in valid:
return valid[choice]
else:
logger.info("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n")
class Command(BaseCommand): class Command(BaseCommand):
help = """Loads data for domains that are in transition help = """Loads data for domains that are in transition
(populates transition_domain model objects).""" (populates transition_domain model objects)."""
@ -110,20 +69,20 @@ class Command(BaseCommand):
or --limitParse are in use""" or --limitParse are in use"""
if debug_on: if debug_on:
logger.info( logger.info(
f"""{termColors.OKCYAN} f"""{TerminalColors.OKCYAN}
----------DEBUG MODE ON---------- ----------DEBUG MODE ON----------
Detailed print statements activated. Detailed print statements activated.
{termColors.ENDC} {TerminalColors.ENDC}
""" """
) )
if debug_max_entries_to_parse > 0: if debug_max_entries_to_parse > 0:
logger.info( logger.info(
f"""{termColors.OKCYAN} f"""{TerminalColors.OKCYAN}
----------LIMITER ON---------- ----------LIMITER ON----------
Parsing of entries will be limited to Parsing of entries will be limited to
{debug_max_entries_to_parse} lines per file.") {debug_max_entries_to_parse} lines per file.")
Detailed print statements activated. Detailed print statements activated.
{termColors.ENDC} {TerminalColors.ENDC}
""" """
) )
@ -195,7 +154,7 @@ class Command(BaseCommand):
", ".join(map(str, duplicate_domain_user_combos)) ", ".join(map(str, duplicate_domain_user_combos))
) )
logger.warning( logger.warning(
f"{termColors.YELLOW} No e-mails found for users: {users_without_email_as_string}" # noqa f"{TerminalColors.YELLOW} No e-mails found for users: {users_without_email_as_string}" # noqa
) )
if total_duplicate_pairs > 0 or total_duplicate_domains > 0: if total_duplicate_pairs > 0 or total_duplicate_domains > 0:
duplicate_pairs_as_string = "{}".format( duplicate_pairs_as_string = "{}".format(
@ -205,7 +164,7 @@ class Command(BaseCommand):
", ".join(map(str, duplicate_domains)) ", ".join(map(str, duplicate_domains))
) )
logger.warning( logger.warning(
f"""{termColors.YELLOW} f"""{TerminalColors.YELLOW}
----DUPLICATES FOUND----- ----DUPLICATES FOUND-----
@ -218,7 +177,7 @@ class Command(BaseCommand):
the supplied data files; the supplied data files;
{duplicate_domains_as_string} {duplicate_domains_as_string}
{termColors.ENDC}""" {TerminalColors.ENDC}"""
) )
def print_summary_status_findings( def print_summary_status_findings(
@ -237,7 +196,7 @@ class Command(BaseCommand):
", ".join(map(str, domains_without_status)) ", ".join(map(str, domains_without_status))
) )
logger.warning( logger.warning(
f"""{termColors.YELLOW} f"""{TerminalColors.YELLOW}
-------------------------------------------- --------------------------------------------
Found {total_domains_without_status} domains Found {total_domains_without_status} domains
@ -245,7 +204,7 @@ class Command(BaseCommand):
--------------------------------------------- ---------------------------------------------
{domains_without_status_as_string} {domains_without_status_as_string}
{termColors.ENDC}""" {TerminalColors.ENDC}"""
) )
if total_outlier_statuses > 0: if total_outlier_statuses > 0:
@ -253,7 +212,7 @@ class Command(BaseCommand):
", ".join(map(str, outlier_statuses)) ", ".join(map(str, outlier_statuses))
) # noqa ) # noqa
logger.warning( logger.warning(
f"""{termColors.YELLOW} f"""{TerminalColors.YELLOW}
-------------------------------------------- --------------------------------------------
Found {total_outlier_statuses} unaccounted Found {total_outlier_statuses} unaccounted
@ -264,36 +223,27 @@ class Command(BaseCommand):
(defaulted to Ready): (defaulted to Ready):
{domains_without_status_as_string} {domains_without_status_as_string}
{termColors.ENDC}""" {TerminalColors.ENDC}"""
) )
def print_debug(self, print_condition: bool, print_statement: str):
"""This function reduces complexity of debug statements
in other functions.
It uses the logger to write the given print_statement to the
terminal if print_condition is TRUE"""
# DEBUG:
if print_condition:
logger.info(print_statement)
def prompt_table_reset(self): def prompt_table_reset(self):
"""Brings up a prompt in the terminal asking """Brings up a prompt in the terminal asking
if the user wishes to delete data in the if the user wishes to delete data in the
TransitionDomain table. If the user confirms, TransitionDomain table. If the user confirms,
deletes all the data in the TransitionDomain table""" deletes all the data in the TransitionDomain table"""
confirm_reset = query_yes_no( confirm_reset = TerminalHelper.query_yes_no(
f""" f"""
{termColors.FAIL} {TerminalColors.FAIL}
WARNING: Resetting the table will permanently delete all WARNING: Resetting the table will permanently delete all
the data! the data!
Are you sure you want to continue?{termColors.ENDC}""" Are you sure you want to continue?{TerminalColors.ENDC}"""
) )
if confirm_reset: if confirm_reset:
logger.info( logger.info(
f"""{termColors.YELLOW} f"""{TerminalColors.YELLOW}
----------Clearing Table Data---------- ----------Clearing Table Data----------
(please wait) (please wait)
{termColors.ENDC}""" {TerminalColors.ENDC}"""
) )
TransitionDomain.objects.all().delete() TransitionDomain.objects.all().delete()
@ -428,18 +378,18 @@ class Command(BaseCommand):
) )
if existing_domain is not None: if existing_domain is not None:
# DEBUG: # DEBUG:
self.print_debug( TerminalHelper.print_conditional(
debug_on, debug_on,
f"{termColors.YELLOW} DUPLICATE file entries found for domain: {new_entry_domain_name} {termColors.ENDC}", # noqa f"{TerminalColors.YELLOW} DUPLICATE file entries found for domain: {new_entry_domain_name} {TerminalColors.ENDC}", # noqa
) )
if new_entry_domain_name not in duplicate_domains: if new_entry_domain_name not in duplicate_domains:
duplicate_domains.append(new_entry_domain_name) duplicate_domains.append(new_entry_domain_name)
if existing_domain_user_pair is not None: if existing_domain_user_pair is not None:
# DEBUG: # DEBUG:
self.print_debug( TerminalHelper.print_conditional(
debug_on, debug_on,
f"""{termColors.YELLOW} DUPLICATE file entries found for domain - user {termColors.BackgroundLightYellow} PAIR {termColors.ENDC}{termColors.YELLOW}: f"""{TerminalColors.YELLOW} DUPLICATE file entries found for domain - user {TerminalColors.BackgroundLightYellow} PAIR {TerminalColors.ENDC}{TerminalColors.YELLOW}:
{new_entry_domain_name} - {new_entry_email} {termColors.ENDC}""", # noqa {new_entry_domain_name} - {new_entry_email} {TerminalColors.ENDC}""", # noqa
) )
if existing_domain_user_pair not in duplicate_domain_user_combos: if existing_domain_user_pair not in duplicate_domain_user_combos:
duplicate_domain_user_combos.append(existing_domain_user_pair) duplicate_domain_user_combos.append(existing_domain_user_pair)
@ -456,20 +406,20 @@ class Command(BaseCommand):
if existing_entry.status != new_entry_status: if existing_entry.status != new_entry_status:
# DEBUG: # DEBUG:
self.print_debug( TerminalHelper.print_conditional(
debug_on, debug_on,
f"{termColors.OKCYAN}" f"{TerminalColors.OKCYAN}"
f"Updating entry: {existing_entry}" f"Updating entry: {existing_entry}"
f"Status: {existing_entry.status} > {new_entry_status}" # noqa f"Status: {existing_entry.status} > {new_entry_status}" # noqa
f"Email Sent: {existing_entry.email_sent} > {new_entry_emailSent}" # noqa f"Email Sent: {existing_entry.email_sent} > {new_entry_emailSent}" # noqa
f"{termColors.ENDC}", f"{TerminalColors.ENDC}",
) )
existing_entry.status = new_entry_status existing_entry.status = new_entry_status
existing_entry.email_sent = new_entry_emailSent existing_entry.email_sent = new_entry_emailSent
existing_entry.save() existing_entry.save()
except TransitionDomain.MultipleObjectsReturned: except TransitionDomain.MultipleObjectsReturned:
logger.info( logger.info(
f"{termColors.FAIL}" f"{TerminalColors.FAIL}"
f"!!! ERROR: duplicate entries exist in the" f"!!! ERROR: duplicate entries exist in the"
f"transtion_domain table for domain:" f"transtion_domain table for domain:"
f"{new_entry_domain_name}" f"{new_entry_domain_name}"
@ -488,9 +438,9 @@ class Command(BaseCommand):
total_new_entries += 1 total_new_entries += 1
# DEBUG: # DEBUG:
self.print_debug( TerminalHelper.print_conditional(
debug_on, debug_on,
f"{termColors.OKCYAN} Adding entry {total_new_entries}: {new_entry} {termColors.ENDC}", # noqa f"{TerminalColors.OKCYAN} Adding entry {total_new_entries}: {new_entry} {TerminalColors.ENDC}", # noqa
) )
# Check Parse limit and exit loop if needed # Check Parse limit and exit loop if needed
@ -499,20 +449,20 @@ class Command(BaseCommand):
and debug_max_entries_to_parse != 0 and debug_max_entries_to_parse != 0
): ):
logger.info( logger.info(
f"{termColors.YELLOW}" f"{TerminalColors.YELLOW}"
f"----PARSE LIMIT REACHED. HALTING PARSER.----" f"----PARSE LIMIT REACHED. HALTING PARSER.----"
f"{termColors.ENDC}" f"{TerminalColors.ENDC}"
) )
break break
TransitionDomain.objects.bulk_create(to_create) TransitionDomain.objects.bulk_create(to_create)
logger.info( logger.info(
f"""{termColors.OKGREEN} f"""{TerminalColors.OKGREEN}
============= FINISHED =============== ============= FINISHED ===============
Created {total_new_entries} transition domain entries, Created {total_new_entries} transition domain entries,
updated {total_updated_domain_entries} transition domain entries updated {total_updated_domain_entries} transition domain entries
{termColors.ENDC} {TerminalColors.ENDC}
""" """
) )

View file

@ -1,5 +1,13 @@
"""Data migration:
1 - generates a report of data integrity across all
transition domain related tables
2 - allows users to run all migration scripts for
transition domain data
"""
import logging import logging
import argparse import argparse
import sys
import os import os
from django.test import Client from django.test import Client
@ -7,18 +15,19 @@ from django.test import Client
from django_fsm import TransitionNotAllowed # type: ignore from django_fsm import TransitionNotAllowed # type: ignore
from django.core.management import BaseCommand from django.core.management import BaseCommand
from django.contrib.auth import get_user_model
from registrar.models import TransitionDomain from registrar.models import (
from registrar.models import Domain Domain,
from registrar.models import DomainInvitation DomainInformation,
from registrar.models import DomainInformation DomainInvitation,
from registrar.models import User TransitionDomain,
User,
)
from registrar.management.commands.utility.terminal_helper import TerminalColors from registrar.management.commands.utility.terminal_helper import (
from registrar.management.commands.utility.terminal_helper import TerminalHelper TerminalColors,
TerminalHelper
from registrar.management.commands.load_transition_domain import Command as load_transition_domain_command )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -28,8 +37,45 @@ class Command(BaseCommand):
def add_arguments(self, parser): def add_arguments(self, parser):
""" """
OPTIONAL ARGUMENTS: OPTIONAL ARGUMENTS:
--runLoaders
A boolean (default to true), which triggers running
all scripts (in sequence) for transition domain migrations
--triggerLogins
A boolean (default to true), which triggers running
simulations of user logins for each user in domain invitation
--loaderDirectory
The location of the files used for load_transition_domain migration script
EXAMPLE USAGE:
> --loaderDirectory /app/tmp
--loaderFilenames
The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by spaces:
EXAMPLE USAGE:
> --loaderFilenames domain_contacts_filename.txt contacts_filename.txt domain_statuses_filename.txt
where...
- domain_contacts_filename is the Data file with domain contact information
- contacts_filename is the Data file with contact information
- domain_statuses_filename is the Data file with domain status information
--sep
Delimiter for the loaders to correctly parse the given text files.
(usually this can remain at default value of |)
--debug --debug
A boolean (default to true), which activates additional print statements A boolean (default to true), which activates additional print statements
--limitParse
Used by the loaders (load_transition_domain) to set the limit for the
number of data entries to insert. Set to 0 (or just don't use this
argument) to parse every entry. This was provided primarily for testing
purposes
--resetTable
Used by the loaders to trigger a prompt for deleting all table entries.
Useful for testing purposes, but USE WITH CAUTION
""" """
parser.add_argument("--runLoaders", parser.add_argument("--runLoaders",
@ -59,22 +105,6 @@ class Command(BaseCommand):
- domain_statuses_filename is the Data file with domain status information""" - domain_statuses_filename is the Data file with domain status information"""
) )
# parser.add_argument(
# "domain_contacts_filename",
# default="escrow_domain_contacts.daily.gov.GOV.txt",
# help="Data file with domain contact information"
# )
# parser.add_argument(
# "contacts_filename",
# default="escrow_contacts.daily.gov.GOV.txt",
# help="Data file with contact information",
# )
# parser.add_argument(
# "domain_statuses_filename",
# default="escrow_domain_statuses.daily.gov.GOV.txt",
# help="Data file with domain status information"
# )
parser.add_argument("--sep", default="|", help="Delimiter character for the loader files") parser.add_argument("--sep", default="|", help="Delimiter character for the loader files")
parser.add_argument("--debug", action=argparse.BooleanOptionalAction) parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
@ -89,30 +119,22 @@ class Command(BaseCommand):
action=argparse.BooleanOptionalAction, action=argparse.BooleanOptionalAction,
) )
def print_debug_mode_statements(
self, debug_on: bool
):
"""Prints additional terminal statements to indicate if --debug
or --limitParse are in use"""
self.print_debug(
debug_on,
f"""{TerminalColors.OKCYAN}
----------DEBUG MODE ON----------
Detailed print statements activated.
{TerminalColors.ENDC}
""",
)
def print_debug(self, print_condition: bool, print_statement: str): def compare_tables(self, debug_on: bool):
"""This function reduces complexity of debug statements """Does a diff between the transition_domain and the following tables:
in other functions. domain, domain_information and the domain_invitation.
It uses the logger to write the given print_statement to the
terminal if print_condition is TRUE""" Produces the following report (printed to the terminal):
# DEBUG: #1 - Print any domains that exist in the transition_domain table
if print_condition: but not in their corresponding domain, domain information or
logger.info(print_statement) domain invitation tables.
#2 - Print which table this domain is missing from
def compare_tables(self, debug_on): #3- Check for duplicate entries in domain or
domain_information tables and print which are
duplicates and in which tables
"""
logger.info( logger.info(
f"""{TerminalColors.OKCYAN} f"""{TerminalColors.OKCYAN}
============= BEGINNING ANALYSIS =============== ============= BEGINNING ANALYSIS ===============
@ -121,20 +143,20 @@ class Command(BaseCommand):
) )
#TODO: would filteredRelation be faster? #TODO: would filteredRelation be faster?
missing_domains = []
duplicate_domains = []
missing_domain_informations = []
missing_domain_invites = []
for transition_domain in TransitionDomain.objects.all():# DEBUG: for transition_domain in TransitionDomain.objects.all():# DEBUG:
transition_domain_name = transition_domain.domain_name transition_domain_name = transition_domain.domain_name
transition_domain_email = transition_domain.username transition_domain_email = transition_domain.username
self.print_debug( TerminalHelper.print_conditional(
debug_on, debug_on,
f"{TerminalColors.OKCYAN}Checking: {transition_domain_name} {TerminalColors.ENDC}", # noqa f"{TerminalColors.OKCYAN}Checking: {transition_domain_name} {TerminalColors.ENDC}", # noqa
) )
missing_domains = []
duplicate_domains = []
missing_domain_informations = []
missing_domain_invites = []
# Check Domain table # Check Domain table
matching_domains = Domain.objects.filter(name=transition_domain_name) matching_domains = Domain.objects.filter(name=transition_domain_name)
# Check Domain Information table # Check Domain Information table
@ -144,12 +166,16 @@ class Command(BaseCommand):
domain__name=transition_domain_name) domain__name=transition_domain_name)
if len(matching_domains) == 0: if len(matching_domains) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain{TerminalColors.ENDC}""")
missing_domains.append(transition_domain_name) missing_domains.append(transition_domain_name)
elif len(matching_domains) > 1: elif len(matching_domains) > 1:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Duplicate Domain{TerminalColors.ENDC}""")
duplicate_domains.append(transition_domain_name) duplicate_domains.append(transition_domain_name)
if len(matching_domain_informations) == 0: if len(matching_domain_informations) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Information{TerminalColors.ENDC}""")
missing_domain_informations.append(transition_domain_name) missing_domain_informations.append(transition_domain_name)
if len(matching_domain_invitations) == 0: if len(matching_domain_invitations) == 0:
TerminalHelper.print_conditional(debug_on, f"""{TerminalColors.YELLOW}Missing Domain Invitation{TerminalColors.ENDC}""")
missing_domain_invites.append(transition_domain_name) missing_domain_invites.append(transition_domain_name)
total_missing_domains = len(missing_domains) total_missing_domains = len(missing_domains)
@ -174,7 +200,7 @@ class Command(BaseCommand):
(These are transition domains which have duplicate entries in the Domain Table) (These are transition domains which have duplicate entries in the Domain Table)
{TerminalColors.YELLOW}{duplicate_domains_as_string}{TerminalColors.OKGREEN} {TerminalColors.YELLOW}{duplicate_domains_as_string}{TerminalColors.OKGREEN}
{total_missing_domain_informations} Domains Information Entries missing: {total_missing_domain_informations} Domain Information Entries missing:
(These are transition domains which have no entries in the Domain Information Table) (These are transition domains which have no entries in the Domain Information Table)
{TerminalColors.YELLOW}{missing_domain_informations_as_string}{TerminalColors.OKGREEN} {TerminalColors.YELLOW}{missing_domain_informations_as_string}{TerminalColors.OKGREEN}
@ -185,79 +211,92 @@ class Command(BaseCommand):
""" """
) )
def run_load_transition_domain_script(self, def prompt_for_execution(self, command_string: str, prompt_title: str) -> bool:
file_location, """Prompts the user to inspect the given terminal command string
domain_contacts_filename, and asks if they wish to execute it. If the user responds (y),
contacts_filename, execute the command"""
domain_statuses_filename,
sep,
reset_table,
debug_on,
debug_max_entries_to_parse):
load_transition_domain_command_string = "./manage.py load_transition_domain "
load_transition_domain_command_string += file_location+domain_contacts_filename + " "
load_transition_domain_command_string += file_location+contacts_filename + " "
load_transition_domain_command_string += file_location+domain_statuses_filename + " "
if sep is not None and sep != "|": # Allow the user to inspect the command string
load_transition_domain_command_string += f"--sep {sep} " # and ask if they wish to proceed
proceed_execution = TerminalHelper.query_yes_no(
if reset_table:
load_transition_domain_command_string += "--resetTable "
if debug_on:
load_transition_domain_command_string += "--debug "
if debug_max_entries_to_parse > 0:
load_transition_domain_command_string += f"--limitParse {debug_max_entries_to_parse} "
proceed_load_transition_domain = TerminalHelper.query_yes_no(
f"""{TerminalColors.OKCYAN}
=====================================
Running load_transition_domain script
=====================================
{load_transition_domain_command_string}
{TerminalColors.FAIL}
Proceed?
{TerminalColors.ENDC}"""
)
if not proceed_load_transition_domain:
return
logger.info(f"""{TerminalColors.OKCYAN}
==== EXECUTING... ====
{TerminalColors.ENDC}""")
os.system(f"{load_transition_domain_command_string}")
def run_transfer_script(self, debug_on):
command_string = "./manage.py transfer_transition_domains_to_domains "
if debug_on:
command_string += "--debug "
proceed_load_transition_domain = TerminalHelper.query_yes_no(
f"""{TerminalColors.OKCYAN} f"""{TerminalColors.OKCYAN}
===================================================== =====================================================
Running transfer_transition_domains_to_domains script {prompt_title}
===================================================== =====================================================
*** IMPORTANT: VERIFY THE FOLLOWING COMMAND LOOKS CORRECT ***
{command_string} {command_string}
{TerminalColors.FAIL} {TerminalColors.FAIL}
Proceed? Proceed? (Y = proceed, N = skip)
{TerminalColors.ENDC}""" {TerminalColors.ENDC}"""
) )
if not proceed_load_transition_domain: # If the user decided to proceed executing the command,
return # run the command for loading transition domains.
# Otherwise, exit this subroutine.
if not proceed_execution:
sys.exit()
logger.info(f"""{TerminalColors.OKCYAN} logger.info(f"""{TerminalColors.OKCYAN}
==== EXECUTING... ==== ==== EXECUTING... ====
{TerminalColors.ENDC}""") {TerminalColors.ENDC}""")
os.system(f"{command_string}") os.system(f"{command_string}")
return True
def run_load_transition_domain_script(self,
file_location: str,
domain_contacts_filename: str,
contacts_filename: str,
domain_statuses_filename: str,
sep: str,
reset_table: bool,
debug_on: bool,
debug_max_entries_to_parse: int):
"""Runs the load_transition_domain script"""
# Create the command string
command_string = "./manage.py load_transition_domain "
command_string += file_location+domain_contacts_filename + " "
command_string += file_location+contacts_filename + " "
command_string += file_location+domain_statuses_filename + " "
if sep is not None and sep != "|":
command_string += f"--sep {sep} "
if reset_table:
command_string += "--resetTable "
if debug_on:
command_string += "--debug "
if debug_max_entries_to_parse > 0:
command_string += f"--limitParse {debug_max_entries_to_parse} "
# Execute the command string
self.prompt_for_execution(command_string, "Running load_transition_domain script")
def run_transfer_script(self, debug_on:bool):
"""Runs the transfer_transition_domains_to_domains script"""
# Create the command string
command_string = "./manage.py transfer_transition_domains_to_domains "
if debug_on:
command_string += "--debug "
# Execute the command string
self.prompt_for_execution(command_string, "Running transfer_transition_domains_to_domains script")
def run_send_invites_script(self):
"""Runs the send_domain_invitations script"""
# Create the command string...
command_string = "./manage.py send_domain_invitations -s"
# Execute the command string
self.prompt_for_execution(command_string, "Running send_domain_invitations script")
def run_migration_scripts(self, def run_migration_scripts(self,
options): options):
"""Runs the following migration scripts (in order):
1 - imports for trans domains
2 - transfer to domain & domain invitation"""
# Grab filepath information from the arguments
file_location = options.get("loaderDirectory")+"/" file_location = options.get("loaderDirectory")+"/"
filenames = options.get("loaderFilenames").split() filenames = options.get("loaderFilenames").split()
if len(filenames) < 3: if len(filenames) < 3:
@ -277,6 +316,9 @@ class Command(BaseCommand):
contacts_filename = filenames[1] contacts_filename = filenames[1]
domain_statuses_filename = filenames[2] domain_statuses_filename = filenames[2]
# Allow the user to inspect the filepath
# data given in the arguments, and prompt
# the user to verify this info before proceeding
files_are_correct = TerminalHelper.query_yes_no( files_are_correct = TerminalHelper.query_yes_no(
f""" f"""
{TerminalColors.OKCYAN} {TerminalColors.OKCYAN}
@ -288,14 +330,17 @@ class Command(BaseCommand):
....for the following files: ....for the following files:
- domain contacts: {domain_contacts_filename} - domain contacts: {domain_contacts_filename}
- contacts: {contacts_filename} - contacts: {contacts_filename}
- domain statuses: {domain_statuses_filename}y - domain statuses: {domain_statuses_filename}
{TerminalColors.FAIL} {TerminalColors.FAIL}
Does this look correct?{TerminalColors.ENDC}""" Does this look correct?{TerminalColors.ENDC}"""
) )
# If the user rejected the filepath information
# as incorrect, prompt the user to provide
# correct file inputs in their original command
# prompt and exit this subroutine
if not files_are_correct: if not files_are_correct:
# prompt the user to provide correct file inputs
logger.info(f""" logger.info(f"""
{TerminalColors.YELLOW} {TerminalColors.YELLOW}
PLEASE Re-Run the script with the correct file location and filenames: PLEASE Re-Run the script with the correct file location and filenames:
@ -306,20 +351,16 @@ class Command(BaseCommand):
""") """)
return return
# Get --sep argument # Proceed executing the migration scripts...
# ...First, Get all the arguments
sep = options.get("sep") sep = options.get("sep")
# Get --resetTable argument
reset_table = options.get("resetTable") reset_table = options.get("resetTable")
# Get --debug argument
debug_on = options.get("debug") debug_on = options.get("debug")
# Get --limitParse argument
debug_max_entries_to_parse = int( debug_max_entries_to_parse = int(
options.get("limitParse") options.get("limitParse")
) # set to 0 to parse all entries )
#...Second, Run the migration scripts in order
self.run_load_transition_domain_script(file_location, self.run_load_transition_domain_script(file_location,
domain_contacts_filename, domain_contacts_filename,
contacts_filename, contacts_filename,
@ -328,29 +369,49 @@ class Command(BaseCommand):
reset_table, reset_table,
debug_on, debug_on,
debug_max_entries_to_parse) debug_max_entries_to_parse)
self.run_transfer_script(debug_on) self.run_transfer_script(debug_on)
def simulate_user_logins(self, debug_on): def simulate_user_logins(self, debug_on):
logger.info(f"""{TerminalColors.OKCYAN} """Simulates logins for users (this will add
================== Domain Information objects to our tables)"""
SIMULATING LOGINS
================== logger.info(f""
{TerminalColors.ENDC} f"{TerminalColors.OKCYAN}"
""") f"================== SIMULATING LOGINS =================="
# for invite in DomainInvitation.objects.all(): f"{TerminalColors.ENDC}")
# #DEBUG: for invite in DomainInvitation.objects.all(): #TODO: limit to our stuff
# TerminalHelper.print_conditional(debug_on,f"""{TerminalColors.OKCYAN}Processing invite: {invite}{TerminalColors.ENDC}""") #DEBUG:
# # get a user with this email address TerminalHelper.print_conditional(debug_on,
# user_exists = User.objects.filter(email=invite.email).exists() f"{TerminalColors.OKCYAN}"
# user, _ = User.objects.get_or_create(email=invite.email) f"Processing invite: {invite}"
# #DEBUG: f"{TerminalColors.ENDC}")
# TerminalHelper.print_conditional(user_exists,f"""{TerminalColors.OKCYAN}No user found (creating temporary user object){TerminalColors.ENDC}""") # get a user with this email address
# TerminalHelper.print_conditional(debug_on,f"""{TerminalColors.OKCYAN}Logging in user: {user}{TerminalColors.ENDC}""") user, user_created = User.objects.get_or_create(email=invite.email, username=invite.email)
#DEBUG:
TerminalHelper.print_conditional(user_created,
f"""{TerminalColors.OKCYAN}No user found (creating temporary user object){TerminalColors.ENDC}""")
TerminalHelper.print_conditional(debug_on,
f"""{TerminalColors.OKCYAN}Executing first-time login for user: {user}{TerminalColors.ENDC}""")
user.first_login()
if user_created:
logger.info(f"""{TerminalColors.YELLOW}(Deleting temporary user object){TerminalColors.ENDC}""")
user.delete()
# get a user with this email address
# user_exists = User.objects.filter(email=invite.email).exists()
# if user_exists:
# user = User.objects.get(email=invite.email)
# TerminalHelper.print_conditional(debug_on,
# f"""{TerminalColors.OKCYAN}Logging in user: {user}{TerminalColors.ENDC}""")
# user.first_login() # user.first_login()
# if not user_exists: # else:
# logger.warn(f"""{TerminalColors.YELLOW}(Deleting temporary user object){TerminalColors.ENDC}""") # logger.info(f"""{TerminalColors.YELLOW}No user found -- creating temp user object...{TerminalColors.ENDC}""")
# user.delete() # user = User(email=invite.email)
# user.save()
# user.first_login()
# logger.info(f"""{TerminalColors.YELLOW}(Deleting temporary user object){TerminalColors.ENDC}""")
# user.delete()
# for invite in DomainInvitation.objects.all(): # for invite in DomainInvitation.objects.all():
# #DEBUG: # #DEBUG:
@ -371,58 +432,108 @@ class Command(BaseCommand):
**options, **options,
): ):
""" """
Does a diff between the transition_domain and the following tables: Does the following;
domain, domain_information and the domain_invitation. 1 - run loader scripts
2 - simulate logins
Produces the following report (printed to the terminal): 3 - send domain invitations (Emails should be sent to the appropriate users
#1 - Print any domains that exist in the transition_domain table note that all moved domains should now be accessible
but not in their corresponding domain, domain information or on django admin for an analyst)
domain invitation tables. 4 - analyze the data for transition domains
#2 - Print which table this domain is missing from and generate a report
#3- Check for duplicate entries in domain or
domain_information tables and print which are
duplicates and in which tables
(ONLY RUNS with full script option)
- Emails should be sent to the appropriate users
note that all moved domains should now be accessible
on django admin for an analyst
OPTIONS:
-- (run all other scripts:
1 - imports for trans domains
2 - transfer to domain & domain invitation
3 - send domain invite)
** Triggers table reset **
""" """
# Get --debug argument # SETUP
# Grab all arguments relevant to
# orchestrating which parts of this script
# should execute. Print some indicators to
# the terminal so the user knows what is
# enabled.
debug_on = options.get("debug") debug_on = options.get("debug")
# Get --runLoaders argument
run_loaders_on = options.get("runLoaders") run_loaders_on = options.get("runLoaders")
# Get --triggerLogins argument
simulate_user_login_enabled = options.get("triggerLogins") simulate_user_login_enabled = options.get("triggerLogins")
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.OKCYAN}
----------DEBUG MODE ON----------
Detailed print statements activated.
{TerminalColors.ENDC}
"""
)
TerminalHelper.print_conditional(
run_loaders_on,
f"""{TerminalColors.OKCYAN}
----------RUNNING LOADERS ON----------
All migration scripts will be run before
analyzing the data.
{TerminalColors.ENDC}
"""
)
TerminalHelper.print_conditional(
run_loaders_on,
f"""{TerminalColors.OKCYAN}
----------TRIGGER LOGINS ON----------
Will be simulating user logins
{TerminalColors.ENDC}
"""
)
# If a user decides to run all migration
# scripts, they may or may not wish to
# proceed with analysis of the data depending
# on the results of the migration.
# Provide a breakpoint for them to decide
# whether to continue or not.
# The same will happen if simulating user
# logins (to allow users to run only that
# portion of the script if desired)
prompt_continuation_of_analysis = False prompt_continuation_of_analysis = False
# Run migration scripts if specified by user... # STEP 1 -- RUN LOADERS
# Run migration scripts if specified by user
if run_loaders_on: if run_loaders_on:
self.run_migration_scripts(options) self.run_migration_scripts(options)
prompt_continuation_of_analysis = True prompt_continuation_of_analysis = True
# Simulate user login for each user in domain invitation if sepcified by user # STEP 2 -- SIMULATE LOGINS
# Simulate user login for each user in domain
# invitation if specified by user OR if running
# migration scripts.
# (NOTE: Although users can choose to run login
# simulations separately (for testing purposes),
# if we are running all migration scripts, we should
# automatically execute this as the final step
# to ensure Domain Information objects get added
# to the database.)
if run_loaders_on:
simulate_user_login_enabled = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with simulating user logins?
{TerminalColors.ENDC}"""
)
if simulate_user_login_enabled: if simulate_user_login_enabled:
self.simulate_user_logins(debug_on) self.simulate_user_logins(debug_on)
prompt_continuation_of_analysis = True prompt_continuation_of_analysis = True
analyze_tables = True
# STEP 3 -- SEND INVITES
proceed_with_sending_invites = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with sending user invites?
{TerminalColors.ENDC}"""
)
if proceed_with_sending_invites:
self.run_send_invites_script()
prompt_continuation_of_analysis = True
# STEP 4 -- ANALYZE TABLES & GENERATE REPORT
# Analyze tables for corrupt data...
# only prompt if we ran steps 1 and/or 2
if prompt_continuation_of_analysis: if prompt_continuation_of_analysis:
analyze_tables = TerminalHelper.query_yes_no( analyze_tables = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL} f"""{TerminalColors.FAIL}
Proceed with table analysis? Proceed with table analysis?
{TerminalColors.ENDC}""" {TerminalColors.ENDC}"""
) )
if not analyze_tables:
# Analyze tables for corrupt data... return
if analyze_tables: self.compare_tables(debug_on)
self.compare_tables(debug_on)

View file

@ -10,25 +10,14 @@ from registrar.models import TransitionDomain
from registrar.models import Domain from registrar.models import Domain
from registrar.models import DomainInvitation from registrar.models import DomainInvitation
from registrar.management.commands.utility.terminal_helper import (
TerminalColors,
TerminalHelper
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class termColors:
"""Colors for terminal outputs
(makes reading the logs WAY easier)"""
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
YELLOW = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
BackgroundLightYellow = "\033[103m"
class Command(BaseCommand): class Command(BaseCommand):
help = """Load data from transition domain tables help = """Load data from transition domain tables
into main domain tables. Also create domain invitation into main domain tables. Also create domain invitation
@ -49,33 +38,25 @@ class Command(BaseCommand):
): ):
"""Prints additional terminal statements to indicate if --debug """Prints additional terminal statements to indicate if --debug
or --limitParse are in use""" or --limitParse are in use"""
self.print_debug( TerminalHelper.print_conditional(
debug_on, debug_on,
f"""{termColors.OKCYAN} f"""{TerminalColors.OKCYAN}
----------DEBUG MODE ON---------- ----------DEBUG MODE ON----------
Detailed print statements activated. Detailed print statements activated.
{termColors.ENDC} {TerminalColors.ENDC}
""", """,
) )
self.print_debug( TerminalHelper.print_conditional(
debug_max_entries_to_parse > 0, debug_max_entries_to_parse > 0,
f"""{termColors.OKCYAN} f"""{TerminalColors.OKCYAN}
----------LIMITER ON---------- ----------LIMITER ON----------
Parsing of entries will be limited to Parsing of entries will be limited to
{debug_max_entries_to_parse} lines per file.") {debug_max_entries_to_parse} lines per file.")
Detailed print statements activated. Detailed print statements activated.
{termColors.ENDC} {TerminalColors.ENDC}
""", """,
) )
def print_debug(self, print_condition: bool, print_statement: str):
"""This function reduces complexity of debug statements
in other functions.
It uses the logger to write the given print_statement to the
terminal if print_condition is TRUE"""
# DEBUG:
if print_condition:
logger.info(print_statement)
def update_domain_status( def update_domain_status(
self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool
@ -96,13 +77,13 @@ class Command(BaseCommand):
target_domain.save() target_domain.save()
# DEBUG: # DEBUG:
self.print_debug( TerminalHelper.print_conditional(
debug_on, debug_on,
f"""{termColors.YELLOW} f"""{TerminalColors.YELLOW}
>> Updated {target_domain.name} state from >> Updated {target_domain.name} state from
'{existing_status}' to '{target_domain.state}' '{existing_status}' to '{target_domain.state}'
(no domain invitation entry added) (no domain invitation entry added)
{termColors.ENDC}""", {TerminalColors.ENDC}""",
) )
return True return True
return False return False
@ -123,22 +104,22 @@ class Command(BaseCommand):
total_domain_invitation_entries = len(domain_invitations_to_create) total_domain_invitation_entries = len(domain_invitations_to_create)
logger.info( logger.info(
f"""{termColors.OKGREEN} f"""{TerminalColors.OKGREEN}
============= FINISHED =============== ============= FINISHED ===============
Created {total_new_entries} transition domain entries, Created {total_new_entries} transition domain entries,
Updated {total_updated_domain_entries} transition domain entries Updated {total_updated_domain_entries} transition domain entries
Created {total_domain_invitation_entries} domain invitation entries Created {total_domain_invitation_entries} domain invitation entries
(NOTE: no invitations are SENT in this script) (NOTE: no invitations are SENT in this script)
{termColors.ENDC} {TerminalColors.ENDC}
""" """
) )
if len(skipped_domain_entries) > 0: if len(skipped_domain_entries) > 0:
logger.info( logger.info(
f"""{termColors.FAIL} f"""{TerminalColors.FAIL}
============= SKIPPED DOMAINS (ERRORS) =============== ============= SKIPPED DOMAINS (ERRORS) ===============
{skipped_domain_entries} {skipped_domain_entries}
{termColors.ENDC} {TerminalColors.ENDC}
""" """
) )
@ -151,25 +132,25 @@ class Command(BaseCommand):
skipped_domain_invitations.remove(domain_invite.domain) skipped_domain_invitations.remove(domain_invite.domain)
if len(skipped_domain_invitations) > 0: if len(skipped_domain_invitations) > 0:
logger.info( logger.info(
f"""{termColors.FAIL} f"""{TerminalColors.FAIL}
============= SKIPPED DOMAIN INVITATIONS (ERRORS) =============== ============= SKIPPED DOMAIN INVITATIONS (ERRORS) ===============
{skipped_domain_invitations} {skipped_domain_invitations}
{termColors.ENDC} {TerminalColors.ENDC}
""" """
) )
# DEBUG: # DEBUG:
self.print_debug( TerminalHelper.print_conditional(
debug_on, debug_on,
f"""{termColors.YELLOW} f"""{TerminalColors.YELLOW}
======= DEBUG OUTPUT =======
Created Domains: Created Domains:
{domains_to_create} {domains_to_create}
Updated Domains: Updated Domains:
{updated_domain_entries} {updated_domain_entries}
{termColors.ENDC} {TerminalColors.ENDC}
""", """,
) )
@ -184,7 +165,7 @@ class Command(BaseCommand):
if associated_domain is None: if associated_domain is None:
logger.warning( logger.warning(
f""" f"""
{termColors.FAIL} {TerminalColors.FAIL}
!!! ERROR: Domain cannot be null for a !!! ERROR: Domain cannot be null for a
Domain Invitation object! Domain Invitation object!
@ -241,11 +222,11 @@ class Command(BaseCommand):
total_rows_parsed = 0 total_rows_parsed = 0
logger.info( logger.info(
f"""{termColors.OKGREEN} f"""{TerminalColors.OKGREEN}
========================== ==========================
Beginning Data Transfer Beginning Data Transfer
========================== ==========================
{termColors.ENDC}""" {TerminalColors.ENDC}"""
) )
for transition_domain in TransitionDomain.objects.all(): for transition_domain in TransitionDomain.objects.all():
@ -254,11 +235,11 @@ class Command(BaseCommand):
transition_domain_email = transition_domain.username transition_domain_email = transition_domain.username
# DEBUG: # DEBUG:
self.print_debug( TerminalHelper.print_conditional(
debug_on, debug_on,
f"""{termColors.OKCYAN} f"""{TerminalColors.OKCYAN}
Processing Transition Domain: {transition_domain_name}, {transition_domain_status}, {transition_domain_email} Processing Transition Domain: {transition_domain_name}, {transition_domain_status}, {transition_domain_email}
{termColors.ENDC}""", # noqa {TerminalColors.ENDC}""", # noqa
) )
new_domain_invitation = None new_domain_invitation = None
@ -269,11 +250,11 @@ class Command(BaseCommand):
# get the existing domain # get the existing domain
domain_to_update = Domain.objects.get(name=transition_domain_name) domain_to_update = Domain.objects.get(name=transition_domain_name)
# DEBUG: # DEBUG:
self.print_debug( TerminalHelper.print_conditional(
debug_on, debug_on,
f"""{termColors.YELLOW} f"""{TerminalColors.YELLOW}
> Found existing entry in Domain table for: {transition_domain_name}, {domain_to_update.state} > Found existing entry in Domain table for: {transition_domain_name}, {domain_to_update.state}
{termColors.ENDC}""", # noqa {TerminalColors.ENDC}""", # noqa
) )
# for existing entry, update the status to # for existing entry, update the status to
@ -300,7 +281,7 @@ class Command(BaseCommand):
# immediate attention. # immediate attention.
logger.warning( logger.warning(
f""" f"""
{termColors.FAIL} {TerminalColors.FAIL}
!!! ERROR: duplicate entries already exist in the !!! ERROR: duplicate entries already exist in the
Domain table for the following domain: Domain table for the following domain:
{transition_domain_name} {transition_domain_name}
@ -316,7 +297,7 @@ class Command(BaseCommand):
except TransitionNotAllowed as err: except TransitionNotAllowed as err:
skipped_domain_entries.append(transition_domain_name) skipped_domain_entries.append(transition_domain_name)
logger.warning( logger.warning(
f"""{termColors.FAIL} f"""{TerminalColors.FAIL}
Unable to change state for {transition_domain_name} Unable to change state for {transition_domain_name}
RECOMMENDATION: RECOMMENDATION:
@ -343,15 +324,15 @@ class Command(BaseCommand):
None, None,
) )
if existing_domain_in_to_create is not None: if existing_domain_in_to_create is not None:
self.print_debug( TerminalHelper.print_conditional(
debug_on, debug_on,
f"""{termColors.YELLOW} f"""{TerminalColors.YELLOW}
Duplicate Detected: {transition_domain_name}. Duplicate Detected: {transition_domain_name}.
Cannot add duplicate entry for another username. Cannot add duplicate entry for another username.
Violates Unique Key constraint. Violates Unique Key constraint.
Checking for unique user e-mail for Domain Invitations... Checking for unique user e-mail for Domain Invitations...
{termColors.ENDC}""", {TerminalColors.ENDC}""",
) )
new_domain_invitation = self.try_add_domain_invitation( new_domain_invitation = self.try_add_domain_invitation(
transition_domain_email, existing_domain_in_to_create transition_domain_email, existing_domain_in_to_create
@ -363,9 +344,9 @@ class Command(BaseCommand):
) )
domains_to_create.append(new_domain) domains_to_create.append(new_domain)
# DEBUG: # DEBUG:
self.print_debug( TerminalHelper.print_conditional(
debug_on, debug_on,
f"{termColors.OKCYAN} Adding domain: {new_domain} {termColors.ENDC}", # noqa f"{TerminalColors.OKCYAN} Adding domain: {new_domain} {TerminalColors.ENDC}", # noqa
) )
new_domain_invitation = self.try_add_domain_invitation( new_domain_invitation = self.try_add_domain_invitation(
transition_domain_email, new_domain transition_domain_email, new_domain
@ -373,14 +354,14 @@ class Command(BaseCommand):
if new_domain_invitation is None: if new_domain_invitation is None:
logger.info( logger.info(
f"{termColors.YELLOW} ! No new e-mail detected !" # noqa f"{TerminalColors.YELLOW} ! No new e-mail detected !" # noqa
f"(SKIPPED ADDING DOMAIN INVITATION){termColors.ENDC}" f"(SKIPPED ADDING DOMAIN INVITATION){TerminalColors.ENDC}"
) )
else: else:
# DEBUG: # DEBUG:
self.print_debug( TerminalHelper.print_conditional(
debug_on, debug_on,
f"{termColors.OKCYAN} Adding domain invitation: {new_domain_invitation} {termColors.ENDC}", # noqa f"{TerminalColors.OKCYAN} Adding domain invitation: {new_domain_invitation} {TerminalColors.ENDC}", # noqa
) )
domain_invitations_to_create.append(new_domain_invitation) domain_invitations_to_create.append(new_domain_invitation)
@ -390,9 +371,9 @@ class Command(BaseCommand):
and total_rows_parsed >= debug_max_entries_to_parse and total_rows_parsed >= debug_max_entries_to_parse
): ):
logger.info( logger.info(
f"""{termColors.YELLOW} f"""{TerminalColors.YELLOW}
----PARSE LIMIT REACHED. HALTING PARSER.---- ----PARSE LIMIT REACHED. HALTING PARSER.----
{termColors.ENDC} {TerminalColors.ENDC}
""" """
) )
break break

View file

@ -11,6 +11,7 @@ class TerminalColors:
OKCYAN = "\033[96m" OKCYAN = "\033[96m"
OKGREEN = "\033[92m" OKGREEN = "\033[92m"
YELLOW = "\033[93m" YELLOW = "\033[93m"
MAGENTA = "\033[35m"
FAIL = "\033[91m" FAIL = "\033[91m"
ENDC = "\033[0m" ENDC = "\033[0m"
BOLD = "\033[1m" BOLD = "\033[1m"