diff --git a/docs/operations/data_migration.md b/docs/operations/data_migration.md index cc711cea7..2ce34dc76 100644 --- a/docs/operations/data_migration.md +++ b/docs/operations/data_migration.md @@ -441,3 +441,55 @@ purposes. Used by the migration scripts to trigger a prompt for deleting all table entries. Useful for testing purposes, but *use with caution* + +## Import organization data +During MVP, our import scripts did not populate the following fields: `address_line, city, state_territory, and zipcode` for organization address in Domain Information. This was primarily due to time constraints. Because of this, we need to run a follow-on script to load this remaining data on each `DomainInformation` object. + +This script is intended to run under the assumption that the [load_transition_domain](#step-1-load-transition-domains) and the [transfer_transition_domains_to_domains](#step-2-transfer-transition-domain-data-into-main-domain-tables) scripts have already been ran. + +##### LOCAL COMMAND +to run this command locally, enter the following: +```shell +docker compose run -T app ./manage.py load_organization_data {filename_of_migration_json} --debug +``` +* filename_of_migration_filepath_json - This is a [JSON containing a list of filenames](#step-2-obtain-json-file-for-file-locations). This same file was used in the preceeding steps, `load_transition_domain` and `transfer_transition_domains_to_domains`, however, this script only needs two fields: +``` +{ + "domain_additional_filename": "example.domainadditionaldatalink.adhoc.dotgov.txt", + "organization_adhoc_filename": "example.organization.adhoc.dotgov.txt" +} +``` +If you already possess the old JSON, you do not need to modify it. This script can run even if you specify multiple filepaths. It will just skip over unused ones. + +**Example** +```shell +docker compose run -T app ./manage.py load_organization_data migrationFilepaths.json --debug +``` + +##### SANDBOX COMMAND +```shell +./manage.py load_organization_data {filename_of_migration_json} --debug +``` +* **filename_of_migration_filepath_json** - This is a [JSON containing a list of filenames](#step-2-obtain-json-file-for-file-locations). This same file was used in the preceeding steps, `load_transition_domain` and `transfer_transition_domains_to_domains`, however, this script only needs two fields: +``` +{ + "domain_additional_filename": "example.domainadditionaldatalink.adhoc.dotgov.txt", + "organization_adhoc_filename": "example.organization.adhoc.dotgov.txt" +} +``` +If you already possess the old JSON, you do not need to modify it. This script can run even if you specify multiple filepaths. It will just skip over unused ones. + +**Example** +```shell +./manage.py load_organization_data migrationFilepaths.json --debug +``` + +##### Optional parameters +The `load_organization_data` script has five optional parameters. These are as follows: +| | Parameter | Description | +|:-:|:---------------------------------|:----------------------------------------------------------------------------| +| 1 | **sep** | Determines the file separator. Defaults to "\|" | +| 2 | **debug** | Increases logging detail. Defaults to False | +| 3 | **directory** | Specifies the directory containing the files that will be parsed. Defaults to "migrationdata" | +| 4 | **domain_additional_filename** | Specifies the filename of domain_additional. Used as an override for the JSON. Has no default. | +| 5 | **organization_adhoc_filename** | Specifies the filename of organization_adhoc. Used as an override for the JSON. Has no default. | diff --git a/src/registrar/management/commands/load_organization_data.py b/src/registrar/management/commands/load_organization_data.py new file mode 100644 index 000000000..122795400 --- /dev/null +++ b/src/registrar/management/commands/load_organization_data.py @@ -0,0 +1,283 @@ +"""Data migration: Load organization data for TransitionDomain and DomainInformation objects""" + +import argparse +import json +import logging +import os + +from django.core.management import BaseCommand +from registrar.management.commands.utility.extra_transition_domain_helper import OrganizationDataLoader +from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper +from registrar.management.commands.utility.transition_domain_arguments import TransitionDomainArguments +from registrar.models import TransitionDomain, DomainInformation +from django.core.paginator import Paginator +from typing import List +from registrar.models.domain import Domain + +from registrar.management.commands.utility.load_organization_error import ( + LoadOrganizationError, + LoadOrganizationErrorCodes, +) + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Load organization data on TransitionDomain and DomainInformation objects" + + def __init__(self): + super().__init__() + self.domain_information_to_update: List[DomainInformation] = [] + + # Stores the domain_name for logging purposes + self.domains_failed_to_update: List[str] = [] + self.domains_skipped: List[str] = [] + + self.changed_fields = [ + "address_line1", + "city", + "state_territory", + "zipcode", + ] + + def add_arguments(self, parser): + """Add command line arguments.""" + + parser.add_argument( + "migration_json_filename", + help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"), + ) + + parser.add_argument("--sep", default="|", help="Delimiter character") + + parser.add_argument("--debug", action=argparse.BooleanOptionalAction) + + parser.add_argument("--directory", default="migrationdata", help="Desired directory") + + def handle(self, migration_json_filename, **options): + """Load organization address data into the TransitionDomain + and DomainInformation tables by using the organization adhoc file and domain_additional file""" + # Parse JSON file + options = self.load_json_settings(options, migration_json_filename) + org_args = TransitionDomainArguments(**options) + + # Will sys.exit() when prompt is "n" + TerminalHelper.prompt_for_execution( + system_exit_on_terminate=True, + info_to_inspect=f""" + ==Master data file== + domain_additional_filename: {org_args.domain_additional_filename} + + ==Organization data== + organization_adhoc_filename: {org_args.organization_adhoc_filename} + + ==Containing directory== + directory: {org_args.directory} + """, + prompt_title="Do you wish to load organization data for TransitionDomains?", + ) + + org_load_helper = OrganizationDataLoader(org_args) + transition_domains = org_load_helper.update_organization_data_for_all() + + # Reprompt the user to reinspect before updating DomainInformation + # Will sys.exit() when prompt is "n" + TerminalHelper.prompt_for_execution( + system_exit_on_terminate=True, + info_to_inspect=f""" + ==Master data file== + domain_additional_filename: {org_args.domain_additional_filename} + + ==Organization name information== + organization_adhoc_filename: {org_args.organization_adhoc_filename} + + ==Containing directory== + directory: {org_args.directory} + + ==Proposed Changes== + Number of DomainInformation objects to (potentially) change: {len(transition_domains)} + For each DomainInformation, modify the following fields: {self.changed_fields} + """, + prompt_title="Do you wish to update organization address data for DomainInformation as well?", + ) + + logger.info( + f"{TerminalColors.MAGENTA}" + "Preparing to load organization data onto DomainInformation tables..." + f"{TerminalColors.ENDC}" + ) + self.prepare_update_domain_information(transition_domains, org_args.debug) + + logger.info(f"{TerminalColors.MAGENTA}" f"Beginning mass DomainInformation update..." f"{TerminalColors.ENDC}") + self.bulk_update_domain_information(org_args.debug) + + def load_json_settings(self, options, migration_json_filename): + """Parses options from the given JSON file.""" + json_filepath = os.path.join(options["directory"], migration_json_filename) + + # If a JSON was provided, use its values instead of defaults. + with open(json_filepath, "r") as jsonFile: + # load JSON object as a dictionary + try: + data = json.load(jsonFile) + + skipped_fields = ["domain_additional_filename", "organization_adhoc_filename"] + # Iterate over the data from the JSON file. Skip any unused values. + for key, value in data.items(): + if value is not None and value.strip() != "": + # If any key in skipped_fields has a value, then + # we override what is specified in the JSON. + if options not in skipped_fields: + options[key] = value + + except Exception as err: + logger.error( + f"{TerminalColors.FAIL}" + "There was an error loading " + "the JSON responsible for providing filepaths." + f"{TerminalColors.ENDC}" + ) + raise err + + return options + + def prepare_update_domain_information(self, target_transition_domains: List[TransitionDomain], debug): + """Returns an array of DomainInformation objects with updated organization data.""" + if len(target_transition_domains) == 0: + raise LoadOrganizationError(code=LoadOrganizationErrorCodes.EMPTY_TRANSITION_DOMAIN_TABLE) + + # Grab each TransitionDomain we want to change. + transition_domains = TransitionDomain.objects.filter( + username__in=[item.username for item in target_transition_domains], + domain_name__in=[item.domain_name for item in target_transition_domains], + ) + + # This indicates that some form of data corruption happened. + if len(target_transition_domains) != len(transition_domains): + raise LoadOrganizationError(code=LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND) + + # Maps TransitionDomain <--> DomainInformation. + # If any related organization fields have been updated, + # we can assume that they modified this information themselves - thus we should not update it. + domain_informations = DomainInformation.objects.filter( + domain__name__in=[td.domain_name for td in transition_domains], + address_line1__isnull=True, + city__isnull=True, + state_territory__isnull=True, + zipcode__isnull=True, + ) + filtered_domain_informations_dict = {di.domain.name: di for di in domain_informations if di.domain is not None} + + # === Create DomainInformation objects === # + for item in transition_domains: + self.map_transition_domain_to_domain_information(item, filtered_domain_informations_dict, debug) + + # === Log results and return data === # + if len(self.domains_failed_to_update) > 0: + logger.error( + f"""{TerminalColors.FAIL} + Failed to update. An exception was encountered on the following Domains: {self.domains_failed_to_update} + {TerminalColors.ENDC}""" + ) + raise LoadOrganizationError(code=LoadOrganizationErrorCodes.UPDATE_DOMAIN_INFO_FAILED) + + if debug: + logger.info(f"Updating these DomainInformations: {[item for item in self.domain_information_to_update]}") + + if len(self.domains_skipped) > 0: + logger.info(f"Skipped these fields: {self.domains_skipped}") + logger.info( + f"{TerminalColors.YELLOW}" + f"Skipped updating {len(self.domains_skipped)} fields. User-supplied data exists, or there is no data." + f"{TerminalColors.ENDC}" + ) + + logger.info(f"Ready to update {len(self.domain_information_to_update)} DomainInformations.") + + return self.domain_information_to_update + + def bulk_update_domain_information(self, debug): + """Performs a bulk_update operation on a list of DomainInformation objects""" + # Create a Paginator object. Bulk_update on the full dataset + # is too memory intensive for our current app config, so we can chunk this data instead. + batch_size = 1000 + paginator = Paginator(self.domain_information_to_update, batch_size) + for page_num in paginator.page_range: + page = paginator.page(page_num) + DomainInformation.objects.bulk_update(page.object_list, self.changed_fields) + + if debug: + logger.info(f"Updated these DomainInformations: {[item for item in self.domain_information_to_update]}") + + logger.info( + f"{TerminalColors.OKGREEN}" + f"Updated {len(self.domain_information_to_update)} DomainInformations." + f"{TerminalColors.ENDC}" + ) + + def map_transition_domain_to_domain_information(self, item, domain_informations_dict, debug): + """Attempts to return a DomainInformation object based on values from TransitionDomain. + Any domains which cannot be updated will be stored in an array. + """ + does_not_exist: bool = self.is_domain_name_missing(item, domain_informations_dict) + all_fields_are_none: bool = self.is_organization_data_missing(item) + if does_not_exist: + self.handle_if_domain_name_missing(item.domain_name) + elif all_fields_are_none: + logger.info( + f"{TerminalColors.YELLOW}" + f"Domain {item.domain_name} has no Organization Data. Cannot update." + f"{TerminalColors.ENDC}" + ) + self.domains_skipped.append(item.domain_name) + else: + # Based on the current domain, grab the right DomainInformation object. + current_domain_information = domain_informations_dict[item.domain_name] + if current_domain_information.domain is None or current_domain_information.domain.name is None: + raise LoadOrganizationError(code=LoadOrganizationErrorCodes.DOMAIN_NAME_WAS_NONE) + + # Update fields + current_domain_information.address_line1 = item.address_line + current_domain_information.city = item.city + current_domain_information.state_territory = item.state_territory + current_domain_information.zipcode = item.zipcode + self.domain_information_to_update.append(current_domain_information) + + if debug: + logger.info(f"Updated {current_domain_information.domain.name}...") + + def is_domain_name_missing(self, item: TransitionDomain, domain_informations_dict): + """Checks if domain_name is in the supplied dictionary""" + return item.domain_name not in domain_informations_dict + + def is_organization_data_missing(self, item: TransitionDomain): + """Checks if all desired Organization fields to update are none""" + fields = [item.address_line, item.city, item.state_territory, item.zipcode] + return all(field is None for field in fields) + + def handle_if_domain_name_missing(self, domain_name): + """ + Infers what to log if we can't find a domain_name and updates the relevant lists. + + This function performs the following checks: + 1. If the domain does not exist, it logs an error and adds the domain name to the `domains_failed_to_update` list. + 2. If the domain was updated by a user, it logs an info message and adds the domain name to the `domains_skipped` list. + 3. If there are duplicate domains, it logs an error and adds the domain name to the `domains_failed_to_update` list. + + Args: + domain_name (str): The name of the domain to check. + """ # noqa - E501 (harder to read) + domains = Domain.objects.filter(name=domain_name) + if domains.count() == 0: + logger.error(f"Could not add {domain_name}. Domain does not exist.") + self.domains_failed_to_update.append(domain_name) + elif domains.count() == 1: + logger.info( + f"{TerminalColors.YELLOW}" + f"Domain {domain_name} was updated by a user. Cannot update." + f"{TerminalColors.ENDC}" + ) + self.domains_skipped.append(domain_name) + else: + logger.error(f"Could not add {domain_name}. Duplicate domains exist.") + self.domains_failed_to_update.append(domain_name) diff --git a/src/registrar/management/commands/utility/extra_transition_domain_helper.py b/src/registrar/management/commands/utility/extra_transition_domain_helper.py index 7961b47c1..04170811f 100644 --- a/src/registrar/management/commands/utility/extra_transition_domain_helper.py +++ b/src/registrar/management/commands/utility/extra_transition_domain_helper.py @@ -9,9 +9,13 @@ import logging import os import sys -from typing import Dict - +from typing import Dict, List +from django.core.paginator import Paginator from registrar.models.transition_domain import TransitionDomain +from registrar.management.commands.utility.load_organization_error import ( + LoadOrganizationError, + LoadOrganizationErrorCodes, +) from .epp_data_containers import ( AgencyAdhoc, @@ -752,6 +756,195 @@ class FileDataHolder: return (full_filename, can_infer) +class OrganizationDataLoader: + """Saves organization data onto Transition Domains. Handles file parsing.""" + + def __init__(self, options: TransitionDomainArguments): + self.debug = options.debug + + # We want to data from the domain_additional file and the organization_adhoc file + options.pattern_map_params = [ + ( + EnumFilenames.DOMAIN_ADDITIONAL, + options.domain_additional_filename, + DomainAdditionalData, + "domainname", + ), + ( + EnumFilenames.ORGANIZATION_ADHOC, + options.organization_adhoc_filename, + OrganizationAdhoc, + "orgid", + ), + ] + + # Reads and parses organization data + self.parsed_data = ExtraTransitionDomain(options) + + # options.infer_filenames will always be false when not SETTING.DEBUG + self.parsed_data.parse_all_files(options.infer_filenames) + + self.tds_to_update: List[TransitionDomain] = [] + + def update_organization_data_for_all(self): + """Updates org address data for all TransitionDomains""" + all_transition_domains = TransitionDomain.objects.all() + if len(all_transition_domains) == 0: + raise LoadOrganizationError(code=LoadOrganizationErrorCodes.EMPTY_TRANSITION_DOMAIN_TABLE) + + self.prepare_transition_domains(all_transition_domains) + + logger.info(f"{TerminalColors.MAGENTA}" "Beginning mass TransitionDomain update..." f"{TerminalColors.ENDC}") + self.bulk_update_transition_domains(self.tds_to_update) + + return self.tds_to_update + + def prepare_transition_domains(self, transition_domains): + """Parses org data for each transition domain, + then appends it to the tds_to_update list""" + for item in transition_domains: + updated = self.parse_org_data(item.domain_name, item) + self.tds_to_update.append(updated) + if self.debug: + logger.info( + f"""{TerminalColors.OKCYAN} + Successfully updated: + {item.display_transition_domain()} + {TerminalColors.ENDC}""" + ) + + if self.debug: + logger.info(f"Updating the following: {[item for item in self.tds_to_update]}") + + logger.info( + f"""{TerminalColors.MAGENTA} + Ready to update {len(self.tds_to_update)} TransitionDomains. + {TerminalColors.ENDC}""" + ) + + def bulk_update_transition_domains(self, update_list): + changed_fields = [ + "address_line", + "city", + "state_territory", + "zipcode", + ] + + batch_size = 1000 + # Create a Paginator object. Bulk_update on the full dataset + # is too memory intensive for our current app config, so we can chunk this data instead. + paginator = Paginator(update_list, batch_size) + for page_num in paginator.page_range: + page = paginator.page(page_num) + TransitionDomain.objects.bulk_update(page.object_list, changed_fields) + + if self.debug: + logger.info(f"Updated the following: {[item for item in self.tds_to_update]}") + + logger.info( + f"{TerminalColors.OKGREEN}" f"Updated {len(self.tds_to_update)} TransitionDomains." f"{TerminalColors.ENDC}" + ) + + def parse_org_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain: + """Grabs organization_name from the parsed files and associates it + with a transition_domain object, then updates that transition domain object and returns it""" + if not isinstance(transition_domain, TransitionDomain): + raise ValueError("Not a valid object, must be TransitionDomain") + + org_info = self.get_org_info(domain_name) + if org_info is None: + logger.error(f"Could not add organization data on {domain_name}, no data exists.") + return transition_domain + + # Add street info + transition_domain.address_line = org_info.orgstreet + transition_domain.city = org_info.orgcity + transition_domain.state_territory = org_info.orgstate + transition_domain.zipcode = org_info.orgzip + + if self.debug: + # Log what happened to each field. The first value + # is the field name that was updated, second is the value + changed_fields = [ + ("address_line", transition_domain.address_line), + ("city", transition_domain.city), + ("state_territory", transition_domain.state_territory), + ("zipcode", transition_domain.zipcode), + ] + self.log_add_or_changed_values(changed_fields, domain_name) + + return transition_domain + + def get_org_info(self, domain_name) -> OrganizationAdhoc | None: + """Maps an id given in get_domain_data to a organization_adhoc + record which has its corresponding definition""" + # Get a row in the domain_additional file. The id is the domain_name. + domain_additional_row = self.retrieve_row_by_id(EnumFilenames.DOMAIN_ADDITIONAL, domain_name) + if domain_additional_row is None: + return None + + # Get a row in the organization_adhoc file. The id is the orgid in domain_additional_row. + org_row = self.retrieve_row_by_id(EnumFilenames.ORGANIZATION_ADHOC, domain_additional_row.orgid) + return org_row + + def retrieve_row_by_id(self, file_type: EnumFilenames, desired_id): + """Returns a field in a dictionary based off the type and id. + + vars: + file_type: (constant) EnumFilenames -> Which data file to target. + An example would be `EnumFilenames.DOMAIN_ADHOC`. + + desired_id: str -> Which id you want to search on. + An example would be `"12"` or `"igorville.gov"` + """ + # Grabs a dict associated with the file_type. + # For example, EnumFilenames.DOMAIN_ADDITIONAL would map to + # whatever data exists on the domain_additional file. + desired_file = self.parsed_data.file_data.get(file_type) + if desired_file is None: + logger.error(f"Type {file_type} does not exist") + return None + + # This is essentially a dictionary of rows. + data_in_file = desired_file.data + + # Get a row in the given file, based on an id. + # For instance, "igorville.gov" in domain_additional. + row_in_file = data_in_file.get(desired_id) + if row_in_file is None: + logger.error(f"Id {desired_id} does not exist for {file_type.value[0]}") + + return row_in_file + + def log_add_or_changed_values(self, values_to_check, domain_name): + """Iterates through a list of fields, and determines if we should log + if the field was added or if the field was updated. + + A field is "updated" when it is not None or not "". + A field is "created" when it is either of those things. + + + """ + for field_name, value in values_to_check: + str_exists = value is not None and value.strip() != "" + # Logs if we either added to this property, + # or modified it. + self._add_or_change_message( + field_name, + value, + domain_name, + str_exists, + ) + + def _add_or_change_message(self, field_name, changed_value, domain_name, is_update=False): + """Creates a log instance when a property + is successfully changed on a given TransitionDomain.""" + if not is_update: + logger.info(f"Added {field_name} as '{changed_value}' on {domain_name}") + else: + logger.warning(f"Updated existing {field_name} to '{changed_value}' on {domain_name}") + + class ExtraTransitionDomain: """Helper class to aid in storing TransitionDomain data spread across multiple files.""" @@ -775,52 +968,47 @@ class ExtraTransitionDomain: # metadata about each file and associate it with an enum. # That way if we want the data located at the agency_adhoc file, # we can just call EnumFilenames.AGENCY_ADHOC. - pattern_map_params = [ - ( - EnumFilenames.AGENCY_ADHOC, - options.agency_adhoc_filename, - AgencyAdhoc, - "agencyid", - ), - ( - EnumFilenames.DOMAIN_ADDITIONAL, - options.domain_additional_filename, - DomainAdditionalData, - "domainname", - ), - ( - EnumFilenames.DOMAIN_ESCROW, - options.domain_escrow_filename, - DomainEscrow, - "domainname", - ), - ( - EnumFilenames.DOMAIN_ADHOC, - options.domain_adhoc_filename, - DomainTypeAdhoc, - "domaintypeid", - ), - ( - EnumFilenames.ORGANIZATION_ADHOC, - options.organization_adhoc_filename, - OrganizationAdhoc, - "orgid", - ), - ( - EnumFilenames.AUTHORITY_ADHOC, - options.authority_adhoc_filename, - AuthorityAdhoc, - "authorityid", - ), - ( - EnumFilenames.AUTHORITY_ADHOC, - options.authority_adhoc_filename, - AuthorityAdhoc, - "authorityid", - ), - ] + if options.pattern_map_params is None or options.pattern_map_params == []: + options.pattern_map_params = [ + ( + EnumFilenames.AGENCY_ADHOC, + options.agency_adhoc_filename, + AgencyAdhoc, + "agencyid", + ), + ( + EnumFilenames.DOMAIN_ADDITIONAL, + options.domain_additional_filename, + DomainAdditionalData, + "domainname", + ), + ( + EnumFilenames.DOMAIN_ESCROW, + options.domain_escrow_filename, + DomainEscrow, + "domainname", + ), + ( + EnumFilenames.DOMAIN_ADHOC, + options.domain_adhoc_filename, + DomainTypeAdhoc, + "domaintypeid", + ), + ( + EnumFilenames.ORGANIZATION_ADHOC, + options.organization_adhoc_filename, + OrganizationAdhoc, + "orgid", + ), + ( + EnumFilenames.AUTHORITY_ADHOC, + options.authority_adhoc_filename, + AuthorityAdhoc, + "authorityid", + ), + ] - self.file_data = self.populate_file_data(pattern_map_params) + self.file_data = self.populate_file_data(options.pattern_map_params) # TODO - revise comment def populate_file_data(self, pattern_map_params): diff --git a/src/registrar/management/commands/utility/load_organization_error.py b/src/registrar/management/commands/utility/load_organization_error.py new file mode 100644 index 000000000..1849558c4 --- /dev/null +++ b/src/registrar/management/commands/utility/load_organization_error.py @@ -0,0 +1,39 @@ +from enum import IntEnum + + +class LoadOrganizationErrorCodes(IntEnum): + """Used when running the load_organization_data script + Overview of error codes: + - 1 TRANSITION_DOMAINS_NOT_FOUND + - 2 UPDATE_DOMAIN_INFO_FAILED + - 3 EMPTY_TRANSITION_DOMAIN_TABLE + """ + + TRANSITION_DOMAINS_NOT_FOUND = 1 + UPDATE_DOMAIN_INFO_FAILED = 2 + EMPTY_TRANSITION_DOMAIN_TABLE = 3 + DOMAIN_NAME_WAS_NONE = 4 + + +class LoadOrganizationError(Exception): + """ + Error class used in the load_organization_data script + """ + + _error_mapping = { + LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND: ( + "Could not find all desired TransitionDomains. " "(Possible data corruption?)" + ), + LoadOrganizationErrorCodes.UPDATE_DOMAIN_INFO_FAILED: "Failed to update DomainInformation", + LoadOrganizationErrorCodes.EMPTY_TRANSITION_DOMAIN_TABLE: "No TransitionDomains exist. Cannot update.", + LoadOrganizationErrorCodes.DOMAIN_NAME_WAS_NONE: "DomainInformation was updated, but domain was None", + } + + def __init__(self, *args, code=None, **kwargs): + super().__init__(*args, **kwargs) + self.code = code + if self.code in self._error_mapping: + self.message = self._error_mapping.get(self.code) + + def __str__(self): + return f"{self.message}" diff --git a/src/registrar/management/commands/utility/transition_domain_arguments.py b/src/registrar/management/commands/utility/transition_domain_arguments.py index 56425a7b7..1a57d0d2d 100644 --- a/src/registrar/management/commands/utility/transition_domain_arguments.py +++ b/src/registrar/management/commands/utility/transition_domain_arguments.py @@ -18,7 +18,7 @@ class TransitionDomainArguments: # Maintains an internal kwargs list and sets values # that match the class definition. def __init__(self, **kwargs): - self.kwargs = kwargs + self.pattern_map_params = kwargs.get("pattern_map_params", []) for k, v in kwargs.items(): if hasattr(self, k): setattr(self, k, v) @@ -36,13 +36,13 @@ class TransitionDomainArguments: limitParse: Optional[int] = field(default=None, repr=True) # Filenames # - # = Adhocs =# + # = Adhocs = # agency_adhoc_filename: Optional[str] = field(default=EnumFilenames.AGENCY_ADHOC.value[1], repr=True) domain_adhoc_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADHOC.value[1], repr=True) organization_adhoc_filename: Optional[str] = field(default=EnumFilenames.ORGANIZATION_ADHOC.value[1], repr=True) authority_adhoc_filename: Optional[str] = field(default=EnumFilenames.AUTHORITY_ADHOC.value[1], repr=True) - # = Data files =# + # = Data files = # domain_escrow_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ESCROW.value[1], repr=True) domain_additional_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], repr=True) domain_contacts_filename: Optional[str] = field(default=None, repr=True) diff --git a/src/registrar/migrations/0047_transitiondomain_address_line_transitiondomain_city_and_more.py b/src/registrar/migrations/0047_transitiondomain_address_line_transitiondomain_city_and_more.py new file mode 100644 index 000000000..a312f62f2 --- /dev/null +++ b/src/registrar/migrations/0047_transitiondomain_address_line_transitiondomain_city_and_more.py @@ -0,0 +1,32 @@ +# Generated by Django 4.2.7 on 2023-11-16 19:56 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("registrar", "0046_transitiondomain_email_transitiondomain_first_name_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="transitiondomain", + name="address_line", + field=models.TextField(blank=True, help_text="Street address", null=True), + ), + migrations.AddField( + model_name="transitiondomain", + name="city", + field=models.TextField(blank=True, help_text="City", null=True), + ), + migrations.AddField( + model_name="transitiondomain", + name="state_territory", + field=models.CharField(blank=True, help_text="State, territory, or military post", max_length=2, null=True), + ), + migrations.AddField( + model_name="transitiondomain", + name="zipcode", + field=models.CharField(blank=True, db_index=True, help_text="Zip code", max_length=10, null=True), + ), + ] diff --git a/src/registrar/models/transition_domain.py b/src/registrar/models/transition_domain.py index 3f1c8d641..c5b9b125c 100644 --- a/src/registrar/models/transition_domain.py +++ b/src/registrar/models/transition_domain.py @@ -105,6 +105,29 @@ class TransitionDomain(TimeStampedModel): blank=True, help_text="Phone", ) + address_line = models.TextField( + null=True, + blank=True, + help_text="Street address", + ) + city = models.TextField( + null=True, + blank=True, + help_text="City", + ) + state_territory = models.CharField( + max_length=2, + null=True, + blank=True, + help_text="State, territory, or military post", + ) + zipcode = models.CharField( + max_length=10, + null=True, + blank=True, + help_text="Zip code", + db_index=True, + ) def __str__(self): return f"{self.username}, {self.domain_name}" @@ -128,4 +151,8 @@ class TransitionDomain(TimeStampedModel): f"last_name: {self.last_name}, \n" f"email: {self.email}, \n" f"phone: {self.phone}, \n" + f"address_line: {self.address_line}, \n" + f"city: {self.city}, \n" + f"state_territory: {self.state_territory}, \n" + f"zipcode: {self.zipcode}, \n" ) diff --git a/src/registrar/tests/test_transition_domain_migrations.py b/src/registrar/tests/test_transition_domain_migrations.py index c6418f013..91625207d 100644 --- a/src/registrar/tests/test_transition_domain_migrations.py +++ b/src/registrar/tests/test_transition_domain_migrations.py @@ -16,9 +16,316 @@ from registrar.models import ( from django.core.management import call_command from unittest.mock import patch +from registrar.models.contact import Contact + from .common import less_console_noise +class TestOrganizationMigration(TestCase): + def setUp(self): + """Defines the file name of migration_json and the folder its contained in""" + self.test_data_file_location = "registrar/tests/data" + self.migration_json_filename = "test_migrationFilepaths.json" + + def tearDown(self): + """Deletes all DB objects related to migrations""" + # Delete domain information + Domain.objects.all().delete() + DomainInformation.objects.all().delete() + DomainInvitation.objects.all().delete() + TransitionDomain.objects.all().delete() + + # Delete users + User.objects.all().delete() + UserDomainRole.objects.all().delete() + + def run_load_domains(self): + """ + This method executes the load_transition_domain command. + + It uses 'unittest.mock.patch' to mock the TerminalHelper.query_yes_no_exit method, + which is a user prompt in the terminal. The mock function always returns True, + allowing the test to proceed without manual user input. + + The 'call_command' function from Django's management framework is then used to + execute the load_transition_domain command with the specified arguments. + """ + # noqa here because splitting this up makes it confusing. + # ES501 + with patch( + "registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa + return_value=True, + ): + call_command( + "load_transition_domain", + self.migration_json_filename, + directory=self.test_data_file_location, + ) + + def run_transfer_domains(self): + """ + This method executes the transfer_transition_domains_to_domains command. + + The 'call_command' function from Django's management framework is then used to + execute the load_transition_domain command with the specified arguments. + """ + call_command("transfer_transition_domains_to_domains") + + def run_load_organization_data(self): + """ + This method executes the load_organization_data command. + + It uses 'unittest.mock.patch' to mock the TerminalHelper.query_yes_no_exit method, + which is a user prompt in the terminal. The mock function always returns True, + allowing the test to proceed without manual user input. + + The 'call_command' function from Django's management framework is then used to + execute the load_organization_data command with the specified arguments. + """ + # noqa here (E501) because splitting this up makes it + # confusing to read. + with patch( + "registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa + return_value=True, + ): + call_command( + "load_organization_data", + self.migration_json_filename, + directory=self.test_data_file_location, + ) + + def compare_tables( + self, + expected_total_transition_domains, + expected_total_domains, + expected_total_domain_informations, + expected_missing_domains, + expected_duplicate_domains, + expected_missing_domain_informations, + ): + """Does a diff between the transition_domain and the following tables: + domain, domain_information and the domain_invitation. + Verifies that the data loaded correctly.""" + + missing_domains = [] + duplicate_domains = [] + missing_domain_informations = [] + for transition_domain in TransitionDomain.objects.all(): + transition_domain_name = transition_domain.domain_name + # Check Domain table + matching_domains = Domain.objects.filter(name=transition_domain_name) + # Check Domain Information table + matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name) + + if len(matching_domains) == 0: + missing_domains.append(transition_domain_name) + elif len(matching_domains) > 1: + duplicate_domains.append(transition_domain_name) + if len(matching_domain_informations) == 0: + missing_domain_informations.append(transition_domain_name) + + total_missing_domains = len(missing_domains) + total_duplicate_domains = len(duplicate_domains) + total_missing_domain_informations = len(missing_domain_informations) + + total_transition_domains = len(TransitionDomain.objects.all()) + total_domains = len(Domain.objects.all()) + total_domain_informations = len(DomainInformation.objects.all()) + + self.assertEqual(total_missing_domains, expected_missing_domains) + self.assertEqual(total_duplicate_domains, expected_duplicate_domains) + self.assertEqual(total_missing_domain_informations, expected_missing_domain_informations) + + self.assertEqual(total_transition_domains, expected_total_transition_domains) + self.assertEqual(total_domains, expected_total_domains) + self.assertEqual(total_domain_informations, expected_total_domain_informations) + + def test_load_organization_data_transition_domain(self): + """ + This test verifies the functionality of the load_organization_data method for TransitionDomain objects. + + The test follows these steps: + 1. Parses all existing data by running the load_domains and transfer_domains methods. + 2. Attempts to add organization data to the parsed data by running the load_organization_data method. + 3. Checks that the data has been loaded as expected. + + The expected result is a set of TransitionDomain objects with specific attributes. + The test fetches the actual TransitionDomain objects from the database and compares them with the expected objects. + """ # noqa - E501 (harder to read) + # == First, parse all existing data == # + self.run_load_domains() + self.run_transfer_domains() + + # == Second, try adding org data to it == # + self.run_load_organization_data() + + # == Third, test that we've loaded data as we expect == # + transition_domains = TransitionDomain.objects.filter(domain_name="fakewebsite2.gov") + + # Should return three objects (three unique emails) + self.assertEqual(transition_domains.count(), 3) + + # Lets test the first one + transition = transition_domains.first() + expected_transition_domain = TransitionDomain( + username="alexandra.bobbitt5@test.com", + domain_name="fakewebsite2.gov", + status="on hold", + email_sent=True, + organization_type="Federal", + organization_name="Fanoodle", + federal_type="Executive", + federal_agency="Department of Commerce", + epp_creation_date=datetime.date(2004, 5, 7), + epp_expiration_date=datetime.date(2023, 9, 30), + first_name="Seline", + middle_name="testmiddle2", + last_name="Tower", + title=None, + email="stower3@answers.com", + phone="151-539-6028", + address_line="93001 Arizona Drive", + city="Columbus", + state_territory="Oh", + zipcode="43268", + ) + expected_transition_domain.id = transition.id + + self.assertEqual(transition, expected_transition_domain) + + def test_load_organization_data_domain_information(self): + """ + This test verifies the functionality of the load_organization_data method. + + The test follows these steps: + 1. Parses all existing data by running the load_domains and transfer_domains methods. + 2. Attempts to add organization data to the parsed data by running the load_organization_data method. + 3. Checks that the data has been loaded as expected. + + The expected result is a DomainInformation object with specific attributes. + The test fetches the actual DomainInformation object from the database + and compares it with the expected object. + """ + # == First, parse all existing data == # + self.run_load_domains() + self.run_transfer_domains() + + # == Second, try adding org data to it == # + self.run_load_organization_data() + + # == Third, test that we've loaded data as we expect == # + _domain = Domain.objects.filter(name="fakewebsite2.gov").get() + domain_information = DomainInformation.objects.filter(domain=_domain).get() + + expected_creator = User.objects.filter(username="System").get() + expected_ao = Contact.objects.filter(first_name="Seline", middle_name="testmiddle2", last_name="Tower").get() + expected_domain_information = DomainInformation( + creator=expected_creator, + organization_type="federal", + federal_agency="Department of Commerce", + federal_type="executive", + organization_name="Fanoodle", + address_line1="93001 Arizona Drive", + city="Columbus", + state_territory="Oh", + zipcode="43268", + authorizing_official=expected_ao, + domain=_domain, + ) + # Given that these are different objects, this needs to be set + expected_domain_information.id = domain_information.id + self.assertEqual(domain_information, expected_domain_information) + + def test_load_organization_data_preserves_existing_data(self): + """ + This test verifies that the load_organization_data method does not overwrite existing data. + + The test follows these steps: + 1. Parses all existing data by running the load_domains and transfer_domains methods. + 2. Adds pre-existing fake data to a DomainInformation object and saves it to the database. + 3. Runs the load_organization_data method. + 4. Checks that the pre-existing data in the DomainInformation object has not been overwritten. + + The expected result is that the DomainInformation object retains its pre-existing data + after the load_organization_data method is run. + """ + # == First, parse all existing data == # + self.run_load_domains() + self.run_transfer_domains() + + # == Second, try add prexisting fake data == # + _domain_old = Domain.objects.filter(name="fakewebsite2.gov").get() + domain_information_old = DomainInformation.objects.filter(domain=_domain_old).get() + domain_information_old.address_line1 = "93001 Galactic Way" + domain_information_old.city = "Olympus" + domain_information_old.state_territory = "MA" + domain_information_old.zipcode = "12345" + domain_information_old.save() + + # == Third, try running the script == # + self.run_load_organization_data() + + # == Fourth, test that no data is overwritten as we expect == # + _domain = Domain.objects.filter(name="fakewebsite2.gov").get() + domain_information = DomainInformation.objects.filter(domain=_domain).get() + + expected_creator = User.objects.filter(username="System").get() + expected_ao = Contact.objects.filter(first_name="Seline", middle_name="testmiddle2", last_name="Tower").get() + expected_domain_information = DomainInformation( + creator=expected_creator, + organization_type="federal", + federal_agency="Department of Commerce", + federal_type="executive", + organization_name="Fanoodle", + address_line1="93001 Galactic Way", + city="Olympus", + state_territory="MA", + zipcode="12345", + authorizing_official=expected_ao, + domain=_domain, + ) + # Given that these are different objects, this needs to be set + expected_domain_information.id = domain_information.id + self.assertEqual(domain_information, expected_domain_information) + + def test_load_organization_data_integrity(self): + """ + This test verifies the data integrity after running the load_organization_data method. + + The test follows these steps: + 1. Parses all existing data by running the load_domains and transfer_domains methods. + 2. Attempts to add organization data to the parsed data by running the load_organization_data method. + 3. Checks that the data has not been corrupted by comparing the actual counts of objects in the database + with the expected counts. + + The expected result is that the counts of objects in the database + match the expected counts, indicating that the data has not been corrupted. + """ + # First, parse all existing data + self.run_load_domains() + self.run_transfer_domains() + + # Second, try adding org data to it + self.run_load_organization_data() + + # Third, test that we didn't corrupt any data + expected_total_transition_domains = 9 + expected_total_domains = 5 + expected_total_domain_informations = 5 + + expected_missing_domains = 0 + expected_duplicate_domains = 0 + expected_missing_domain_informations = 0 + self.compare_tables( + expected_total_transition_domains, + expected_total_domains, + expected_total_domain_informations, + expected_missing_domains, + expected_duplicate_domains, + expected_missing_domain_informations, + ) + + class TestMigrations(TestCase): def setUp(self): """ """ @@ -41,11 +348,12 @@ class TestMigrations(TestCase): self.migration_json_filename = "test_migrationFilepaths.json" def tearDown(self): + super().tearDown() # Delete domain information TransitionDomain.objects.all().delete() Domain.objects.all().delete() - DomainInvitation.objects.all().delete() DomainInformation.objects.all().delete() + DomainInvitation.objects.all().delete() # Delete users User.objects.all().delete()