diff --git a/docs/operations/data_migration.md b/docs/operations/data_migration.md index a45e27982..dd55fc1b6 100644 --- a/docs/operations/data_migration.md +++ b/docs/operations/data_migration.md @@ -524,3 +524,37 @@ Example: `cf ssh getgov-za` | 2 | **debug** | Increases logging detail. Defaults to False. | | 3 | **limitParse** | Determines how many domains to parse. Defaults to all. | | 4 | **disableIdempotentCheck** | Boolean that determines if we should check for idempotence or not. Compares the proposed extension date to the value in TransitionDomains. Defaults to False. | + + +## Patch Federal Agency Info +This section outlines how to use `patch_federal_agency_info.py`. + +### Running on sandboxes + +#### Step 1: Grab the latest `current-full.csv` file from the dotgov-data repo +Download the csv from [here](https://github.com/cisagov/dotgov-data/blob/main/current-full.csv) and place this file under the `src/migrationdata/` directory. + +#### Step 2: Transfer the `current-full.csv` file to your sandbox +[Click here to go to the section about transferring data to sandboxes](#step-1-transfer-data-to-sandboxes) + +#### Step 3: Log in to CloudFoundry +```cf login -a api.fr.cloud.gov --sso``` + +#### Step 4: SSH into your environment +```cf ssh getgov-{space}``` + +Example: `cf ssh getgov-za` + +#### Step 5: Create a shell instance +```/tmp/lifecycle/shell``` + +#### Step 6: Patch agency info +```./manage.py patch_federal_agency_info migrationdata/current-full.csv --debug``` + +### Running locally +```docker-compose exec app ./manage.py patch_federal_agency_info migrationdata/current-full.csv --debug``` + +##### Optional parameters +| | Parameter | Description | +|:-:|:-------------------------- |:----------------------------------------------------------------------------| +| 1 | **debug** | Increases logging detail. Defaults to False. 
"""Loops through each valid DomainInformation object and updates its agency value"""
import argparse
import csv
import logging
import os
from typing import List

from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.models.domain_information import DomainInformation
from django.db.models import Q

from registrar.models.transition_domain import TransitionDomain

logger = logging.getLogger(__name__)


