Merge pull request #1579 from cisagov/za/patch-agency-info

Ticket #1513: Fix blank values for federal_agency information
This commit is contained in:
zandercymatics 2024-01-11 11:02:09 -07:00 committed by GitHub
commit c362b1a23f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 519 additions and 2 deletions

View file

@ -524,3 +524,37 @@ Example: `cf ssh getgov-za`
| 2 | **debug** | Increases logging detail. Defaults to False. |
| 3 | **limitParse** | Determines how many domains to parse. Defaults to all. |
| 4 | **disableIdempotentCheck** | Boolean that determines if we should check for idempotence or not. Compares the proposed extension date to the value in TransitionDomains. Defaults to False. |
## Patch Federal Agency Info
This section outlines how to use `patch_federal_agency_info.py`
### Running on sandboxes
#### Step 1: Grab the latest `current-full.csv` file from the dotgov-data repo
Download the csv from [here](https://github.com/cisagov/dotgov-data/blob/main/current-full.csv) and place this file under the `src/migrationdata/` directory.
#### Step 2: Transfer the `current-full.csv` file to your sandbox
[Click here to go to the section about transferring data to sandboxes](#step-1-transfer-data-to-sandboxes)
#### Step 3: Login to CloudFoundry
```cf login -a api.fr.cloud.gov --sso```
#### Step 4: SSH into your environment
```cf ssh getgov-{space}```
Example: `cf ssh getgov-za`
#### Step 5: Create a shell instance
```/tmp/lifecycle/shell```
#### Step 6: Patch agency info
```./manage.py patch_federal_agency_info migrationdata/current-full.csv --debug```
### Running locally
```docker-compose exec app ./manage.py patch_federal_agency_info migrationdata/current-full.csv --debug```
##### Optional parameters
| | Parameter | Description |
|:-:|:-------------------------- |:----------------------------------------------------------------------------|
| 1 | **debug** | Increases logging detail. Defaults to False. |

View file

@ -0,0 +1,262 @@
"""Loops through each valid DomainInformation object and updates its agency value"""
import argparse
import csv
import logging
import os
from typing import List
from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.models.domain_information import DomainInformation
from django.db.models import Q
from registrar.models.transition_domain import TransitionDomain
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Loops through each valid DomainInformation object and updates its agency value"
def __init__(self):
super().__init__()
self.di_to_update: List[DomainInformation] = []
self.di_failed_to_update: List[DomainInformation] = []
self.di_skipped: List[DomainInformation] = []
def add_arguments(self, parser):
"""Adds command line arguments"""
parser.add_argument(
"current_full_filepath",
help="TBD",
)
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument("--sep", default=",", help="Delimiter character")
def handle(self, current_full_filepath, **kwargs):
"""Loops through each valid DomainInformation object and updates its agency value"""
debug = kwargs.get("debug")
separator = kwargs.get("sep")
# Check if the provided file path is valid
if not os.path.isfile(current_full_filepath):
raise argparse.ArgumentTypeError(f"Invalid file path '{current_full_filepath}'")
# === Update the "federal_agency" field === #
was_success = self.patch_agency_info(debug)
# === Try to process anything that was skipped === #
# We should only correct skipped records if the previous step was successful.
# If something goes wrong, then we risk corrupting data, so skip this step.
if len(self.di_skipped) > 0 and was_success:
# Flush out the list of DomainInformations to update
self.di_to_update.clear()
self.process_skipped_records(current_full_filepath, separator, debug)
# Clear the old skipped list, and log the run summary
self.di_skipped.clear()
self.log_script_run_summary(debug)
elif not was_success:
# This code should never execute. This can only occur if bulk_update somehow fails,
# which may indicate some sort of data corruption.
logger.error(
f"{TerminalColors.FAIL}"
"Could not automatically patch skipped records. The initial update failed."
"An error was encountered when running this script, please inspect the following "
f"records for accuracy and completeness: {self.di_failed_to_update}"
f"{TerminalColors.ENDC}"
)
def patch_agency_info(self, debug):
"""
Updates the federal_agency field of each valid DomainInformation object based on the corresponding
TransitionDomain object. Skips the update if the TransitionDomain object does not exist or its
federal_agency field is None. Logs the update, skip, and failure actions if debug mode is on.
After all updates, logs a summary of the results.
"""
# Grab all DomainInformation objects (and their associated TransitionDomains)
# that need to be updated
empty_agency_query = Q(federal_agency=None) | Q(federal_agency="")
domain_info_to_fix = DomainInformation.objects.filter(empty_agency_query)
domain_names = domain_info_to_fix.values_list("domain__name", flat=True)
transition_domains = TransitionDomain.objects.filter(domain_name__in=domain_names).exclude(empty_agency_query)
# Get the domain names from TransitionDomain
td_agencies = transition_domains.values_list("domain_name", "federal_agency").distinct()
human_readable_domain_names = list(domain_names)
# Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==Proposed Changes==
Number of DomainInformation objects to change: {len(human_readable_domain_names)}
The following DomainInformation objects will be modified: {human_readable_domain_names}
""",
prompt_title="Do you wish to patch federal_agency data?",
)
logger.info("Updating...")
# Create a dictionary mapping of domain_name to federal_agency
td_dict = dict(td_agencies)
for di in domain_info_to_fix:
domain_name = di.domain.name
federal_agency = td_dict.get(domain_name)
log_message = None
# If agency exists on a TransitionDomain, update the related DomainInformation object
if domain_name in td_dict:
di.federal_agency = federal_agency
self.di_to_update.append(di)
log_message = f"{TerminalColors.OKCYAN}Updated {di}{TerminalColors.ENDC}"
else:
self.di_skipped.append(di)
log_message = f"{TerminalColors.YELLOW}Skipping update for {di}{TerminalColors.ENDC}"
# Log the action if debug mode is on
if debug and log_message is not None:
logger.info(log_message)
# Bulk update the federal agency field in DomainInformation objects
DomainInformation.objects.bulk_update(self.di_to_update, ["federal_agency"])
# Get a list of each domain we changed
corrected_domains = DomainInformation.objects.filter(domain__name__in=domain_names)
# After the update has happened, do a sweep of what we get back.
# If the fields we expect to update are still None, then something is wrong.
for di in corrected_domains:
if di not in self.di_skipped and di.federal_agency is None:
logger.info(f"{TerminalColors.FAIL}Failed to update {di}{TerminalColors.ENDC}")
self.di_failed_to_update.append(di)
# === Log results and return data === #
self.log_script_run_summary(debug)
# Tracks if this script was successful. If any errors are found, something went very wrong.
was_success = len(self.di_failed_to_update) == 0
return was_success
def process_skipped_records(self, file_path, separator, debug):
"""If we encounter any DomainInformation records that do not have data in the associated
TransitionDomain record, then check the associated current-full.csv file for this
information."""
# Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==File location==
current-full.csv filepath: {file_path}
==Proposed Changes==
Number of DomainInformation objects to change: {len(self.di_skipped)}
The following DomainInformation objects will be modified if agency data exists in file: {self.di_skipped}
""",
prompt_title="Do you wish to patch skipped records?",
)
logger.info("Updating...")
file_data = self.read_current_full(file_path, separator)
for di in self.di_skipped:
domain_name = di.domain.name
row = file_data.get(domain_name)
fed_agency = None
if row is not None and "agency" in row:
fed_agency = row.get("agency")
# Determine if we should update this record or not.
# If we don't get any data back, something went wrong.
if fed_agency is not None:
di.federal_agency = fed_agency
self.di_to_update.append(di)
if debug:
logger.info(f"{TerminalColors.OKCYAN}" f"Updating {di}" f"{TerminalColors.ENDC}")
else:
self.di_failed_to_update.append(di)
logger.error(
f"{TerminalColors.FAIL}" f"Could not update {di}. No information found." f"{TerminalColors.ENDC}"
)
# Bulk update the federal agency field in DomainInformation objects
DomainInformation.objects.bulk_update(self.di_to_update, ["federal_agency"])
def read_current_full(self, file_path, separator):
"""Reads the current-full.csv file and stores it in a dictionary"""
with open(file_path, "r") as requested_file:
old_reader = csv.DictReader(requested_file, delimiter=separator)
# Some variants of current-full.csv have key casing differences for fields
# such as "Domain name" or "Domain Name". This corrects that.
reader = self.lowercase_fieldnames(old_reader)
# Return a dictionary with the domain name as the key,
# and the row information as the value
dict_data = {}
for row in reader:
domain_name = row.get("domain name")
if domain_name is not None:
domain_name = domain_name.lower()
dict_data[domain_name] = row
return dict_data
def lowercase_fieldnames(self, reader):
"""Lowercases all field keys in a dictreader to account for potential casing differences"""
for row in reader:
yield {k.lower(): v for k, v in row.items()}
def log_script_run_summary(self, debug):
"""Prints success, failed, and skipped counts, as well as
all affected objects."""
update_success_count = len(self.di_to_update)
update_failed_count = len(self.di_failed_to_update)
update_skipped_count = len(self.di_skipped)
# Prepare debug messages
debug_messages = {
"success": (f"{TerminalColors.OKCYAN}Updated: {self.di_to_update}{TerminalColors.ENDC}\n"),
"skipped": (f"{TerminalColors.YELLOW}Skipped: {self.di_skipped}{TerminalColors.ENDC}\n"),
"failed": (f"{TerminalColors.FAIL}Failed: {self.di_failed_to_update}{TerminalColors.ENDC}\n"),
}
# Print out a list of everything that was changed, if we have any changes to log.
# Otherwise, don't print anything.
TerminalHelper.print_conditional(
debug,
f"{debug_messages.get('success') if update_success_count > 0 else ''}"
f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}"
f"{debug_messages.get('failed') if update_failed_count > 0 else ''}",
)
if update_failed_count == 0 and update_skipped_count == 0:
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Updated {update_success_count} DomainInformation entries
{TerminalColors.ENDC}
"""
)
elif update_failed_count == 0:
logger.warning(
f"""{TerminalColors.YELLOW}
============= FINISHED ===============
Updated {update_success_count} DomainInformation entries
----- SOME AGENCY DATA WAS NONE (WILL BE PATCHED AUTOMATICALLY) -----
Skipped updating {update_skipped_count} DomainInformation entries
{TerminalColors.ENDC}
"""
)
else:
logger.error(
f"""{TerminalColors.FAIL}
============= FINISHED ===============
Updated {update_success_count} DomainInformation entries
----- UPDATE FAILED -----
Failed to update {update_failed_count} DomainInformation entries,
Skipped updating {update_skipped_count} DomainInformation entries
{TerminalColors.ENDC}
"""
)

View file

@ -743,6 +743,25 @@ class MockEppLib(TestCase):
],
)
mockVerisignDataInfoContact = mockDataInfoDomain.dummyInfoContactResultData(
"defaultVeri", "registrar@dotgov.gov", datetime.datetime(2023, 5, 25, 19, 45, 35), "lastPw"
)
InfoDomainWithVerisignSecurityContact = fakedEppObject(
"fakepw",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[
common.DomainContact(
contact="defaultVeri",
type=PublicContact.ContactTypeChoices.SECURITY,
)
],
hosts=["fake.host.com"],
statuses=[
common.Status(state="serverTransferProhibited", description="", lang="en"),
common.Status(state="inactive", description="", lang="en"),
],
)
InfoDomainWithDefaultTechnicalContact = fakedEppObject(
"fakepw",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
@ -1058,6 +1077,7 @@ class MockEppLib(TestCase):
"freeman.gov": (self.InfoDomainWithContacts, None),
"threenameserversDomain.gov": (self.infoDomainThreeHosts, None),
"defaultsecurity.gov": (self.InfoDomainWithDefaultSecurityContact, None),
"adomain2.gov": (self.InfoDomainWithVerisignSecurityContact, None),
"defaulttechnical.gov": (self.InfoDomainWithDefaultTechnicalContact, None),
"justnameserver.com": (self.justNameserver, None),
}
@ -1087,6 +1107,8 @@ class MockEppLib(TestCase):
mocked_result = self.mockDefaultSecurityContact
case "defaultTech":
mocked_result = self.mockDefaultTechnicalContact
case "defaultVeri":
mocked_result = self.mockVerisignDataInfoContact
case _:
# Default contact return
mocked_result = self.mockDataInfoContact

View file

@ -4,8 +4,10 @@ from django.test import Client, RequestFactory, TestCase
from io import StringIO
from registrar.models.domain_information import DomainInformation
from registrar.models.domain import Domain
from registrar.models.public_contact import PublicContact
from registrar.models.user import User
from django.contrib.auth import get_user_model
from registrar.tests.common import MockEppLib
from registrar.utility.csv_export import (
write_header,
write_body,
@ -221,8 +223,9 @@ class CsvReportsTest(TestCase):
self.assertEqual(expected_file_content, response.content)
class ExportDataTest(TestCase):
class ExportDataTest(MockEppLib):
def setUp(self):
super().setUp()
username = "test_user"
first_name = "First"
last_name = "Last"
@ -327,11 +330,85 @@ class ExportDataTest(TestCase):
)
def tearDown(self):
PublicContact.objects.all().delete()
Domain.objects.all().delete()
DomainInformation.objects.all().delete()
User.objects.all().delete()
super().tearDown()
def test_export_domains_to_writer_security_emails(self):
"""Test that export_domains_to_writer returns the
expected security email"""
# Add security email information
self.domain_1.name = "defaultsecurity.gov"
self.domain_1.save()
# Invoke setter
self.domain_1.security_contact
# Invoke setter
self.domain_2.security_contact
# Invoke setter
self.domain_3.security_contact
# Create a CSV file in memory
csv_file = StringIO()
writer = csv.writer(csv_file)
# Define columns, sort fields, and filter condition
columns = [
"Domain name",
"Domain type",
"Agency",
"Organization name",
"City",
"State",
"AO",
"AO email",
"Security contact email",
"Status",
"Expiration date",
]
sort_fields = ["domain__name"]
filter_condition = {
"domain__state__in": [
Domain.State.READY,
Domain.State.DNS_NEEDED,
Domain.State.ON_HOLD,
],
}
self.maxDiff = None
# Call the export functions
write_header(writer, columns)
write_body(writer, columns, sort_fields, filter_condition)
# Reset the CSV file's position to the beginning
csv_file.seek(0)
# Read the content into a variable
csv_content = csv_file.read()
# We expect READY domains,
# sorted alphabetially by domain name
expected_content = (
"Domain name,Domain type,Agency,Organization name,City,State,AO,"
"AO email,Security contact email,Status,Expiration date\n"
"adomain10.gov,Federal,Armed Forces Retirement Home,Ready\n"
"adomain2.gov,Interstate,(blank),Dns needed\n"
"ddomain3.gov,Federal,Armed Forces Retirement Home,123@mail.gov,On hold,2023-05-25\n"
"defaultsecurity.gov,Federal - Executive,World War I Centennial Commission,dotgov@cisa.dhs.gov,Ready"
)
# Normalize line endings and remove commas,
# spaces and leading/trailing whitespace
csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip()
expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip()
self.assertEqual(csv_content, expected_content)
def test_write_body(self):
"""Test that write_body returns the
existing domain, test that sort by domain name works,

