Code clarity refactor

This commit is contained in:
zandercymatics 2023-11-21 10:28:14 -07:00
parent d5cdc624b2
commit 8ef2a8ec02
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
4 changed files with 152 additions and 148 deletions

View file

@ -19,7 +19,22 @@ logger = logging.getLogger(__name__)
class Command(BaseCommand): class Command(BaseCommand):
help = "Send domain invitations once to existing customers." help = "Load organization data on TransitionDomain and DomainInformation objects"
def __init__(self):
super().__init__()
self.domain_information_to_update: List[DomainInformation] = []
# Stores the domain_name for logging purposes
self.domains_failed_to_update: List[str] = []
self.domains_skipped: List[str] = []
self.changed_fields = [
"address_line1",
"city",
"state_territory",
"zipcode",
]
def add_arguments(self, parser): def add_arguments(self, parser):
"""Add command line arguments.""" """Add command line arguments."""
@ -49,18 +64,12 @@ class Command(BaseCommand):
def handle(self, migration_json_filename, **options): def handle(self, migration_json_filename, **options):
"""Process the objects in TransitionDomain.""" """Process the objects in TransitionDomain."""
# Parse JSON file
# === Parse JSON file === #
options = self.load_json_settings(options, migration_json_filename) options = self.load_json_settings(options, migration_json_filename)
args = TransitionDomainArguments(**options) args = TransitionDomainArguments(**options)
changed_fields = [ # Will sys.exit() when prompt is "n"
"address_line", TerminalHelper.prompt_for_execution(
"city",
"state_territory",
"zipcode",
]
proceed = TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True, system_exit_on_terminate=True,
info_to_inspect=f""" info_to_inspect=f"""
==Master data file== ==Master data file==
@ -71,22 +80,16 @@ class Command(BaseCommand):
==Containing directory== ==Containing directory==
directory: {args.directory} directory: {args.directory}
==Proposed Changes==
For each TransitionDomain, modify the following fields: {changed_fields}
""", """,
prompt_title="Do you wish to load organization data for TransitionDomains?", prompt_title="Do you wish to load organization data for TransitionDomains?",
) )
if not proceed:
return None
logger.info(f"{TerminalColors.MAGENTA}" "Loading organization data onto TransitionDomain tables...")
load = OrganizationDataLoader(args) load = OrganizationDataLoader(args)
transition_domains = load.update_organization_data_for_all() transition_domains = load.update_organization_data_for_all()
# Reprompt the user to reinspect before updating DomainInformation # Reprompt the user to reinspect before updating DomainInformation
proceed = TerminalHelper.prompt_for_execution( # Will sys.exit() when prompt is "n"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True, system_exit_on_terminate=True,
info_to_inspect=f""" info_to_inspect=f"""
==Master data file== ==Master data file==
@ -100,23 +103,20 @@ class Command(BaseCommand):
==Proposed Changes== ==Proposed Changes==
Number of DomainInformation objects to (potentially) change: {len(transition_domains)} Number of DomainInformation objects to (potentially) change: {len(transition_domains)}
For each DomainInformation, modify the following fields: {self.changed_fields}
""", """,
prompt_title="Do you wish to load organization data for DomainInformation?", prompt_title="Do you wish to load organization data for DomainInformation?",
) )
if not proceed:
return None
if len(transition_domains) == 0:
logger.error(f"{TerminalColors.MAGENTA}" "No TransitionDomain objects exist" f"{TerminalColors.ENDC}")
return None
logger.info( logger.info(
f"{TerminalColors.MAGENTA}" f"{TerminalColors.MAGENTA}"
"Preparing to load organization data onto DomainInformation tables..." "Preparing to load organization data onto DomainInformation tables..."
f"{TerminalColors.ENDC}" f"{TerminalColors.ENDC}"
) )
self.update_domain_information(transition_domains, args.debug) self.prepare_update_domain_information(transition_domains, args.debug)
logger.info(f"{TerminalColors.MAGENTA}" "Beginning mass DomainInformation update..." f"{TerminalColors.ENDC}")
self.bulk_update_domain_information(args.debug)
def load_json_settings(self, options, migration_json_filename): def load_json_settings(self, options, migration_json_filename):
"""Parses options from the given JSON file.""" """Parses options from the given JSON file."""
@ -148,19 +148,19 @@ class Command(BaseCommand):
return options return options
def update_domain_information(self, desired_objects: List[TransitionDomain], debug): def prepare_update_domain_information(self, target_transition_domains: List[TransitionDomain], debug):
di_to_update = [] """Returns an array of DomainInformation objects with updated organization data."""
di_failed_to_update = [] if len(target_transition_domains) == 0:
di_skipped = [] raise LoadOrganizationError(code=LoadOrganizationErrorCodes.EMPTY_TRANSITION_DOMAIN_TABLE)
# Grab each TransitionDomain we want to change. # Grab each TransitionDomain we want to change.
transition_domains = TransitionDomain.objects.filter( transition_domains = TransitionDomain.objects.filter(
username__in=[item.username for item in desired_objects], username__in=[item.username for item in target_transition_domains],
domain_name__in=[item.domain_name for item in desired_objects], domain_name__in=[item.domain_name for item in target_transition_domains],
) )
# This indicates that some form of data corruption happened. # This indicates that some form of data corruption happened.
if len(desired_objects) != len(transition_domains): if len(target_transition_domains) != len(transition_domains):
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND) raise LoadOrganizationError(code=LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND)
# Start with all DomainInformation objects # Start with all DomainInformation objects
@ -178,35 +178,84 @@ class Command(BaseCommand):
state_territory__isnull=True, state_territory__isnull=True,
zipcode__isnull=True, zipcode__isnull=True,
) )
filtered_domain_informations_dict = {di.domain.name: di for di in domain_informations if di.domain is not None} filtered_domain_informations_dict = {di.domain.name: di for di in domain_informations if di.domain is not None}
# === Create DomainInformation objects === #
for item in transition_domains: for item in transition_domains:
if item.domain_name not in domain_informations_dict: self.map_transition_domain_to_domain_information(
logger.error(f"Could not add {item.domain_name}. Domain does not exist.") item, domain_informations_dict, filtered_domain_informations_dict, debug
di_failed_to_update.append(item) )
continue
if item.address_line is None and item.city is None and item.state_territory and item.zipcode is None: # === Log results and return data === #
logger.info( if len(self.domains_failed_to_update) > 0:
f"{TerminalColors.YELLOW}" failed = [item.domain_name for item in self.domains_failed_to_update]
f"Domain {item.domain_name} has no Organization Data. Cannot update." logger.error(
f"{TerminalColors.ENDC}" f"""{TerminalColors.FAIL}
) Failed to update. An exception was encountered on the following Domains: {failed}
di_skipped.append(item) {TerminalColors.ENDC}"""
continue )
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.UPDATE_DOMAIN_INFO_FAILED)
if item.domain_name not in filtered_domain_informations_dict: if debug:
logger.info( logger.info(f"Updating these DomainInformations: {[item for item in self.domain_information_to_update]}")
f"{TerminalColors.YELLOW}"
f"Domain {item.domain_name} was updated by a user. Cannot update."
f"{TerminalColors.ENDC}"
)
di_skipped.append(item)
continue
if len(self.domains_skipped) > 0:
logger.info(
f"Skipped updating {len(self.domains_skipped)} fields. User-supplied data exists, or there is no data."
)
logger.info(f"Ready to update {len(self.domain_information_to_update)} DomainInformations.")
return self.domain_information_to_update
def bulk_update_domain_information(self, debug):
"""Performs a bulk_update operation on a list of DomainInformation objects"""
# Create a Paginator object. Bulk_update on the full dataset
# is too memory intensive for our current app config, so we can chunk this data instead.
batch_size = 1000
paginator = Paginator(self.domain_information_to_update, batch_size)
for page_num in paginator.page_range:
page = paginator.page(page_num)
DomainInformation.objects.bulk_update(page.object_list, self.changed_fields)
if debug:
logger.info(f"Updated these DomainInformations: {[item for item in self.domain_information_to_update]}")
logger.info(
f"{TerminalColors.OKGREEN}"
f"Updated {len(self.domain_information_to_update)} DomainInformations."
f"{TerminalColors.ENDC}"
)
def map_transition_domain_to_domain_information(
self, item, domain_informations_dict, filtered_domain_informations_dict, debug
):
"""Attempts to return a DomainInformation object based on values from TransitionDomain.
Any domains which cannot be updated will be stored in an array.
"""
does_not_exist: bool = self.is_domain_name_missing(item, domain_informations_dict)
all_fields_are_none: bool = self.is_organization_data_missing(item)
user_updated_field: bool = self.is_domain_name_missing(item, filtered_domain_informations_dict)
if does_not_exist:
logger.error(f"Could not add {item.domain_name}. Domain does not exist.")
self.domains_failed_to_update.append(item)
elif all_fields_are_none:
logger.info(
f"{TerminalColors.YELLOW}"
f"Domain {item.domain_name} has no Organization Data. Cannot update."
f"{TerminalColors.ENDC}"
)
self.domains_skipped.append(item)
elif user_updated_field:
logger.info(
f"{TerminalColors.YELLOW}"
f"Domain {item.domain_name} was updated by a user. Cannot update."
f"{TerminalColors.ENDC}"
)
self.domains_skipped.append(item)
else:
# Based on the current domain, grab the right DomainInformation object. # Based on the current domain, grab the right DomainInformation object.
current_domain_information = filtered_domain_informations_dict[item.domain_name] current_domain_information = filtered_domain_informations_dict[item.domain_name]
if current_domain_information.domain is None or current_domain_information.domain.name is None: if current_domain_information.domain is None or current_domain_information.domain.name is None:
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.DOMAIN_NAME_WAS_NONE) raise LoadOrganizationError(code=LoadOrganizationErrorCodes.DOMAIN_NAME_WAS_NONE)
@ -215,51 +264,16 @@ class Command(BaseCommand):
current_domain_information.city = item.city current_domain_information.city = item.city
current_domain_information.state_territory = item.state_territory current_domain_information.state_territory = item.state_territory
current_domain_information.zipcode = item.zipcode current_domain_information.zipcode = item.zipcode
self.domain_information_to_update.append(current_domain_information)
di_to_update.append(current_domain_information)
if debug: if debug:
logger.info(f"Updated {current_domain_information.domain.name}...") logger.info(f"Updated {current_domain_information.domain.name}...")
if di_failed_to_update: def is_domain_name_missing(self, item: TransitionDomain, domain_informations_dict):
failed = [item.domain_name for item in di_failed_to_update] """Checks if domain_name is in the supplied dictionary"""
logger.error( return item.domain_name not in domain_informations_dict
f"""{TerminalColors.FAIL}
Failed to update. An exception was encountered on the following DomainInformations: {failed}
{TerminalColors.ENDC}"""
)
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.UPDATE_DOMAIN_INFO_FAILED)
if di_skipped: def is_organization_data_missing(self, item: TransitionDomain):
logger.info(f"Skipped updating {len(di_skipped)} fields. User-supplied data exists, or there is no data.") """Checks if all desired Organization fields to update are none"""
fields = [item.address_line, item.city, item.state_territory, item.zipcode]
self.bulk_update_domain_information(di_to_update, debug) return all(field is None for field in fields)
def bulk_update_domain_information(self, di_to_update, debug):
if debug:
logger.info(f"Updating these DomainInformations: {[item for item in di_to_update]}")
logger.info(f"Ready to update {len(di_to_update)} DomainInformations.")
logger.info(f"{TerminalColors.MAGENTA}" "Beginning mass DomainInformation update..." f"{TerminalColors.ENDC}")
changed_fields = [
"address_line1",
"city",
"state_territory",
"zipcode",
]
batch_size = 1000
# Create a Paginator object. Bulk_update on the full dataset
# is too memory intensive for our current app config, so we can chunk this data instead.
paginator = Paginator(di_to_update, batch_size)
for page_num in paginator.page_range:
page = paginator.page(page_num)
DomainInformation.objects.bulk_update(page.object_list, changed_fields)
if debug:
logger.info(f"Updated these DomainInformations: {[item for item in di_to_update]}")
logger.info(
f"{TerminalColors.OKGREEN}" f"Updated {len(di_to_update)} DomainInformations." f"{TerminalColors.ENDC}"
)