class Command(BaseCommand):
    """Management command that back-fills `DomainInformation.federal_agency`.

    The command works in two passes:

    1. `patch_agency_info` copies the agency from the `TransitionDomain` with the
       same domain name onto every `DomainInformation` whose agency is empty.
    2. `process_skipped_records` takes the records that had no usable
       `TransitionDomain` and looks them up in the supplied current-full.csv file.

    Progress is tracked in three lists on the instance (`di_to_update`,
    `di_failed_to_update`, `di_skipped`) which `log_script_run_summary` reports on.
    """

    help = "Loops through each valid DomainInformation object and updates its agency value"

    def __init__(self, *args, **kwargs):
        # Pass any constructor arguments straight through to BaseCommand so the
        # command can still be built the way the base class allows.
        super().__init__(*args, **kwargs)
        # Records whose federal_agency was set in the current pass
        self.di_to_update: List[DomainInformation] = []
        # Records we tried to fix but could not
        self.di_failed_to_update: List[DomainInformation] = []
        # Records with no usable TransitionDomain (retried against the csv)
        self.di_skipped: List[DomainInformation] = []

    def add_arguments(self, parser):
        """Adds command line arguments"""
        parser.add_argument(
            "current_full_filepath",
            help=(
                "Path to the current-full.csv file, used to patch records "
                "that have no agency data on their TransitionDomain"
            ),
        )
        parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
        parser.add_argument("--sep", default=",", help="Delimiter character")

    def handle(self, current_full_filepath, **kwargs):
        """Loops through each valid DomainInformation object and updates its agency value.

        Args:
            current_full_filepath: path to the current-full.csv file.
            **kwargs: parsed options; `debug` and `sep` are read here.

        Raises:
            argparse.ArgumentTypeError: if `current_full_filepath` is not an existing file.
        """
        debug = kwargs.get("debug")
        # Fall back to a comma if the option was not supplied at all
        separator = kwargs.get("sep") or ","

        # Check if the provided file path is valid
        if not os.path.isfile(current_full_filepath):
            raise argparse.ArgumentTypeError(f"Invalid file path '{current_full_filepath}'")

        # === Update the "federal_agency" field === #
        was_success = self.patch_agency_info(debug)

        if not was_success:
            # This code should never execute. This can only occur if bulk_update somehow fails,
            # which may indicate some sort of data corruption.
            # We deliberately do NOT go on to patch skipped records in this case.
            logger.error(
                f"{TerminalColors.FAIL}"
                "Could not automatically patch skipped records. The initial update failed. "
                "An error was encountered when running this script, please inspect the following "
                f"records for accuracy and completeness: {self.di_failed_to_update}"
                f"{TerminalColors.ENDC}"
            )
            return

        # === Try to process anything that was skipped === #
        if self.di_skipped:
            # Flush out the list of DomainInformations to update, so the second
            # summary only reports what the csv pass changed
            self.di_to_update.clear()
            self.process_skipped_records(current_full_filepath, separator, debug)

            # Clear the old skipped list, and log the run summary
            self.di_skipped.clear()
            self.log_script_run_summary(debug)

    def patch_agency_info(self, debug):
        """
        Updates the federal_agency field of each DomainInformation object that has an
        empty (None or "") agency, based on the TransitionDomain with the same domain name.
        Records with no such TransitionDomain, or whose TransitionDomain agency is itself
        empty, are appended to `self.di_skipped`. Logs each action if debug mode is on.
        After all updates, logs a summary of the results.

        Returns:
            bool: True if every attempted update is reflected in the database.
        """

        # Grab all DomainInformation objects (and their associated TransitionDomains)
        # that need to be updated
        empty_agency_query = Q(federal_agency=None) | Q(federal_agency="")
        domain_info_to_fix = DomainInformation.objects.filter(empty_agency_query)

        domain_names = domain_info_to_fix.values_list("domain__name", flat=True)
        transition_domains = TransitionDomain.objects.filter(domain_name__in=domain_names).exclude(empty_agency_query)

        # Get the (domain name, agency) pairs from TransitionDomain
        td_agencies = transition_domains.values_list("domain_name", "federal_agency").distinct()

        human_readable_domain_names = list(domain_names)
        # Code execution will stop here if the user prompts "N"
        TerminalHelper.prompt_for_execution(
            system_exit_on_terminate=True,
            info_to_inspect=f"""
            ==Proposed Changes==
            Number of DomainInformation objects to change: {len(human_readable_domain_names)}
            The following DomainInformation objects will be modified: {human_readable_domain_names}
            """,
            prompt_title="Do you wish to patch federal_agency data?",
        )
        logger.info("Updating...")

        # Create a dictionary mapping of domain_name to federal_agency
        td_dict = dict(td_agencies)

        # select_related avoids one extra query per record for `di.domain`
        for di in domain_info_to_fix.select_related("domain"):
            domain_name = di.domain.name

            # If agency exists on a TransitionDomain, update the related DomainInformation object
            if domain_name in td_dict:
                di.federal_agency = td_dict[domain_name]
                self.di_to_update.append(di)
                log_message = f"{TerminalColors.OKCYAN}Updated {di}{TerminalColors.ENDC}"
            else:
                self.di_skipped.append(di)
                log_message = f"{TerminalColors.YELLOW}Skipping update for {di}{TerminalColors.ENDC}"

            # Log the action if debug mode is on
            if debug:
                logger.info(log_message)

        # Bulk update the federal agency field in DomainInformation objects
        DomainInformation.objects.bulk_update(self.di_to_update, ["federal_agency"])

        # After the update has happened, do a sweep of what we get back.
        # Look the records up by primary key: the original name queryset filters on
        # "agency is empty", so it cannot be reused to find rows we just filled in.
        # Anything we tried to update that is still empty (None or "") is a failure.
        updated_pks = [di.pk for di in self.di_to_update]
        still_empty = DomainInformation.objects.filter(pk__in=updated_pks).filter(empty_agency_query)
        for di in still_empty:
            logger.info(f"{TerminalColors.FAIL}Failed to update {di}{TerminalColors.ENDC}")
            self.di_failed_to_update.append(di)

        # === Log results and return data === #
        self.log_script_run_summary(debug)
        # Tracks if this script was successful. If any errors are found, something went very wrong.
        return not self.di_failed_to_update

    def process_skipped_records(self, file_path, separator, debug):
        """If we encounter any DomainInformation records that do not have data in the associated
        TransitionDomain record, then check the associated current-full.csv file for this
        information.

        Records found in the file with a non-blank "agency" value are updated and added to
        `self.di_to_update`; all others are added to `self.di_failed_to_update`.
        """

        # Code execution will stop here if the user prompts "N"
        TerminalHelper.prompt_for_execution(
            system_exit_on_terminate=True,
            info_to_inspect=f"""
            ==File location==
            current-full.csv filepath: {file_path}

            ==Proposed Changes==
            Number of DomainInformation objects to change: {len(self.di_skipped)}
            The following DomainInformation objects will be modified if agency data exists in file: {self.di_skipped}
            """,
            prompt_title="Do you wish to patch skipped records?",
        )
        logger.info("Updating...")

        file_data = self.read_current_full(file_path, separator)
        for di in self.di_skipped:
            # Keys in file_data are lowercased, so lowercase the lookup as well
            row = file_data.get(di.domain.name.lower()) or {}
            fed_agency = (row.get("agency") or "").strip()

            # Determine if we should update this record or not.
            # A missing row, a missing column, or a blank cell all mean "no data":
            # writing a blank value back would leave the record unpatched
            # while reporting it as updated.
            if fed_agency:
                di.federal_agency = fed_agency
                self.di_to_update.append(di)
                if debug:
                    logger.info(f"{TerminalColors.OKCYAN}" f"Updating {di}" f"{TerminalColors.ENDC}")
            else:
                self.di_failed_to_update.append(di)
                logger.error(
                    f"{TerminalColors.FAIL}" f"Could not update {di}. No information found." f"{TerminalColors.ENDC}"
                )

        # Bulk update the federal agency field in DomainInformation objects
        DomainInformation.objects.bulk_update(self.di_to_update, ["federal_agency"])

    def read_current_full(self, file_path, separator):
        """Reads the current-full.csv file and stores it in a dictionary.

        Returns:
            dict: lowercased domain name -> row dict (with lowercased column names).
            Rows without a domain name are left out.
        """
        # newline="" is what the csv module asks for; utf-8-sig reads plain UTF-8 and
        # also drops a leading BOM, which would otherwise become part of the first header.
        with open(file_path, "r", newline="", encoding="utf-8-sig") as requested_file:
            old_reader = csv.DictReader(requested_file, delimiter=separator)
            # Some variants of current-full.csv have key casing differences for fields
            # such as "Domain name" or "Domain Name". This corrects that.
            reader = self.lowercase_fieldnames(old_reader)
            # Return a dictionary with the domain name as the key,
            # and the row information as the value
            dict_data = {}
            for row in reader:
                domain_name = (row.get("domain name") or "").strip().lower()
                if domain_name:
                    dict_data[domain_name] = row

        return dict_data

    def lowercase_fieldnames(self, reader):
        """Lowercases all field keys in a dictreader to account for potential casing differences.

        Yields one dict per row. Non-string keys are passed through unchanged:
        DictReader stores surplus cells of an over-long row under the key None,
        which has no .lower().
        """
        for row in reader:
            yield {(k.strip().lower() if isinstance(k, str) else k): v for k, v in row.items()}

    def log_script_run_summary(self, debug):
        """Prints success, failed, and skipped counts, as well as
        all affected objects."""
        update_success_count = len(self.di_to_update)
        update_failed_count = len(self.di_failed_to_update)
        update_skipped_count = len(self.di_skipped)

        # Prepare debug messages, only for the categories that have entries
        debug_sections = []
        if update_success_count > 0:
            debug_sections.append(f"{TerminalColors.OKCYAN}Updated: {self.di_to_update}{TerminalColors.ENDC}\n")
        if update_skipped_count > 0:
            debug_sections.append(f"{TerminalColors.YELLOW}Skipped: {self.di_skipped}{TerminalColors.ENDC}\n")
        if update_failed_count > 0:
            debug_sections.append(f"{TerminalColors.FAIL}Failed: {self.di_failed_to_update}{TerminalColors.ENDC}\n")

        # Print out a list of everything that was changed, if we have any changes to log.
        # Otherwise, don't print anything.
        TerminalHelper.print_conditional(debug, "".join(debug_sections))

        if update_failed_count == 0 and update_skipped_count == 0:
            logger.info(
                f"""{TerminalColors.OKGREEN}
                ============= FINISHED ===============
                Updated {update_success_count} DomainInformation entries
                {TerminalColors.ENDC}
                """
            )
        elif update_failed_count == 0:
            logger.warning(
                f"""{TerminalColors.YELLOW}
                ============= FINISHED ===============
                Updated {update_success_count} DomainInformation entries

                ----- SOME AGENCY DATA WAS NONE (WILL BE PATCHED AUTOMATICALLY) -----
                Skipped updating {update_skipped_count} DomainInformation entries
                {TerminalColors.ENDC}
                """
            )
        else:
            logger.error(
                f"""{TerminalColors.FAIL}
                ============= FINISHED ===============
                Updated {update_success_count} DomainInformation entries

                ----- UPDATE FAILED -----
                Failed to update {update_failed_count} DomainInformation entries,
                Skipped updating {update_skipped_count} DomainInformation entries
                {TerminalColors.ENDC}
                """
            )
