Script for patching agency info

This commit is contained in:
zandercymatics 2023-12-28 15:04:12 -07:00
parent 3e9b8e9c82
commit 4bf0a4d6a9
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
2 changed files with 214 additions and 0 deletions

View file

@ -0,0 +1,148 @@
""""""
import argparse
import json
import logging
import os
from typing import List
from django.core.management import BaseCommand
from registrar.management.commands.utility.epp_data_containers import AgencyAdhoc, AuthorityAdhoc, DomainAdditionalData, EnumFilenames
from registrar.management.commands.utility.extra_transition_domain_helper import ExtraTransitionDomain, MigrationDataFileLoader
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.management.commands.utility.transition_domain_arguments import TransitionDomainArguments
from registrar.models.domain_information import DomainInformation
from django.db.models import Q
from registrar.models.transition_domain import TransitionDomain
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Runs the cat command on files from /tmp into the getgov directory."
def __init__(self):
super().__init__()
self.di_to_update: List[DomainInformation] = []
# Stores the domain_name for logging purposes
self.di_failed_to_update: List[str] = []
self.di_skipped: List[str] = []
def add_arguments(self, parser):
"""Adds command line arguments"""
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
def handle(self, **options):
debug = options.get("debug")
domain_info_to_fix = DomainInformation.objects.filter(Q(federal_agency=None) | Q(federal_agency=""))
domain_names = domain_info_to_fix.values_list('domain__name', flat=True)
transition_domains = TransitionDomain.objects.filter(domain_name__in=domain_names)
# Get the domain names from TransitionDomain
td_agencies = transition_domains.values_list("domain_name", "federal_agency").distinct()
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==Proposed Changes==
Number of DomainInformation objects to change: {len(td_agencies)}
The following DomainInformation objects will be modified: {td_agencies}
""",
prompt_title="Do you wish to update organization address data for DomainInformation as well?",
)
logger.info("Updating...")
# Create a dictionary mapping of domain_name to federal_agency
td_dict = dict(td_agencies)
for di in domain_info_to_fix:
domain_name = di.domain.name
if domain_name in td_dict and td_dict.get(domain_name) is not None:
# Grab existing federal_agency data
di.federal_agency = td_dict.get(domain_name)
# Append it to our update list
self.di_to_update.append(di)
if debug:
logger.info(
f"{TerminalColors.OKCYAN}Updated {di}{TerminalColors.ENDC}"
)
else:
self.di_skipped.append(di)
if debug:
logger.info(
f"{TerminalColors.YELLOW}Skipping update for {di}{TerminalColors.ENDC}"
)
DomainInformation.objects.bulk_update(self.di_to_update, ["federal_agency"])
# After the update has happened, do a sweep of what we get back.
# If the fields we expect to update are still None, then something is wrong.
for di in domain_info_to_fix:
if domain_name in td_dict and td_dict.get(domain_name) is not None:
logger.info(
f"{TerminalColors.FAIL}Failed to update {di}{TerminalColors.ENDC}"
)
self.di_failed_to_update.append(di)
# === Log results and return data === #
self.log_script_run_summary(debug)
def log_script_run_summary(self, debug):
"""Prints success, failed, and skipped counts, as well as
all affected objects."""
update_success_count = len(self.di_to_update)
update_failed_count = len(self.di_failed_to_update)
update_skipped_count = len(self.di_skipped)
# Prepare debug messages
debug_messages = {
"success": (f"{TerminalColors.OKCYAN}Updated: {self.di_to_update}{TerminalColors.ENDC}\n"),
"skipped": (f"{TerminalColors.YELLOW}Skipped: {self.di_skipped}{TerminalColors.ENDC}\n"),
"failed": (
f"{TerminalColors.FAIL}Failed: {self.di_failed_to_update}{TerminalColors.ENDC}\n"
),
}
# Print out a list of everything that was changed, if we have any changes to log.
# Otherwise, don't print anything.
TerminalHelper.print_conditional(
debug,
f"{debug_messages.get('success') if update_success_count > 0 else ''}"
f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}"
f"{debug_messages.get('failed') if update_failed_count > 0 else ''}",
)
if update_failed_count == 0 and update_skipped_count == 0:
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Updated {update_success_count} DomainInformation entries
{TerminalColors.ENDC}
"""
)
elif update_failed_count == 0:
logger.info(
f"""{TerminalColors.YELLOW}
============= FINISHED ===============
Updated {update_success_count} DomainInformation entries
----- SOME AGENCY DATA WAS NONE -----
Skipped updating {update_skipped_count} DomainInformation entries
{TerminalColors.ENDC}
"""
)
else:
logger.info(
f"""{TerminalColors.FAIL}
============= FINISHED ===============
Updated {update_success_count} DomainInformation entries
----- UPDATE FAILED -----
Failed to update {update_failed_count} DomainInformation entries,
Skipped updating {update_skipped_count} DomainInformation entries
{TerminalColors.ENDC}
"""
)

View file

@ -944,6 +944,72 @@ class OrganizationDataLoader:
else:
logger.warning(f"Updated existing {field_name} to '{changed_value}' on {domain_name}")
class MigrationDataFileLoader:
def __init__(self, files_to_load: List[EnumFilenames], options: TransitionDomainArguments):
all_valid_files = self.get_file_map(options)
# Get the list of files we want to load, and coerce them
# into the right format.
desired_files = []
for file in all_valid_files:
if file[0] in files_to_load:
desired_files.append(file)
else:
raise Exception("Invalid file type specified")
# Specify which files we want to load
options.pattern_map_params = desired_files
self.file_data_helper = ExtraTransitionDomain(options)
# Load data from all specified files
self.file_data_helper.parse_all_files(infer_filenames=False)
# Store the file data in a variable
self.file_data = self.file_data_helper.file_data
def get_file_map(self, options: TransitionDomainArguments):
"""Returns metadata about how we should parse desired files"""
file_map = [
(
EnumFilenames.AGENCY_ADHOC,
options.agency_adhoc_filename,
AgencyAdhoc,
"agencyid",
),
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.DOMAIN_ESCROW,
options.domain_escrow_filename,
DomainEscrow,
"domainname",
),
(
EnumFilenames.DOMAIN_ADHOC,
options.domain_adhoc_filename,
DomainTypeAdhoc,
"domaintypeid",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
]
return file_map
class ExtraTransitionDomain:
"""Helper class to aid in storing TransitionDomain data spread across