Some linting and test cases

This commit is contained in:
zandercymatics 2023-11-09 07:44:29 -07:00
parent 72d95f6fad
commit 3e3cfde00a
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
5 changed files with 23 additions and 372 deletions

View file

@ -1,273 +0,0 @@
import argparse
import csv
import logging
from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import (
TerminalColors,
TerminalHelper,
)
from registrar.models.domain_application import DomainApplication
from registrar.models.transition_domain import TransitionDomain
logger = logging.getLogger(__name__)
# DEV SHORTCUT:
# Example command for running this script:
# docker compose run -T app ./manage.py agency_data_extractor 20231009.agency.adhoc.dotgov.txt --dir /app/tmp --debug
class Command(BaseCommand):
help = """Loads data for domains that are in transition
(populates transition_domain model objects)."""
def add_arguments(self, parser):
"""Add file that contains agency data"""
parser.add_argument(
"agency_data_filename", help="Data file with agency information"
)
parser.add_argument("--dir", default="migrationdata", help="Desired directory")
parser.add_argument("--sep", default="|", help="Delimiter character")
parser.add_argument(
"--debug",
help="Prints additional debug statements to the terminal",
action=argparse.BooleanOptionalAction,
)
parser.add_argument("--prompt", action=argparse.BooleanOptionalAction)
@staticmethod
def extract_agencies(agency_data_filepath: str, sep: str, debug: bool) -> [str]:
"""Extracts all the agency names from the provided
agency file (skips any duplicates) and returns those
names in an array"""
agency_names = []
logger.info(
f"{TerminalColors.OKCYAN}Reading agency data file {agency_data_filepath}{TerminalColors.ENDC}"
)
with open(agency_data_filepath, "r") as agency_data_filepath: # noqa
for row in csv.reader(agency_data_filepath, delimiter=sep):
agency_name = row[1]
TerminalHelper.print_conditional(debug, f"Checking: {agency_name}")
if agency_name not in agency_names:
agency_names.append(agency_name)
logger.info(
f"{TerminalColors.OKCYAN}Checked {len(agency_names)} agencies{TerminalColors.ENDC}"
)
return agency_names
@staticmethod
def compare_agency_lists(
provided_agencies: [str], existing_agencies: [str], debug: bool
):
"""
Compares new_agencies with existing_agencies and
provides the equivalent of an outer-join on the two
(printed to the terminal)
"""
new_agencies = []
# 1 - Get all new agencies that we don't already have (We might want to ADD these to our list)
for agency in provided_agencies:
if agency not in existing_agencies and agency not in new_agencies:
new_agencies.append(agency)
TerminalHelper.print_conditional(
debug,
f"{TerminalColors.YELLOW}Found new agency: {agency}{TerminalColors.ENDC}",
)
possibly_unused_agencies = []
# 2 - Get all new agencies that we don't already have (We might want to ADD these to our list)
for agency in existing_agencies:
if (
agency not in provided_agencies
and agency not in possibly_unused_agencies
):
possibly_unused_agencies.append(agency)
TerminalHelper.print_conditional(
debug,
f"{TerminalColors.YELLOW}Possibly unused agency detected: {agency}{TerminalColors.ENDC}",
)
matched_agencies = []
for agency in provided_agencies:
if agency in existing_agencies:
matched_agencies.append(agency)
TerminalHelper.print_conditional(
debug,
f"{TerminalColors.YELLOW}Matched agencies: {agency}{TerminalColors.ENDC}",
)
# Print the summary of findings
# 1 - Print the list of agencies in the NEW list, which we do not already have
# 2 - Print the list of agencies that we currently have, which are NOT in the new list (these might be eligible for removal?) TODO: would we ever want to remove existing agencies?
new_agencies_as_string = "{}".format(",\n ".join(map(str, new_agencies)))
possibly_unused_agencies_as_string = "{}".format(
",\n ".join(map(str, possibly_unused_agencies))
)
matched_agencies_as_string = "{}".format(
",\n ".join(map(str, matched_agencies))
)
logger.info(
f"""
{TerminalColors.OKGREEN}
======================== SUMMARY OF FINDINGS ============================
{len(provided_agencies)} AGENCIES WERE PROVIDED in the agency file.
{len(existing_agencies)} AGENCIES FOUND IN THE TARGETED SYSTEM.
{len(provided_agencies)-len(new_agencies)} AGENCIES MATCHED
(These are agencies that are in the given agency file AND in our system already)
{TerminalColors.YELLOW}{matched_agencies_as_string}
{TerminalColors.OKGREEN}
{len(new_agencies)} AGENCIES TO ADD:
These agencies were in the provided agency file, but are not in our system.
{TerminalColors.YELLOW}{new_agencies_as_string}
{TerminalColors.OKGREEN}
{len(possibly_unused_agencies)} AGENCIES TO (POSSIBLY) REMOVE:
These agencies are in our system, but not in the provided agency file:
{TerminalColors.YELLOW}{possibly_unused_agencies_as_string}
{TerminalColors.ENDC}
"""
)
@staticmethod
def print_agency_list(agencies, filename):
full_agency_list_as_string = "{}".format(",\n".join(map(str, agencies)))
logger.info(
f"\n{TerminalColors.YELLOW}"
f"\n{full_agency_list_as_string}"
f"{TerminalColors.OKGREEN}"
)
logger.info(f"{TerminalColors.MAGENTA}Writing to file...{TerminalColors.ENDC}")
with open(f"tmp/[{filename}].txt", "w+") as f:
f.write(full_agency_list_as_string)
def handle(
self,
agency_data_filename,
**options,
):
"""Parse the agency data file."""
# Get all the arguments
sep = options.get("sep")
debug = options.get("debug")
prompt = options.get("prompt")
dir = options.get("dir")
agency_data_file = dir + "/" + agency_data_filename
new_agencies = self.extract_agencies(agency_data_file, sep, debug)
hard_coded_agencies = DomainApplication.AGENCIES
transition_domain_agencies = (
TransitionDomain.objects.all()
.values_list("federal_agency", flat=True)
.distinct()
)
print(transition_domain_agencies)
merged_agencies = new_agencies
for agency in hard_coded_agencies:
if agency not in merged_agencies:
merged_agencies.append(agency)
merged_transition_agencies = new_agencies
for agency in transition_domain_agencies:
if agency not in merged_transition_agencies:
merged_transition_agencies.append(agency)
prompt_successful = False
# OPTION to compare the agency file to our hard-coded list
if prompt:
prompt_successful = TerminalHelper.query_yes_no(
f"\n\n{TerminalColors.FAIL}Check {agency_data_filename} against our (hard-coded) dropdown list of agencies?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
self.compare_agency_lists(new_agencies, hard_coded_agencies, debug)
# OPTION to compare the agency file to Transition Domains
if prompt:
prompt_successful = TerminalHelper.query_yes_no(
f"\n\n{TerminalColors.FAIL}Check {agency_data_filename} against Transition Domain contents?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
self.compare_agency_lists(new_agencies, transition_domain_agencies, debug)
# OPTION to print out the full list of agencies from the agency file
if prompt:
prompt_successful = TerminalHelper.query_yes_no(
f"\n\n{TerminalColors.FAIL}Would you like to print the full list of agencies from the given agency file?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
logger.info(
f"\n{TerminalColors.OKGREEN}"
f"\n======================== FULL LIST OF IMPORTED AGENCIES ============================"
f"\nThese are all the agencies provided by the given agency file."
f"\n\n{len(new_agencies)} TOTAL\n\n"
)
self.print_agency_list(new_agencies, "Imported_Agencies")
# OPTION to print out the full list of agencies from the agency file
if prompt:
prompt_successful = TerminalHelper.query_yes_no(
f"{TerminalColors.FAIL}Would you like to print the full list of agencies from the dropdown?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
logger.info(
f"\n{TerminalColors.OKGREEN}"
f"\n======================== FULL LIST OF AGENCIES IN DROPDOWN ============================"
f"\nThese are all the agencies hard-coded in our system for the dropdown list."
f"\n\n{len(hard_coded_agencies)} TOTAL\n\n"
)
self.print_agency_list(hard_coded_agencies, "Dropdown_Agencies")
# OPTION to print out the full list of agencies from the agency file
if prompt:
prompt_successful = TerminalHelper.query_yes_no(
f"{TerminalColors.FAIL}Would you like to print the full list of agencies from the dropdown?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
logger.info(
f"\n{TerminalColors.OKGREEN}"
f"\n======================== FULL LIST OF AGENCIES IN TRANSITION DOMAIN ============================"
f"\nThese are all the agencies in the Transition Domains table."
f"\n\n{len(transition_domain_agencies)} TOTAL\n\n"
)
self.print_agency_list(
transition_domain_agencies, "Transition_Domain_Agencies"
)
# OPTION to print out the full list of agencies from the agency file
if prompt:
prompt_successful = TerminalHelper.query_yes_no(
f"{TerminalColors.FAIL}Would you like to print the MERGED list of agencies (dropdown + agency file)?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
logger.info(
f"\n{TerminalColors.OKGREEN}"
f"\n======================== MERGED LISTS (dropdown + agency file) ============================"
f"\nThese are all the agencies our dropdown plus all the agencies in the agency file."
f"\n\n{len(merged_agencies)} TOTAL\n\n"
)
self.print_agency_list(merged_agencies, "Merged_Dropdown_Agency_List")
# OPTION to print out the full list of agencies from the agency file
if prompt:
prompt_successful = TerminalHelper.query_yes_no(
f"{TerminalColors.FAIL}Would you like to print the MERGED list of agencies (dropdown + agency file)?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
logger.info(
f"\n{TerminalColors.OKGREEN}"
f"\n======================== MERGED LISTS (transition domain + agency file) ============================"
f"\nThese are all the agencies our transition domains table plus all the agencies in the agency file."
f"\n\n{len(merged_agencies)} TOTAL\n\n"
)
self.print_agency_list(
merged_transition_agencies, "Merged_Transition_Domain_Agency_List"
)

View file

@ -66,7 +66,8 @@ class Command(BaseCommand):
parser.add_argument(
"--infer_filenames",
action=argparse.BooleanOptionalAction,
help="Determines if we should infer filenames or not. Recommended to be enabled only in a development or testing setting.",
help="Determines if we should infer filenames or not."
"Recommended to be enabled only in a development or testing setting.",
)
parser.add_argument(
@ -331,7 +332,7 @@ class Command(BaseCommand):
directory += "/"
json_filepath = directory + migration_json_filename
### Process JSON file ###
# Process JSON file #
# If a JSON was provided, use its values instead of defaults.
# TODO: there is no way to discern user overrides from those args defaults.
with open(json_filepath, "r") as jsonFile:
@ -339,7 +340,6 @@ class Command(BaseCommand):
try:
data = json.load(jsonFile)
# Create an instance of TransitionDomainArguments
has_desired_args = False
# Iterate over the data from the JSON file
for key, value in data.items():
# Check if the key exists in TransitionDomainArguments
@ -348,10 +348,10 @@ class Command(BaseCommand):
options[key] = value
except Exception as err:
logger.error(
f"""{TerminalColors.FAIL}There was an error loading the JSON responsible
for providing filepaths.
{TerminalColors.ENDC}
"""
f"{TerminalColors.FAIL}"
"There was an error loading "
"the JSON responsible for providing filepaths."
f"{TerminalColors.ENDC}"
)
raise err
@ -370,7 +370,7 @@ class Command(BaseCommand):
args.limitParse
) # set to 0 to parse all entries
## Variables for Additional TransitionDomain Information ##
# Variables for Additional TransitionDomain Information #
# Main script filenames - these do not have defaults
domain_contacts_filename = None
@ -378,7 +378,7 @@ class Command(BaseCommand):
domain_contacts_filename = directory + options.get(
"domain_contacts_filename"
)
except TypeError as err:
except TypeError:
logger.error(
f"Invalid filename of '{args.domain_contacts_filename}'"
" was provided for domain_contacts_filename"
@ -387,7 +387,7 @@ class Command(BaseCommand):
contacts_filename = None
try:
contacts_filename = directory + options.get("contacts_filename")
except TypeError as err:
except TypeError:
logger.error(
f"Invalid filename of '{args.contacts_filename}'"
" was provided for contacts_filename"
@ -398,7 +398,7 @@ class Command(BaseCommand):
domain_statuses_filename = directory + options.get(
"domain_statuses_filename"
)
except TypeError as err:
except TypeError:
logger.error(
f"Invalid filename of '{args.domain_statuses_filename}'"
" was provided for domain_statuses_filename"
@ -458,7 +458,6 @@ class Command(BaseCommand):
# Start parsing the main file and create TransitionDomain objects
logger.info("Reading domain-contacts data file %s", domain_contacts_filename)
total_lines = TerminalHelper.get_file_line_count(domain_contacts_filename)
with open(domain_contacts_filename, "r") as domain_contacts_file:
for row in csv.reader(domain_contacts_file, delimiter=sep):
# TerminalHelper.printProgressBar(total_rows_parsed, total_lines)

View file

@ -7,7 +7,6 @@
import logging
import argparse
import sys
from django.core.management import BaseCommand
from django.core.management import call_command
@ -89,7 +88,8 @@ class Command(BaseCommand):
# The following file arguments have default values for running in the sandbox
# TODO: make this a mandatory argument (if/when we strip out defaults, it will be mandatory)
# TODO: make this a mandatory argument
# (if/when we strip out defaults, it will be mandatory)
# TODO: use the migration directory arg or force user to type FULL filepath?
parser.add_argument(
"--migrationJSON",

View file

@ -114,7 +114,7 @@ class Command(BaseCommand):
Created {total_domain_invitation_entries} domain invitation entries
(NOTE: no invitations are SENT in this script)
{TerminalColors.ENDC}
"""
""" # noqa
)
if len(skipped_domain_entries) > 0:
logger.info(
@ -192,15 +192,19 @@ class Command(BaseCommand):
# ---- UPDATE THE DOMAIN
# update the status
update_made = self.update_domain_status(
self.update_domain_status(
transition_domain, target_domain, debug_on
)
# TODO: not all domains need to be updated (the information is the same). Need to bubble this up to the final report.
# TODO: not all domains need to be updated (the information is the same).
# Need to bubble this up to the final report.
# update dates (creation and expiration)
if transition_domain_creation_date is not None:
# TODO: added this because I ran into a situation where the created_at date was null (violated a key constraint). How do we want to handle this case?
# TODO: added this because I ran into a situation where
# the created_at date was null (violated a key constraint).
# How do we want to handle this case?
target_domain.created_at = transition_domain_creation_date
if transition_domain_expiration_date is not None:
target_domain.expiration_date = transition_domain_expiration_date
target_domain.save()
@ -486,7 +490,7 @@ class Command(BaseCommand):
# for existing entry, update the status to
# the transition domain status
update_made = self.update_domain_information(
self.update_domain_information(
target_domain_information, template_domain_information, debug_on
)
# TODO: not all domains need to be updated (the information is the same). Need to bubble this up to the final report.

View file

@ -212,80 +212,6 @@ class TestMigrations(TestCase):
expected_missing_domain_invitations,
)
def test_load_full_transition_domain(self):
# Load command
self.run_load_domains()
# We should get a consistent number
# of records
expected_total_transition_domains = 9
expected_total_domains = 0
expected_total_domain_informations = 0
expected_total_domain_invitations = 0
expected_missing_domains = 9
expected_duplicate_domains = 0
expected_missing_domain_informations = 9
expected_missing_domain_invitations = 9
self.compare_tables(
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
expected_missing_domain_invitations,
)
expected_transition_domains = [
TransitionDomain(
username="alexandra.bobbitt5@test.com",
domain_name="fakewebsite2.gov",
status="on hold",
email_sent=False,
organization_type="Federal",
organization_name="Fanoodle",
federal_type="Executive",
federal_agency="InnoZ",
epp_creation_date=None,
epp_expiration_date=None,
),
TransitionDomain(
username="reginald.ratcliff4@test.com",
domain_name="fakewebsite3.gov",
status="ready",
email_sent=False,
organization_type="City",
organization_name="Sushi",
federal_type=None,
federal_agency=None,
epp_creation_date=None,
epp_expiration_date=None,
),
]
expected_transition_domains = TransitionDomain.objects.filter(
username="alexandra.bobbitt5@test.com"
)
self.assertEqual(expected_transition_domains.count(), 1)
expected_transition_domain = expected_transition_domains.get()
# TransitionDomain.objects.filter(domain_name = "fakewebsite3.gov")
# Afterwards, their values should be what we expect
all_transition_domains = TransitionDomain.objects.all()
for domain in all_transition_domains:
for expected in expected_transition_domains:
# This data gets created when the object is,
# so we should just match it. Not relevant
# to the added data.
expected.id = domain.id
expected.created_at = domain.created_at
expected.updated_at = domain.updated_at
# Each TransitionDomain should have the correct data
self.assertEqual(domain, expected)
def test_load_full_domain(self):
self.run_load_domains()
self.run_transfer_domains()
@ -317,11 +243,7 @@ class TestMigrations(TestCase):
anomaly = anomaly_domains.get()
self.assertEqual(anomaly.expiration_date, datetime.date(2023, 3, 9))
"""
self.assertEqual(
anomaly.created_at, datetime.datetime(2023, 11, 8, 17, 23, 46, 764663, tzinfo=datetime.timezone.utc)
)
"""
self.assertEqual(anomaly.name, "anomaly.gov")
self.assertEqual(anomaly.state, "ready")
@ -331,7 +253,6 @@ class TestMigrations(TestCase):
testdomain = testdomain_domains.get()
self.assertEqual(testdomain.expiration_date, datetime.date(2023, 9, 30))
# self.assertEqual(testdomain.created_at, "test")
self.assertEqual(testdomain.name, "fakewebsite2.gov")
self.assertEqual(testdomain.state, "on hold")