Add logic to read current-full.csv

This commit is contained in:
zandercymatics 2024-01-02 14:26:35 -07:00
parent 20f34d56be
commit 58bb2f0f87
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
2 changed files with 118 additions and 73 deletions

View file

@ -43,81 +43,29 @@ class Command(BaseCommand):
raise argparse.ArgumentTypeError(f"Invalid file path '{current_full_filepath}'")
# === Update the "federal_agency" field === #
self.patch_agency_info(debug)
was_success = self.patch_agency_info(debug)
# === Try to process anything that was skipped === #
if len(self.di_skipped) > 0:
# We should only correct skipped records if the previous step was successful.
# If something goes wrong, then we risk corrupting data, so skip this step.
if len(self.di_skipped) > 0 and was_success:
# Flush out the list of DomainInformations to update
self.di_to_update.clear()
self.process_skipped_records(current_full_filepath, seperator, debug)
# Clear the old skipped list, and log the run summary
self.di_skipped.clear()
self.log_script_run_summary(debug)
def process_skipped_records(self, file_path, seperator, debug):
"""If we encounter any DomainInformation records that do not have data in the associated
TransitionDomain record, then check the associated current-full.csv file for this
information."""
self.di_to_update.clear()
self.di_failed_to_update.clear()
# Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==File location==
current-full.csv filepath: {file_path}
==Proposed Changes==
Number of DomainInformation objects to change: {len(self.di_skipped)}
The following DomainInformation objects will be modified: {self.di_skipped}
""",
prompt_title="Do you wish to patch skipped records?",
)
logger.info("Updating...")
file_data = self.read_current_full(file_path, seperator)
for di in self.di_skipped:
domain_name = di.domain.name
row = file_data.get(domain_name)
fed_agency = None
if row is not None and "Agency" in row:
fed_agency = row.get("Agency")
# Determine if we should update this record or not.
# If we don't get any data back, something went wrong.
if fed_agency is not None:
di.federal_agency = fed_agency
self.di_to_update.append(di)
if debug:
logger.info(
f"{TerminalColors.OKCYAN}"
f"Updating {di}"
f"{TerminalColors.ENDC}"
)
else:
self.di_failed_to_update.append(di)
logger.error(
f"{TerminalColors.FAIL}"
f"Could not update {di}. No information found."
f"{TerminalColors.ENDC}"
)
# Bulk update the federal agency field in DomainInformation objects
DomainInformation.objects.bulk_update(self.di_to_update, ["federal_agency"])
def read_current_full(self, file_path, seperator):
"""Reads the current-full.csv file and stores it in a dictionary"""
with open(file_path, "r") as requested_file:
reader = csv.DictReader(requested_file, delimiter=seperator)
# Return a dictionary with the domain name as the key,
# and the row information as the value
dict_data = {}
for row in reader:
domain_name = row.get("Domain Name")
if domain_name is not None:
domain_name = domain_name.lower()
row[domain_name] = row
return dict_data
else:
# This code should never execute. This can only occur if bulk_update somehow fails,
# which may indicate some sort of data corruption.
logger.error(
f"{TerminalColors.FAIL}"
"Could not automatically patch skipped records. "
"An error was encountered when running this script, please inspect the following "
f"records for accuracy and completeness: {self.di_failed_to_update}"
f"{TerminalColors.ENDC}"
)
def patch_agency_info(self, debug):
"""
@ -126,6 +74,9 @@ class Command(BaseCommand):
federal_agency field is None. Logs the update, skip, and failure actions if debug mode is on.
After all updates, logs a summary of the results.
"""
# Grab all DomainInformation objects (and their associated TransitionDomains)
# that need to be updated
empty_agency_query = Q(federal_agency=None) | Q(federal_agency="")
domain_info_to_fix = DomainInformation.objects.filter(empty_agency_query)
@ -184,6 +135,78 @@ class Command(BaseCommand):
# === Log results and return data === #
self.log_script_run_summary(debug)
# Tracks if this script was successful. If any errors are found, something went very wrong.
was_success = len(self.di_failed_to_update) != 0
return was_success
def process_skipped_records(self, file_path, seperator, debug):
"""If we encounter any DomainInformation records that do not have data in the associated
TransitionDomain record, then check the associated current-full.csv file for this
information."""
self.di_to_update.clear()
self.di_failed_to_update.clear()
# Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==File location==
current-full.csv filepath: {file_path}
==Proposed Changes==
Number of DomainInformation objects to change: {len(self.di_skipped)}
The following DomainInformation objects will be modified: {self.di_skipped}
""",
prompt_title="Do you wish to patch skipped records?",
)
logger.info("Updating...")
file_data = self.read_current_full(file_path, seperator)
for di in self.di_skipped:
domain_name = di.domain.name
row = file_data.get(domain_name)
fed_agency = None
if row is not None and "agency" in row:
fed_agency = row.get("agency")
# Determine if we should update this record or not.
# If we don't get any data back, something went wrong.
if fed_agency is not None:
di.federal_agency = fed_agency
self.di_to_update.append(di)
if debug:
logger.info(f"{TerminalColors.OKCYAN}" f"Updating {di}" f"{TerminalColors.ENDC}")
else:
self.di_failed_to_update.append(di)
logger.error(
f"{TerminalColors.FAIL}" f"Could not update {di}. No information found." f"{TerminalColors.ENDC}"
)
# Bulk update the federal agency field in DomainInformation objects
DomainInformation.objects.bulk_update(self.di_to_update, ["federal_agency"])
def read_current_full(self, file_path, seperator):
"""Reads the current-full.csv file and stores it in a dictionary"""
with open(file_path, "r") as requested_file:
old_reader = csv.DictReader(requested_file, delimiter=seperator)
# Some variants of current-full.csv have key casing differences for fields
# such as "Domain name" or "Domain Name". This corrects that.
reader = self.lowercase_fieldnames(old_reader)
# Return a dictionary with the domain name as the key,
# and the row information as the value
dict_data = {}
for row in reader:
domain_name = row.get("domain name")
if domain_name is not None:
domain_name = domain_name.lower()
dict_data[domain_name] = row
return dict_data
def lowercase_fieldnames(self, reader):
"""Lowercases all field keys in a dictreader to account for potential casing differences"""
for row in reader:
yield {k.lower(): v for k, v in row.items()}
def log_script_run_summary(self, debug):
"""Prints success, failed, and skipped counts, as well as
all affected objects."""