type=PublicContact.ContactTypeChoices.SECURITY, + ) + ], + hosts=["fake.host.com"], + statuses=[ + common.Status(state="serverTransferProhibited", description="", lang="en"), + common.Status(state="inactive", description="", lang="en"), + ], + ) + InfoDomainWithDefaultTechnicalContact = fakedEppObject( "fakepw", cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35), @@ -1058,6 +1077,7 @@ class MockEppLib(TestCase): "freeman.gov": (self.InfoDomainWithContacts, None), "threenameserversDomain.gov": (self.infoDomainThreeHosts, None), "defaultsecurity.gov": (self.InfoDomainWithDefaultSecurityContact, None), + "adomain2.gov": (self.InfoDomainWithVerisignSecurityContact, None), "defaulttechnical.gov": (self.InfoDomainWithDefaultTechnicalContact, None), "justnameserver.com": (self.justNameserver, None), } @@ -1087,6 +1107,8 @@ class MockEppLib(TestCase): mocked_result = self.mockDefaultSecurityContact case "defaultTech": mocked_result = self.mockDefaultTechnicalContact + case "defaultVeri": + mocked_result = self.mockVerisignDataInfoContact case _: # Default contact return mocked_result = self.mockDataInfoContact diff --git a/src/registrar/tests/test_reports.py b/src/registrar/tests/test_reports.py index 85e24ce33..b1c631b3d 100644 --- a/src/registrar/tests/test_reports.py +++ b/src/registrar/tests/test_reports.py @@ -4,8 +4,10 @@ from django.test import Client, RequestFactory, TestCase from io import StringIO from registrar.models.domain_information import DomainInformation from registrar.models.domain import Domain +from registrar.models.public_contact import PublicContact from registrar.models.user import User from django.contrib.auth import get_user_model +from registrar.tests.common import MockEppLib from registrar.utility.csv_export import ( write_header, write_body, @@ -221,8 +223,9 @@ class CsvReportsTest(TestCase): self.assertEqual(expected_file_content, response.content) -class ExportDataTest(TestCase): +class ExportDataTest(MockEppLib): def setUp(self): + 
super().setUp() username = "test_user" first_name = "First" last_name = "Last" @@ -327,11 +330,85 @@ class ExportDataTest(TestCase): ) def tearDown(self): + PublicContact.objects.all().delete() Domain.objects.all().delete() DomainInformation.objects.all().delete() User.objects.all().delete() super().tearDown() + def test_export_domains_to_writer_security_emails(self): + """Test that export_domains_to_writer returns the + expected security email""" + + # Add security email information + self.domain_1.name = "defaultsecurity.gov" + self.domain_1.save() + + # Invoke setter + self.domain_1.security_contact + + # Invoke setter + self.domain_2.security_contact + + # Invoke setter + self.domain_3.security_contact + + # Create a CSV file in memory + csv_file = StringIO() + writer = csv.writer(csv_file) + + # Define columns, sort fields, and filter condition + columns = [ + "Domain name", + "Domain type", + "Agency", + "Organization name", + "City", + "State", + "AO", + "AO email", + "Security contact email", + "Status", + "Expiration date", + ] + sort_fields = ["domain__name"] + filter_condition = { + "domain__state__in": [ + Domain.State.READY, + Domain.State.DNS_NEEDED, + Domain.State.ON_HOLD, + ], + } + + self.maxDiff = None + # Call the export functions + write_header(writer, columns) + write_body(writer, columns, sort_fields, filter_condition) + + # Reset the CSV file's position to the beginning + csv_file.seek(0) + + # Read the content into a variable + csv_content = csv_file.read() + + # We expect READY domains, + # sorted alphabetially by domain name + expected_content = ( + "Domain name,Domain type,Agency,Organization name,City,State,AO," + "AO email,Security contact email,Status,Expiration date\n" + "adomain10.gov,Federal,Armed Forces Retirement Home,Ready\n" + "adomain2.gov,Interstate,(blank),Dns needed\n" + "ddomain3.gov,Federal,Armed Forces Retirement Home,123@mail.gov,On hold,2023-05-25\n" + "defaultsecurity.gov,Federal - Executive,World War I Centennial 
class TestPatchAgencyInfo(TestCase):
    """Tests for the `patch_federal_agency_info` management command."""

    def setUp(self):
        # One DomainInformation with no agency, plus a TransitionDomain
        # of the same name that does carry an agency
        self.user, _ = User.objects.get_or_create(username="testuser")
        self.domain, _ = Domain.objects.get_or_create(name="testdomain.gov")
        self.domain_info, _ = DomainInformation.objects.get_or_create(domain=self.domain, creator=self.user)
        self.transition_domain, _ = TransitionDomain.objects.get_or_create(
            domain_name="testdomain.gov", federal_agency="test agency"
        )

    def tearDown(self):
        # Delete dependents before the rows they reference
        DomainInformation.objects.all().delete()
        Domain.objects.all().delete()
        TransitionDomain.objects.all().delete()
        User.objects.all().delete()

    @patch("registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", return_value=True)
    def call_patch_federal_agency_info(self, mock_prompt):
        """Calls the patch_federal_agency_info command and mimics a keypress"""
        call_command("patch_federal_agency_info", "registrar/tests/data/fake_current_full.csv", debug=True)

    def test_patch_agency_info(self):
        """
        Tests that the `patch_federal_agency_info` command successfully
        updates the `federal_agency` field
        of a `DomainInformation` object when the corresponding
        `TransitionDomain` object has a valid `federal_agency`.
        """

        # Ensure that the federal_agency is None
        self.assertIsNone(self.domain_info.federal_agency)

        self.call_patch_federal_agency_info()

        # Reload the domain_info object from the database
        self.domain_info.refresh_from_db()

        # Check that the federal_agency field was updated
        self.assertEqual(self.domain_info.federal_agency, "test agency")

    def test_patch_agency_info_skip(self):
        """
        Tests that the `patch_federal_agency_info` command logs a warning and
        does not update the `federal_agency` field
        of a `DomainInformation` object when the corresponding
        `TransitionDomain` object has no `federal_agency` and the
        domain is not found in the provided csv either.
        """
        # Set federal_agency to None to simulate a skip
        self.transition_domain.federal_agency = None
        self.transition_domain.save()

        with self.assertLogs("registrar.management.commands.patch_federal_agency_info", level="WARNING") as context:
            self.call_patch_federal_agency_info()

        # Check that the correct log message was output
        self.assertIn("SOME AGENCY DATA WAS NONE", context.output[0])

        # Reload the domain_info object from the database
        self.domain_info.refresh_from_db()

        # Check that the federal_agency field was not updated
        self.assertIsNone(self.domain_info.federal_agency)

    def test_patch_agency_info_skip_updates_data(self):
        """
        Tests that the `patch_federal_agency_info` command logs a warning but
        updates the DomainInformation object, because a record exists in the
        provided current-full.csv file.
        """
        # Set federal_agency to None to simulate a skip
        self.transition_domain.federal_agency = None
        self.transition_domain.save()

        # Change the domain name to something parsable in the .csv
        self.domain.name = "cdomain1.gov"
        self.domain.save()

        with self.assertLogs("registrar.management.commands.patch_federal_agency_info", level="WARNING") as context:
            self.call_patch_federal_agency_info()

        # Check that the correct log message was output
        self.assertIn("SOME AGENCY DATA WAS NONE", context.output[0])

        # Reload the domain_info object from the database
        self.domain_info.refresh_from_db()

        # Check that the federal_agency field was filled in from the csv
        self.assertEqual(self.domain_info.federal_agency, "World War I Centennial Commission")

    def test_patch_agency_info_skips_valid_domains(self):
        """
        Tests that the `patch_federal_agency_info` command logs INFO and
        does not update the `federal_agency` field
        of a `DomainInformation` object that already has an agency.
        """
        self.domain_info.federal_agency = "unchanged"
        self.domain_info.save()

        with self.assertLogs("registrar.management.commands.patch_federal_agency_info", level="INFO") as context:
            self.call_patch_federal_agency_info()

        # Check that the correct log message was output
        self.assertIn("FINISHED", context.output[1])

        # Reload the domain_info object from the database
        self.domain_info.refresh_from_db()

        # Check that the federal_agency field was not updated
        self.assertEqual(self.domain_info.federal_agency, "unchanged")
