diff --git a/src/registrar/management/commands/patch_federal_agency_info.py b/src/registrar/management/commands/patch_federal_agency_info.py index 6ae679ed5..0903858ee 100644 --- a/src/registrar/management/commands/patch_federal_agency_info.py +++ b/src/registrar/management/commands/patch_federal_agency_info.py @@ -1,4 +1,4 @@ -"""""" +"""Loops through each valid DomainInformation object and updates its agency value""" import argparse import logging @@ -6,7 +6,6 @@ from typing import List from django.core.management import BaseCommand from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper -from registrar.models.domain import Domain from registrar.models.domain_information import DomainInformation from django.db.models import Q @@ -16,7 +15,7 @@ logger = logging.getLogger(__name__) class Command(BaseCommand): - help = "Runs the cat command on files from /tmp into the getgov directory." + help = "Loops through each valid DomainInformation object and updates its agency value" def __init__(self): super().__init__() @@ -30,18 +29,26 @@ class Command(BaseCommand): """Adds command line arguments""" parser.add_argument("--debug", action=argparse.BooleanOptionalAction) - def handle(self, **options): - debug = options.get("debug") + def handle(self, **kwargs): + """Loops through each valid DomainInformation object and updates its agency value""" + debug = kwargs.get("debug") # Update the "federal_agency" field self.patch_agency_info(debug) def patch_agency_info(self, debug): - domain_info_to_fix = DomainInformation.objects.filter(Q(federal_agency=None) | Q(federal_agency="")) + """ + Updates the `federal_agency` field of each valid `DomainInformation` object based on the corresponding + `TransitionDomain` object. Skips the update if the `TransitionDomain` object does not exist or its + `federal_agency` field is `None`. Logs the update, skip, and failure actions if debug mode is on. + After all updates, logs a summary of the results. + """ + empty_agency_query = Q(federal_agency=None) | Q(federal_agency="") + domain_info_to_fix = DomainInformation.objects.filter(empty_agency_query) + + domain_names = domain_info_to_fix.values_list("domain__name", flat=True) + transition_domains = TransitionDomain.objects.filter(domain_name__in=domain_names).exclude(empty_agency_query) - domain_names = domain_info_to_fix.values_list('domain__name', flat=True) - transition_domains = TransitionDomain.objects.filter(domain_name__in=domain_names) - # Get the domain names from TransitionDomain td_agencies = transition_domains.values_list("domain_name", "federal_agency").distinct() @@ -63,18 +70,19 @@ class Command(BaseCommand): for di in domain_info_to_fix: domain_name = di.domain.name federal_agency = td_dict.get(domain_name) + log_message = None # If agency exists on a TransitionDomain, update the related DomainInformation object if domain_name in td_dict and federal_agency is not None: di.federal_agency = federal_agency self.di_to_update.append(di) log_message = f"{TerminalColors.OKCYAN}Updated {di}{TerminalColors.ENDC}" - else: + elif domain_name not in td_dict: self.di_skipped.append(di) log_message = f"{TerminalColors.YELLOW}Skipping update for {di}{TerminalColors.ENDC}" - + # Log the action if debug mode is on - if debug: + if debug and log_message is not None: logger.info(log_message) # Bulk update the federal agency field in DomainInformation objects @@ -87,11 +95,9 @@ class Command(BaseCommand): # If the fields we expect to update are still None, then something is wrong. for di in corrected_domains: if domain_name in td_dict and td_dict.get(domain_name) is None: - logger.info( - f"{TerminalColors.FAIL}Failed to update {di}{TerminalColors.ENDC}" - ) + logger.info(f"{TerminalColors.FAIL}Failed to update {di}{TerminalColors.ENDC}") self.di_failed_to_update.append(di) - + # === Log results and return data === # self.log_script_run_summary(debug) @@ -106,9 +112,7 @@ class Command(BaseCommand): debug_messages = { "success": (f"{TerminalColors.OKCYAN}Updated: {self.di_to_update}{TerminalColors.ENDC}\n"), "skipped": (f"{TerminalColors.YELLOW}Skipped: {self.di_skipped}{TerminalColors.ENDC}\n"), - "failed": ( - f"{TerminalColors.FAIL}Failed: {self.di_failed_to_update}{TerminalColors.ENDC}\n" - ), + "failed": (f"{TerminalColors.FAIL}Failed: {self.di_failed_to_update}{TerminalColors.ENDC}\n"), } # Print out a list of everything that was changed, if we have any changes to log. @@ -129,7 +133,7 @@ class Command(BaseCommand): """ ) elif update_failed_count == 0: - logger.info( + logger.warning( f"""{TerminalColors.YELLOW} ============= FINISHED =============== Updated {update_success_count} DomainInformation entries @@ -140,7 +144,7 @@ class Command(BaseCommand): """ ) else: - logger.info( + logger.error( f"""{TerminalColors.FAIL} ============= FINISHED =============== Updated {update_success_count} DomainInformation entries @@ -150,4 +154,4 @@ class Command(BaseCommand): Skipped updating {update_skipped_count} DomainInformation entries {TerminalColors.ENDC} """ - ) \ No newline at end of file + ) diff --git a/src/registrar/tests/test_transition_domain_migrations.py b/src/registrar/tests/test_transition_domain_migrations.py index f3fd76e88..472dbac86 100644 --- a/src/registrar/tests/test_transition_domain_migrations.py +++ b/src/registrar/tests/test_transition_domain_migrations.py @@ -21,6 +21,65 @@ from registrar.models.contact import Contact from .common import MockEppLib, less_console_noise +class TestPatchAgencyInfo(TestCase): + def setUp(self): + self.user, _ = User.objects.get_or_create(username="testuser") + self.domain, _ = Domain.objects.get_or_create(name="testdomain.gov") + self.domain_info, _ = DomainInformation.objects.get_or_create(domain=self.domain, creator=self.user) + self.transition_domain, _ = TransitionDomain.objects.get_or_create( + domain_name="testdomain.gov", federal_agency="test agency" + ) + + def tearDown(self): + Domain.objects.all().delete() + DomainInformation.objects.all().delete() + User.objects.all().delete() + TransitionDomain.objects.all().delete() + + @patch("registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", return_value=True) + def call_patch_federal_agency_info(self, mock_prompt): + """Calls the patch_federal_agency_info command and mimics a keypress""" + call_command("patch_federal_agency_info", debug=True) + + def test_patch_agency_info(self): + """ + Tests that the `patch_federal_agency_info` command successfully + updates the `federal_agency` field + of a `DomainInformation` object when the corresponding + `TransitionDomain` object has a valid `federal_agency`. + """ + self.call_patch_federal_agency_info() + + # Reload the domain_info object from the database + self.domain_info.refresh_from_db() + + # Check that the federal_agency field was updated + self.assertEqual(self.domain_info.federal_agency, "test agency") + + def test_patch_agency_info_skip(self): + """ + Tests that the `patch_federal_agency_info` command logs a warning and + does not update the `federal_agency` field + of a `DomainInformation` object when the corresponding + `TransitionDomain` object does not exist. + """ + # Set federal_agency to None to simulate a skip + self.transition_domain.federal_agency = None + self.transition_domain.save() + + with self.assertLogs("registrar.management.commands.patch_federal_agency_info", level="WARNING") as context: + self.call_patch_federal_agency_info() + + # Check that the correct log message was output + self.assertIn("SOME AGENCY DATA WAS NONE", context.output[0]) + + # Reload the domain_info object from the database + self.domain_info.refresh_from_db() + + # Check that the federal_agency field was not updated + self.assertIsNone(self.domain_info.federal_agency) + + class TestExtendExpirationDates(MockEppLib): def setUp(self): """Defines the file name of migration_json and the folder its contained in"""