View file

@ -22,6 +22,116 @@ from .common import MockEppLib, MockSESClient, less_console_noise
import boto3_mocking # type: ignore
class TestPatchAgencyInfo(TestCase):
def setUp(self):
self.user, _ = User.objects.get_or_create(username="testuser")
self.domain, _ = Domain.objects.get_or_create(name="testdomain.gov")
self.domain_info, _ = DomainInformation.objects.get_or_create(domain=self.domain, creator=self.user)
self.transition_domain, _ = TransitionDomain.objects.get_or_create(
domain_name="testdomain.gov", federal_agency="test agency"
)
def tearDown(self):
Domain.objects.all().delete()
DomainInformation.objects.all().delete()
User.objects.all().delete()
TransitionDomain.objects.all().delete()
@patch("registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", return_value=True)
def call_patch_federal_agency_info(self, mock_prompt):
"""Calls the patch_federal_agency_info command and mimics a keypress"""
call_command("patch_federal_agency_info", "registrar/tests/data/fake_current_full.csv", debug=True)
def test_patch_agency_info(self):
"""
Tests that the `patch_federal_agency_info` command successfully
updates the `federal_agency` field
of a `DomainInformation` object when the corresponding
`TransitionDomain` object has a valid `federal_agency`.
"""
# Ensure that the federal_agency is None
self.assertEqual(self.domain_info.federal_agency, None)
self.call_patch_federal_agency_info()
# Reload the domain_info object from the database
self.domain_info.refresh_from_db()
# Check that the federal_agency field was updated
self.assertEqual(self.domain_info.federal_agency, "test agency")
def test_patch_agency_info_skip(self):
"""
Tests that the `patch_federal_agency_info` command logs a warning and
does not update the `federal_agency` field
of a `DomainInformation` object when the corresponding
`TransitionDomain` object does not exist.
"""
# Set federal_agency to None to simulate a skip
self.transition_domain.federal_agency = None
self.transition_domain.save()
with self.assertLogs("registrar.management.commands.patch_federal_agency_info", level="WARNING") as context:
self.call_patch_federal_agency_info()
# Check that the correct log message was output
self.assertIn("SOME AGENCY DATA WAS NONE", context.output[0])
# Reload the domain_info object from the database
self.domain_info.refresh_from_db()
# Check that the federal_agency field was not updated
self.assertIsNone(self.domain_info.federal_agency)
def test_patch_agency_info_skip_updates_data(self):
"""
Tests that the `patch_federal_agency_info` command logs a warning but
updates the DomainInformation object, because a record exists in the
provided current-full.csv file.
"""
# Set federal_agency to None to simulate a skip
self.transition_domain.federal_agency = None
self.transition_domain.save()
# Change the domain name to something parsable in the .csv
self.domain.name = "cdomain1.gov"
self.domain.save()
with self.assertLogs("registrar.management.commands.patch_federal_agency_info", level="WARNING") as context:
self.call_patch_federal_agency_info()
# Check that the correct log message was output
self.assertIn("SOME AGENCY DATA WAS NONE", context.output[0])
# Reload the domain_info object from the database
self.domain_info.refresh_from_db()
# Check that the federal_agency field was not updated
self.assertEqual(self.domain_info.federal_agency, "World War I Centennial Commission")
def test_patch_agency_info_skips_valid_domains(self):
"""
Tests that the `patch_federal_agency_info` command logs INFO and
does not update the `federal_agency` field
of a `DomainInformation` object
"""
self.domain_info.federal_agency = "unchanged"
self.domain_info.save()
with self.assertLogs("registrar.management.commands.patch_federal_agency_info", level="INFO") as context:
self.call_patch_federal_agency_info()
# Check that the correct log message was output
self.assertIn("FINISHED", context.output[1])
# Reload the domain_info object from the database
self.domain_info.refresh_from_db()
# Check that the federal_agency field was not updated
self.assertEqual(self.domain_info.federal_agency, "unchanged")
class TestExtendExpirationDates(MockEppLib):
def setUp(self):
"""Defines the file name of migration_json and the folder its contained in"""

