finished options for running loader scripts. Added boilerplate for simulating logins

This commit is contained in:
CocoByte 2023-10-24 00:23:45 -06:00
parent d5c0ac7a0c
commit cb3cfe3a6d
No known key found for this signature in database
GPG key ID: BBFAA2526384C97F
3 changed files with 162 additions and 55 deletions

View file

@ -297,20 +297,29 @@ class Command(BaseCommand):
)
TransitionDomain.objects.all().delete()
def parse_files(self, # noqa: C901
def handle( # noqa: C901
self,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
reset_table,
sep,
debug_on,
debug_max_entries_to_parse):
**options,
):
"""Parse the data files and create TransitionDomains."""
sep = options.get("sep")
# If --resetTable was used, prompt user to confirm
# deletion of table data
if reset_table:
if options.get("resetTable"):
self.prompt_table_reset()
# Get --debug argument
debug_on = options.get("debug")
# Get --LimitParse argument
debug_max_entries_to_parse = int(
options.get("limitParse")
) # set to 0 to parse all entries
# print message to terminal about which args are in use
self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse)
@ -513,32 +522,3 @@ class Command(BaseCommand):
duplicate_domain_user_combos, duplicate_domains, users_without_email
)
self.print_summary_status_findings(domains_without_status, outlier_statuses)
def handle(
self,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
**options,
):
"""Parse the data files and create TransitionDomains."""
# Get --sep argument
sep = options.get("sep")
# Get --resetTable argument
reset_table = options.get("resetTable")
# Get --debug argument
debug_on = options.get("debug")
# Get --limitParse argument
debug_max_entries_to_parse = int(
options.get("limitParse")
) # set to 0 to parse all entries
self.parse_files(domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
debug_max_entries_to_parse)

View file