View file

@ -963,44 +963,45 @@ class ExtraTransitionDomain:
# metadata about each file and associate it with an enum. # metadata about each file and associate it with an enum.
# That way if we want the data located at the agency_adhoc file, # That way if we want the data located at the agency_adhoc file,
# we can just call EnumFilenames.AGENCY_ADHOC. # we can just call EnumFilenames.AGENCY_ADHOC.
options.pattern_map_params = [ if options.pattern_map_params is None or options.pattern_map_params == []:
( options.pattern_map_params = [
EnumFilenames.AGENCY_ADHOC, (
options.agency_adhoc_filename, EnumFilenames.AGENCY_ADHOC,
AgencyAdhoc, options.agency_adhoc_filename,
"agencyid", AgencyAdhoc,
), "agencyid",
( ),
EnumFilenames.DOMAIN_ADDITIONAL, (
options.domain_additional_filename, EnumFilenames.DOMAIN_ADDITIONAL,
DomainAdditionalData, options.domain_additional_filename,
"domainname", DomainAdditionalData,
), "domainname",
( ),
EnumFilenames.DOMAIN_ESCROW, (
options.domain_escrow_filename, EnumFilenames.DOMAIN_ESCROW,
DomainEscrow, options.domain_escrow_filename,
"domainname", DomainEscrow,
), "domainname",
( ),
EnumFilenames.DOMAIN_ADHOC, (
options.domain_adhoc_filename, EnumFilenames.DOMAIN_ADHOC,
DomainTypeAdhoc, options.domain_adhoc_filename,
"domaintypeid", DomainTypeAdhoc,
), "domaintypeid",
( ),
EnumFilenames.ORGANIZATION_ADHOC, (
options.organization_adhoc_filename, EnumFilenames.ORGANIZATION_ADHOC,
OrganizationAdhoc, options.organization_adhoc_filename,
"orgid", OrganizationAdhoc,
), "orgid",
( ),
EnumFilenames.AUTHORITY_ADHOC, (
options.authority_adhoc_filename, EnumFilenames.AUTHORITY_ADHOC,
AuthorityAdhoc, options.authority_adhoc_filename,
"authorityid", AuthorityAdhoc,
), "authorityid",
] ),
]
self.file_data = self.populate_file_data(options.pattern_map_params) self.file_data = self.populate_file_data(options.pattern_map_params)