= domain_info.domain.contacts.filter(contact_type=PublicContact.ContactTypeChoices.SECURITY) + # For linter ao = " " if domain_info.authorizing_official: first_name = domain_info.authorizing_official.first_name or "" last_name = domain_info.authorizing_official.last_name or "" ao = first_name + " " + last_name + + security_email = " " + if security_contacts: + security_email = security_contacts[0].email + + invalid_emails = {"registrar@dotgov.gov"} + # These are default emails that should not be displayed in the csv report + if security_email is not None and security_email.lower() in invalid_emails: + security_email = "(blank)" + # create a dictionary of fields which can be included in output FIELDS = { "Domain name": domain_info.domain.name, @@ -44,13 +55,14 @@ def write_row(writer, columns, domain_info): "State": domain_info.state_territory, "AO": ao, "AO email": domain_info.authorizing_official.email if domain_info.authorizing_official else " ", - "Security contact email": security_contacts[0].email if security_contacts else " ", + "Security contact email": security_email, "Status": domain_info.domain.get_state_display(), "Expiration date": domain_info.domain.expiration_date, "Created at": domain_info.domain.created_at, "First ready": domain_info.domain.first_ready, "Deleted": domain_info.domain.deleted, } + writer.writerow([FIELDS.get(column, "") for column in columns])