@ -1,10 +1,13 @@
import logging
import argparse
import sys
import os
from django.test import Client
from django_fsm import TransitionNotAllowed # type: ignore
from django.core.management import BaseCommand
from django.contrib.auth import get_user_model
from registrar.models import TransitionDomain
from registrar.models import Domain
@ -32,7 +35,11 @@ class Command(BaseCommand):
help="Runs all scripts (in sequence) for transition domain migrations",
action=argparse.BooleanOptionalAction)
# The file arguments have default values for running in the sandbox
parser.add_argument("--triggerLogins",
help="Simulates a user login for each user in domain invitation",
action=argparse.BooleanOptionalAction)
# The following file arguments have default values for running in the sandbox
parser.add_argument(
"--loaderDirectory",
default="migrationData",
@ -177,9 +184,79 @@ class Command(BaseCommand):
"""
)
def run_load_transition_domain_script(self,
file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
debug_max_entries_to_parse):
load_transition_domain_command_string = "./manage.py load_transition_domain "
load_transition_domain_command_string += file_location+domain_contacts_filename + " "
load_transition_domain_command_string += file_location+contacts_filename + " "
load_transition_domain_command_string += file_location+domain_statuses_filename + " "
if sep is not None and sep != "|":
load_transition_domain_command_string += f"--sep {sep} "
if reset_table:
load_transition_domain_command_string += "--resetTable "
if debug_on:
load_transition_domain_command_string += "--debug "
if debug_max_entries_to_parse > 0:
load_transition_domain_command_string += f"--limitParse {debug_max_entries_to_parse} "
proceed_load_transition_domain = TerminalHelper.query_yes_no(
f"""{TerminalColors.OKCYAN}
=====================================
Running load_transition_domain script
=====================================
{load_transition_domain_command_string}
{TerminalColors.FAIL}
Proceed?
{TerminalColors.ENDC}"""
)
if not proceed_load_transition_domain:
return
logger.info(f"""{TerminalColors.OKCYAN}
==== EXECUTING... ====
{TerminalColors.ENDC}""")
os.system(f"{load_transition_domain_command_string}")
def run_transfer_script(self, debug_on):
command_string = "./manage.py transfer_transition_domains_to_domains "
if debug_on:
command_string += "--debug "
proceed_load_transition_domain = TerminalHelper.query_yes_no(
f"""{TerminalColors.OKCYAN}
=====================================================
Running transfer_transition_domains_to_domains script
=====================================================
{command_string}
{TerminalColors.FAIL}
Proceed?
{TerminalColors.ENDC}"""
)
if not proceed_load_transition_domain:
return
logger.info(f"""{TerminalColors.OKCYAN}
==== EXECUTING... ====
{TerminalColors.ENDC}""")
os.system(f"{command_string}")
def run_migration_scripts(self,
options):
file_location = options.get("loaderDirectory")+"/"
filenames = options.get("loaderFilenames").split()
if len(filenames) < 3:
@ -201,7 +278,7 @@ class Command(BaseCommand):
files_are_correct = TerminalHelper.query_yes_no(
f"""
{TerminalColors.YELLOW}
{TerminalColors.OKCYAN}
*** IMPORTANT: VERIFY THE FOLLOWING ***
The migration scripts are looking in directory....
@ -210,8 +287,9 @@ class Command(BaseCommand):
....for the following files:
- domain contacts: {domain_contacts_filename}
- contacts: {contacts_filename}
- domain statuses: {domain_statuses_filename}
- domain statuses: {domain_statuses_filename}y
{TerminalColors.FAIL}
Does this look correct?{TerminalColors.ENDC}"""
)
@ -240,14 +318,38 @@ class Command(BaseCommand):
debug_max_entries_to_parse = int(
options.get("limitParse")
) # set to 0 to parse all entries
load_transition_domain_command.parse_files(load_transition_domain_command,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
debug_max_entries_to_parse)
self.run_load_transition_domain_script(file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
debug_max_entries_to_parse)
self.run_transfer_script(debug_on)
def simulate_user_logins(self, debug_on):
logger.info(f"""{TerminalColors.OKCYAN}
==================
SIMULATING LOGINS
==================
{TerminalColors.ENDC}
""")
for invite in DomainInvitation.objects.all():
#DEBUG:
TerminalHelper.print_debug(debug_on,f"""{TerminalColors.OKCYAN}Processing invite: {invite}{TerminalColors.ENDC}""")
# get a user with this email address
User = get_user_model()
try:
user = User.objects.get(email=invite.email)
#DEBUG:
TerminalHelper.print_debug(debug_on,f"""{TerminalColors.OKCYAN}Logging in user: {user}{TerminalColors.ENDC}""")
Client.force_login(user)
except User.DoesNotExist:
#TODO: how should we handle this?
logger.warn(f"""{TerminalColors.FAIL}No user found {invite.email}{TerminalColors.ENDC}""")
def handle(
self,
@ -283,13 +385,29 @@ class Command(BaseCommand):
debug_on = options.get("debug")
# Get --runLoaders argument
run_loaders_on = options.get("runLoaders")
# Get --triggerLogins argument
simulate_user_login_enabled = options.get("triggerLogins")
# Analyze tables for corrupt data...
self.compare_tables(debug_on)
prompt_continuation_of_analysis = False
# Run migration scripts if specified by user...
if run_loaders_on:
# domain_contacts_filename = options.get("domain_contacts_filename")
# contacts_filename = options.get("contacts_filename")
# domain_statuses_filename = options.get("domain_statuses_filename")
self.run_migration_scripts(options)
prompt_continuation_of_analysis = True
# Simulate user login for each user in domain invitation if sepcified by user
if simulate_user_login_enabled:
self.simulate_user_logins(debug_on)
prompt_continuation_of_analysis = True
analyze_tables = True
if prompt_continuation_of_analysis:
analyze_tables = TerminalHelper.query_yes_no(
f"""{TerminalColors.FAIL}
Proceed with table analysis?
{TerminalColors.ENDC}"""
)
# Analyze tables for corrupt data...
if analyze_tables:
self.compare_tables(debug_on)

View file

@ -48,3 +48,12 @@ class TerminalHelper:
return valid[choice]
else:
logger.info("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n")
def print_debug(print_condition: bool, print_statement: str):
"""This function reduces complexity of debug statements
in other functions.
It uses the logger to write the given print_statement to the
terminal if print_condition is TRUE"""
# DEBUG:
if print_condition:
logger.info(print_statement)