View file

@ -18,7 +18,7 @@ class TransitionDomainArguments:
# Maintains an internal kwargs list and sets values # Maintains an internal kwargs list and sets values
# that match the class definition. # that match the class definition.
def __init__(self, **kwargs): def __init__(self, **kwargs):
self.kwargs = kwargs self.pattern_map_params = kwargs.get("pattern_map_params", [])
for k, v in kwargs.items(): for k, v in kwargs.items():
if hasattr(self, k): if hasattr(self, k):
setattr(self, k, v) setattr(self, k, v)

View file

@ -27,17 +27,6 @@ class TestOrganizationMigration(TestCase):
# self.master_script = "load_transition_domain", # self.master_script = "load_transition_domain",
self.test_data_file_location = "registrar/tests/data" self.test_data_file_location = "registrar/tests/data"
self.test_domain_contact_filename = "test_domain_contacts.txt"
self.test_contact_filename = "test_contacts.txt"
self.test_domain_status_filename = "test_domain_statuses.txt"
# Files for parsing additional TransitionDomain data
self.test_agency_adhoc_filename = "test_agency_adhoc.txt"
self.test_authority_adhoc_filename = "test_authority_adhoc.txt"
self.test_domain_additional = "test_domain_additional.txt"
self.test_domain_types_adhoc = "test_domain_types_adhoc.txt"
self.test_escrow_domains_daily = "test_escrow_domains_daily"
self.test_organization_adhoc = "test_organization_adhoc.txt"
self.migration_json_filename = "test_migrationFilepaths.json" self.migration_json_filename = "test_migrationFilepaths.json"
def tearDown(self): def tearDown(self):