diff --git a/src/registrar/management/commands/patch_federal_agency_info.py b/src/registrar/management/commands/patch_federal_agency_info.py new file mode 100644 index 000000000..87fa0972a --- /dev/null +++ b/src/registrar/management/commands/patch_federal_agency_info.py @@ -0,0 +1,148 @@ +"""""" +import argparse +import json +import logging + +import os +from typing import List + +from django.core.management import BaseCommand +from registrar.management.commands.utility.epp_data_containers import AgencyAdhoc, AuthorityAdhoc, DomainAdditionalData, EnumFilenames +from registrar.management.commands.utility.extra_transition_domain_helper import ExtraTransitionDomain, MigrationDataFileLoader +from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper +from registrar.management.commands.utility.transition_domain_arguments import TransitionDomainArguments +from registrar.models.domain_information import DomainInformation +from django.db.models import Q + +from registrar.models.transition_domain import TransitionDomain + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Runs the cat command on files from /tmp into the getgov directory." + + def __init__(self): + super().__init__() + self.di_to_update: List[DomainInformation] = [] + + # Stores the domain_name for logging purposes + self.di_failed_to_update: List[str] = [] + self.di_skipped: List[str] = [] + + def add_arguments(self, parser): + """Adds command line arguments""" + parser.add_argument("--debug", action=argparse.BooleanOptionalAction) + + def handle(self, **options): + debug = options.get("debug") + domain_info_to_fix = DomainInformation.objects.filter(Q(federal_agency=None) | Q(federal_agency="")) + + domain_names = domain_info_to_fix.values_list('domain__name', flat=True) + transition_domains = TransitionDomain.objects.filter(domain_name__in=domain_names) + + # Get the domain names from TransitionDomain + td_agencies = transition_domains.values_list("domain_name", "federal_agency").distinct() + + TerminalHelper.prompt_for_execution( + system_exit_on_terminate=True, + info_to_inspect=f""" + ==Proposed Changes== + Number of DomainInformation objects to change: {len(td_agencies)} + The following DomainInformation objects will be modified: {td_agencies} + """, + prompt_title="Do you wish to update organization address data for DomainInformation as well?", + ) + logger.info("Updating...") + + # Create a dictionary mapping of domain_name to federal_agency + td_dict = dict(td_agencies) + + for di in domain_info_to_fix: + domain_name = di.domain.name + if domain_name in td_dict and td_dict.get(domain_name) is not None: + # Grab existing federal_agency data + di.federal_agency = td_dict.get(domain_name) + # Append it to our update list + self.di_to_update.append(di) + if debug: + logger.info( + f"{TerminalColors.OKCYAN}Updated {di}{TerminalColors.ENDC}" + ) + else: + self.di_skipped.append(di) + if debug: + logger.info( + f"{TerminalColors.YELLOW}Skipping update for {di}{TerminalColors.ENDC}" + ) + + DomainInformation.objects.bulk_update(self.di_to_update, ["federal_agency"]) + + # After the update has happened, do a sweep of what we get back. + # If the fields we expect to update are still None, then something is wrong. + for di in domain_info_to_fix: + if domain_name in td_dict and td_dict.get(domain_name) is not None: + logger.info( + f"{TerminalColors.FAIL}Failed to update {di}{TerminalColors.ENDC}" + ) + self.di_failed_to_update.append(di) + + # === Log results and return data === # + self.log_script_run_summary(debug) + + def log_script_run_summary(self, debug): + """Prints success, failed, and skipped counts, as well as + all affected objects.""" + update_success_count = len(self.di_to_update) + update_failed_count = len(self.di_failed_to_update) + update_skipped_count = len(self.di_skipped) + + # Prepare debug messages + debug_messages = { + "success": (f"{TerminalColors.OKCYAN}Updated: {self.di_to_update}{TerminalColors.ENDC}\n"), + "skipped": (f"{TerminalColors.YELLOW}Skipped: {self.di_skipped}{TerminalColors.ENDC}\n"), + "failed": ( + f"{TerminalColors.FAIL}Failed: {self.di_failed_to_update}{TerminalColors.ENDC}\n" + ), + } + + # Print out a list of everything that was changed, if we have any changes to log. + # Otherwise, don't print anything. + TerminalHelper.print_conditional( + debug, + f"{debug_messages.get('success') if update_success_count > 0 else ''}" + f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}" + f"{debug_messages.get('failed') if update_failed_count > 0 else ''}", + ) + + if update_failed_count == 0 and update_skipped_count == 0: + logger.info( + f"""{TerminalColors.OKGREEN} + ============= FINISHED =============== + Updated {update_success_count} DomainInformation entries + {TerminalColors.ENDC} + """ + ) + elif update_failed_count == 0: + logger.info( + f"""{TerminalColors.YELLOW} + ============= FINISHED =============== + Updated {update_success_count} DomainInformation entries + + ----- SOME AGENCY DATA WAS NONE ----- + Skipped updating {update_skipped_count} DomainInformation entries + {TerminalColors.ENDC} + """ + ) + else: + logger.info( + f"""{TerminalColors.FAIL} + ============= FINISHED =============== + Updated {update_success_count} DomainInformation entries + + ----- UPDATE FAILED ----- + Failed to update {update_failed_count} DomainInformation entries, + Skipped updating {update_skipped_count} DomainInformation entries + {TerminalColors.ENDC} + """ + ) \ No newline at end of file