Code cleanup

This commit is contained in:
zandercymatics 2023-11-20 08:58:27 -07:00
parent 2659cd816b
commit e853d4ef16
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
2 changed files with 55 additions and 41 deletions

View file

@ -3,17 +3,18 @@
import argparse
import json
import logging
import os
from django.core.management import BaseCommand
from registrar.management.commands.utility.extra_transition_domain_helper import OrganizationDataLoader
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.management.commands.utility.transition_domain_arguments import TransitionDomainArguments
from registrar.models import TransitionDomain
from registrar.models.domain import Domain
from registrar.models.domain_information import DomainInformation
from registrar.models import TransitionDomain, DomainInformation
from django.core.paginator import Paginator
from typing import List
from registrar.utility.errors import LoadOrganizationError, LoadOrganizationErrorCodes
logger = logging.getLogger(__name__)
@ -48,25 +49,15 @@ class Command(BaseCommand):
"""Process the objects in TransitionDomain."""
# === Parse JSON file === #
# Desired directory for additional TransitionDomain data
# (In the event they are stored seperately)
directory = options["directory"]
# Add a slash if the last character isn't one
if directory and directory[-1] != "/":
directory += "/"
json_filepath = directory + migration_json_filename
json_filepath = os.path.join(options["directory"], migration_json_filename)
# If a JSON was provided, use its values instead of defaults.
with open(json_filepath, "r") as jsonFile:
# load JSON object as a dictionary
try:
data = json.load(jsonFile)
# Create an instance of TransitionDomainArguments
# Iterate over the data from the JSON file
for key, value in data.items():
if value is not None and value.strip() != "":
options[key] = value
# Iterate over the data from the JSON file. Skip any unused values.
options.update({key: value for key, value in data.items() if value is not None and value.strip() != ""})
except Exception as err:
logger.error(
f"{TerminalColors.FAIL}"
@ -76,6 +67,7 @@ class Command(BaseCommand):
)
raise err
# === End parse JSON file === #
args = TransitionDomainArguments(**options)
changed_fields = [
@ -90,7 +82,7 @@ class Command(BaseCommand):
==Master data file==
domain_additional_filename: {args.domain_additional_filename}
==Organization name information==
==Organization data==
organization_adhoc_filename: {args.organization_adhoc_filename}
==Containing directory==
@ -151,51 +143,46 @@ class Command(BaseCommand):
transition_domains = TransitionDomain.objects.filter(
username__in=[item.username for item in desired_objects],
domain_name__in=[item.domain_name for item in desired_objects],
).distinct()
)
# This indicates that some form of data corruption happened.
if len(desired_objects) != len(transition_domains):
raise Exception("Could not find all desired TransitionDomains")
# Then, for each domain_name grab the associated domain object.
domains = Domain.objects.filter(name__in=[td.domain_name for td in transition_domains])
# Create dictionary for faster lookup
domains_dict = {d.name: d for d in domains}
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND)
# Start with all DomainInformation objects
filtered_domain_informations = DomainInformation.objects.all()
domain_informations = DomainInformation.objects.all()
domain_informations_dict = {di.domain.name: di for di in domain_informations}
# Then, use each domain object to map domain <--> DomainInformation
# Then, use each domain object to map TransitionDomain <--> DomainInformation
# Fetches all DomainInformations in one query.
# If any related organization fields have been updated,
# we can assume that they modified this information themselves - thus we should not update it.
domain_informations = filtered_domain_informations.filter(
domain__in=domains,
domain_informations = domain_informations.filter(
domain__name__in=[td.domain_name for td in transition_domains],
address_line1__isnull=True,
city__isnull=True,
state_territory__isnull=True,
zipcode__isnull=True,
)
domain_informations_dict = {di.domain.name: di for di in domain_informations}
filtered_domain_informations_dict = {di.domain.name: di for di in domain_informations}
for item in transition_domains:
if item.domain_name not in domains_dict:
if item.domain_name not in domain_informations_dict:
logger.error(f"Could not add {item.domain_name}. Domain does not exist.")
di_failed_to_update.append(item)
continue
current_domain = domains_dict[item.domain_name]
if current_domain.name not in domain_informations_dict:
if item.domain_name not in filtered_domain_informations_dict:
logger.info(
f"{TerminalColors.YELLOW}"
f"Domain {current_domain.name} was updated by a user. Cannot update."
f"Domain {item.domain_name} was updated by a user. Cannot update."
f"{TerminalColors.ENDC}"
)
di_skipped.append(item)
continue
# Based on the current domain, grab the right DomainInformation object.
current_domain_information = domain_informations_dict[current_domain.name]
current_domain_information = filtered_domain_informations_dict[item.domain_name]
# Update fields
current_domain_information.address_line1 = item.address_line
@ -205,27 +192,27 @@ class Command(BaseCommand):
di_to_update.append(current_domain_information)
if debug:
logger.info(f"Updated {current_domain.name}...")
logger.info(f"Updated {current_domain_information.domain.name}...")
if di_failed_to_update:
failed = [item.domain_name for item in di_failed_to_update]
logger.error(
f"""{TerminalColors.FAIL}
Failed to update. An exception was encountered on the following TransitionDomains: {failed}
Failed to update. An exception was encountered on the following DomainInformations: {failed}
{TerminalColors.ENDC}"""
)
raise Exception("Failed to update DomainInformations")
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.UPDATE_DOMAIN_INFO_FAILED)
if di_skipped:
logger.info(f"Skipped updating {len(di_skipped)} fields. User-supplied data exists")
logger.info(f"Skipped updating {len(di_skipped)} fields. User-supplied data exists.")
self.bulk_update_domain_information(di_to_update, debug)
def bulk_update_domain_information(self, di_to_update, debug):
if debug:
logger.info(f"Updating these TransitionDomains: {[item for item in di_to_update]}")
logger.info(f"Updating these DomainInformations: {[item for item in di_to_update]}")
logger.info(f"Ready to update {len(di_to_update)} TransitionDomains.")
logger.info(f"Ready to update {len(di_to_update)} DomainInformations.")
logger.info(f"{TerminalColors.MAGENTA}" "Beginning mass DomainInformation update..." f"{TerminalColors.ENDC}")

View file

@ -57,6 +57,33 @@ contact help@get.gov
return f"{self.message}"
class LoadOrganizationErrorCodes(IntEnum):
TRANSITION_DOMAINS_NOT_FOUND = 1
UPDATE_DOMAIN_INFO_FAILED = 2
class LoadOrganizationError(Exception):
"""
Error class used in the load_organization_data script
"""
_error_mapping = {
LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND: (
"Could not find all desired TransitionDomains. " "(Possible data corruption?)"
),
LoadOrganizationErrorCodes.UPDATE_DOMAIN_INFO_FAILED: "Failed to update DomainInformation",
}
def __init__(self, *args, code=None, **kwargs):
super().__init__(*args, **kwargs)
self.code = code
if self.code in self._error_mapping:
self.message = self._error_mapping.get(self.code)
def __str__(self):
return f"{self.message}"
class NameserverErrorCodes(IntEnum):
"""Used in the NameserverError class for
error mapping.