View file

@ -26,12 +26,23 @@ def get_domain_infos(filter_condition, sort_fields):
def write_row(writer, columns, domain_info):
security_contacts = domain_info.domain.contacts.filter(contact_type=PublicContact.ContactTypeChoices.SECURITY)
# For linter
ao = " "
if domain_info.authorizing_official:
first_name = domain_info.authorizing_official.first_name or ""
last_name = domain_info.authorizing_official.last_name or ""
ao = first_name + " " + last_name
security_email = " "
if security_contacts:
security_email = security_contacts[0].email
invalid_emails = {"registrar@dotgov.gov"}
# These are default emails that should not be displayed in the csv report
if security_email is not None and security_email.lower() in invalid_emails:
security_email = "(blank)"
# create a dictionary of fields which can be included in output
FIELDS = {
"Domain name": domain_info.domain.name,
@ -44,13 +55,14 @@ def write_row(writer, columns, domain_info):
"State": domain_info.state_territory,
"AO": ao,
"AO email": domain_info.authorizing_official.email if domain_info.authorizing_official else " ",
"Security contact email": security_contacts[0].email if security_contacts else " ",
"Security contact email": security_email,
"Status": domain_info.domain.get_state_display(),
"Expiration date": domain_info.domain.expiration_date,
"Created at": domain_info.domain.created_at,
"First ready": domain_info.domain.first_ready,
"Deleted": domain_info.domain.deleted,
}
writer.writerow([FIELDS.get(column, "") for column in columns])