View file

@ -39,11 +39,7 @@ class TestPatchAgencyInfo(TestCase):
@patch("registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", return_value=True)
def call_patch_federal_agency_info(self, mock_prompt):
"""Calls the patch_federal_agency_info command and mimics a keypress"""
call_command(
"patch_federal_agency_info",
"registrar/tests/data/fake_current_full.csv",
debug=True
)
call_command("patch_federal_agency_info", "registrar/tests/data/fake_current_full.csv", debug=True)
def test_patch_agency_info(self):
"""
@ -83,6 +79,32 @@ class TestPatchAgencyInfo(TestCase):
# Check that the federal_agency field was not updated
self.assertIsNone(self.domain_info.federal_agency)
def test_patch_agency_info_skip_updates_data(self):
"""
Tests that the `patch_federal_agency_info` command logs a warning but
updates the DomainInformation object, because an record exists in the
provided current-full.csv file.
"""
# Set federal_agency to None to simulate a skip
self.transition_domain.federal_agency = None
self.transition_domain.save()
# Change the domain name to something parsable in the .csv
self.domain.name = "cdomain1.gov"
self.domain.save()
with self.assertLogs("registrar.management.commands.patch_federal_agency_info", level="WARNING") as context:
self.call_patch_federal_agency_info()
# Check that the correct log message was output
self.assertIn("SOME AGENCY DATA WAS NONE", context.output[0])
# Reload the domain_info object from the database
self.domain_info.refresh_from_db()
# Check that the federal_agency field was not updated
self.assertEqual(self.domain_info.federal_agency, "World War I Centennial Commission")
def test_patch_agency_info_skips_valid_domains(self):
"""
Tests that the `patch_federal_agency_info` command logs INFO and