Minor refactor

This commit is contained in:
zandercymatics 2023-11-21 10:52:59 -07:00
parent 8ef2a8ec02
commit bd89eea21f
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7

View file

@ -12,6 +12,7 @@ from registrar.management.commands.utility.transition_domain_arguments import Tr
from registrar.models import TransitionDomain, DomainInformation from registrar.models import TransitionDomain, DomainInformation
from django.core.paginator import Paginator from django.core.paginator import Paginator
from typing import List from typing import List
from registrar.models.domain import Domain
from registrar.utility.errors import LoadOrganizationError, LoadOrganizationErrorCodes from registrar.utility.errors import LoadOrganizationError, LoadOrganizationErrorCodes
@ -163,15 +164,10 @@ class Command(BaseCommand):
if len(target_transition_domains) != len(transition_domains): if len(target_transition_domains) != len(transition_domains):
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND) raise LoadOrganizationError(code=LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND)
# Start with all DomainInformation objects # Maps TransitionDomain <--> DomainInformation.
domain_informations = DomainInformation.objects.all()
domain_informations_dict = {di.domain.name: di for di in domain_informations if di.domain is not None}
# Then, use each domain object to map TransitionDomain <--> DomainInformation
# Fetches all DomainInformations in one query.
# If any related organization fields have been updated, # If any related organization fields have been updated,
# we can assume that they modified this information themselves - thus we should not update it. # we can assume that they modified this information themselves - thus we should not update it.
domain_informations = domain_informations.filter( domain_informations = DomainInformation.objects.filter(
domain__name__in=[td.domain_name for td in transition_domains], domain__name__in=[td.domain_name for td in transition_domains],
address_line1__isnull=True, address_line1__isnull=True,
city__isnull=True, city__isnull=True,
@ -182,9 +178,7 @@ class Command(BaseCommand):
# === Create DomainInformation objects === # # === Create DomainInformation objects === #
for item in transition_domains: for item in transition_domains:
self.map_transition_domain_to_domain_information( self.map_transition_domain_to_domain_information(item, filtered_domain_informations_dict, debug)
item, domain_informations_dict, filtered_domain_informations_dict, debug
)
# === Log results and return data === # # === Log results and return data === #
if len(self.domains_failed_to_update) > 0: if len(self.domains_failed_to_update) > 0:
@ -227,18 +221,14 @@ class Command(BaseCommand):
f"{TerminalColors.ENDC}" f"{TerminalColors.ENDC}"
) )
def map_transition_domain_to_domain_information( def map_transition_domain_to_domain_information(self, item, domain_informations_dict, debug):
self, item, domain_informations_dict, filtered_domain_informations_dict, debug
):
"""Attempts to return a DomainInformation object based on values from TransitionDomain. """Attempts to return a DomainInformation object based on values from TransitionDomain.
Any domains which cannot be updated will be stored in an array. Any domains which cannot be updated will be stored in an array.
""" """
does_not_exist: bool = self.is_domain_name_missing(item, domain_informations_dict) does_not_exist: bool = self.is_domain_name_missing(item, domain_informations_dict)
all_fields_are_none: bool = self.is_organization_data_missing(item) all_fields_are_none: bool = self.is_organization_data_missing(item)
user_updated_field: bool = self.is_domain_name_missing(item, filtered_domain_informations_dict)
if does_not_exist: if does_not_exist:
logger.error(f"Could not add {item.domain_name}. Domain does not exist.") self.handle_if_domain_name_missing(item.domain_name)
self.domains_failed_to_update.append(item)
elif all_fields_are_none: elif all_fields_are_none:
logger.info( logger.info(
f"{TerminalColors.YELLOW}" f"{TerminalColors.YELLOW}"
@ -246,16 +236,9 @@ class Command(BaseCommand):
f"{TerminalColors.ENDC}" f"{TerminalColors.ENDC}"
) )
self.domains_skipped.append(item) self.domains_skipped.append(item)
elif user_updated_field:
logger.info(
f"{TerminalColors.YELLOW}"
f"Domain {item.domain_name} was updated by a user. Cannot update."
f"{TerminalColors.ENDC}"
)
self.domains_skipped.append(item)
else: else:
# Based on the current domain, grab the right DomainInformation object. # Based on the current domain, grab the right DomainInformation object.
current_domain_information = filtered_domain_informations_dict[item.domain_name] current_domain_information = domain_informations_dict[item.domain_name]
if current_domain_information.domain is None or current_domain_information.domain.name is None: if current_domain_information.domain is None or current_domain_information.domain.name is None:
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.DOMAIN_NAME_WAS_NONE) raise LoadOrganizationError(code=LoadOrganizationErrorCodes.DOMAIN_NAME_WAS_NONE)
@ -277,3 +260,30 @@ class Command(BaseCommand):
"""Checks if all desired Organization fields to update are none""" """Checks if all desired Organization fields to update are none"""
fields = [item.address_line, item.city, item.state_territory, item.zipcode] fields = [item.address_line, item.city, item.state_territory, item.zipcode]
return all(field is None for field in fields) return all(field is None for field in fields)
def handle_if_domain_name_missing(self, domain_name):
"""
Infers what to log if we can't find a domain_name and updates the relevant lists.
This function performs the following checks:
1. If the domain does not exist, it logs an error and adds the domain name to the `domains_failed_to_update` list.
2. If the domain was updated by a user, it logs an info message and adds the domain name to the `domains_skipped` list.
3. If there are duplicate domains, it logs an error and adds the domain name to the `domains_failed_to_update` list.
Args:
domain_name (str): The name of the domain to check.
"""
domains = Domain.objects.filter(name=domain_name)
if domains.count() == 0:
logger.error(f"Could not add {domain_name}. Domain does not exist.")
self.domains_failed_to_update.append(domain_name)
elif domains.count() == 1:
logger.info(
f"{TerminalColors.YELLOW}"
f"Domain {domain_name} was updated by a user. Cannot update."
f"{TerminalColors.ENDC}"
)
self.domains_skipped.append(domain_name)
else:
logger.error(f"Could not add {domain_name}. Duplicate domains exist.")
self.domains_failed_to_update.append(domain_name)