Merge branch 'main' into dk/1138-prevent-domain-edits

This commit is contained in:
David Kennedy 2023-11-22 12:08:04 -05:00
commit eda9728177
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
8 changed files with 980 additions and 51 deletions

View file

@ -441,3 +441,55 @@ purposes.
Used by the migration scripts to trigger a prompt for deleting all table entries.
Useful for testing purposes, but *use with caution*
## Import organization data
During MVP, our import scripts did not populate the following fields: `address_line, city, state_territory, and zipcode` for organization address in Domain Information. This was primarily due to time constraints. Because of this, we need to run a follow-on script to load this remaining data on each `DomainInformation` object.
This script is intended to run under the assumption that the [load_transition_domain](#step-1-load-transition-domains) and the [transfer_transition_domains_to_domains](#step-2-transfer-transition-domain-data-into-main-domain-tables) scripts have already been ran.
##### LOCAL COMMAND
to run this command locally, enter the following:
```shell
docker compose run -T app ./manage.py load_organization_data {filename_of_migration_json} --debug
```
* filename_of_migration_filepath_json - This is a [JSON containing a list of filenames](#step-2-obtain-json-file-for-file-locations). This same file was used in the preceeding steps, `load_transition_domain` and `transfer_transition_domains_to_domains`, however, this script only needs two fields:
```
{
"domain_additional_filename": "example.domainadditionaldatalink.adhoc.dotgov.txt",
"organization_adhoc_filename": "example.organization.adhoc.dotgov.txt"
}
```
If you already possess the old JSON, you do not need to modify it. This script can run even if you specify multiple filepaths. It will just skip over unused ones.
**Example**
```shell
docker compose run -T app ./manage.py load_organization_data migrationFilepaths.json --debug
```
##### SANDBOX COMMAND
```shell
./manage.py load_organization_data {filename_of_migration_json} --debug
```
* **filename_of_migration_filepath_json** - This is a [JSON containing a list of filenames](#step-2-obtain-json-file-for-file-locations). This same file was used in the preceeding steps, `load_transition_domain` and `transfer_transition_domains_to_domains`, however, this script only needs two fields:
```
{
"domain_additional_filename": "example.domainadditionaldatalink.adhoc.dotgov.txt",
"organization_adhoc_filename": "example.organization.adhoc.dotgov.txt"
}
```
If you already possess the old JSON, you do not need to modify it. This script can run even if you specify multiple filepaths. It will just skip over unused ones.
**Example**
```shell
./manage.py load_organization_data migrationFilepaths.json --debug
```
##### Optional parameters
The `load_organization_data` script has five optional parameters. These are as follows:
| | Parameter | Description |
|:-:|:---------------------------------|:----------------------------------------------------------------------------|
| 1 | **sep** | Determines the file separator. Defaults to "\|" |
| 2 | **debug** | Increases logging detail. Defaults to False |
| 3 | **directory** | Specifies the directory containing the files that will be parsed. Defaults to "migrationdata" |
| 4 | **domain_additional_filename** | Specifies the filename of domain_additional. Used as an override for the JSON. Has no default. |
| 5 | **organization_adhoc_filename** | Specifies the filename of organization_adhoc. Used as an override for the JSON. Has no default. |

View file

@ -0,0 +1,283 @@
"""Data migration: Load organization data for TransitionDomain and DomainInformation objects"""
import argparse
import json
import logging
import os
from django.core.management import BaseCommand
from registrar.management.commands.utility.extra_transition_domain_helper import OrganizationDataLoader
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.management.commands.utility.transition_domain_arguments import TransitionDomainArguments
from registrar.models import TransitionDomain, DomainInformation
from django.core.paginator import Paginator
from typing import List
from registrar.models.domain import Domain
from registrar.management.commands.utility.load_organization_error import (
LoadOrganizationError,
LoadOrganizationErrorCodes,
)
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Load organization data on TransitionDomain and DomainInformation objects"
def __init__(self):
super().__init__()
self.domain_information_to_update: List[DomainInformation] = []
# Stores the domain_name for logging purposes
self.domains_failed_to_update: List[str] = []
self.domains_skipped: List[str] = []
self.changed_fields = [
"address_line1",
"city",
"state_territory",
"zipcode",
]
def add_arguments(self, parser):
"""Add command line arguments."""
parser.add_argument(
"migration_json_filename",
help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"),
)
parser.add_argument("--sep", default="|", help="Delimiter character")
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument("--directory", default="migrationdata", help="Desired directory")
def handle(self, migration_json_filename, **options):
"""Load organization address data into the TransitionDomain
and DomainInformation tables by using the organization adhoc file and domain_additional file"""
# Parse JSON file
options = self.load_json_settings(options, migration_json_filename)
org_args = TransitionDomainArguments(**options)
# Will sys.exit() when prompt is "n"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==Master data file==
domain_additional_filename: {org_args.domain_additional_filename}
==Organization data==
organization_adhoc_filename: {org_args.organization_adhoc_filename}
==Containing directory==
directory: {org_args.directory}
""",
prompt_title="Do you wish to load organization data for TransitionDomains?",
)
org_load_helper = OrganizationDataLoader(org_args)
transition_domains = org_load_helper.update_organization_data_for_all()
# Reprompt the user to reinspect before updating DomainInformation
# Will sys.exit() when prompt is "n"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==Master data file==
domain_additional_filename: {org_args.domain_additional_filename}
==Organization name information==
organization_adhoc_filename: {org_args.organization_adhoc_filename}
==Containing directory==
directory: {org_args.directory}
==Proposed Changes==
Number of DomainInformation objects to (potentially) change: {len(transition_domains)}
For each DomainInformation, modify the following fields: {self.changed_fields}
""",
prompt_title="Do you wish to update organization address data for DomainInformation as well?",
)
logger.info(
f"{TerminalColors.MAGENTA}"
"Preparing to load organization data onto DomainInformation tables..."
f"{TerminalColors.ENDC}"
)
self.prepare_update_domain_information(transition_domains, org_args.debug)
logger.info(f"{TerminalColors.MAGENTA}" f"Beginning mass DomainInformation update..." f"{TerminalColors.ENDC}")
self.bulk_update_domain_information(org_args.debug)
def load_json_settings(self, options, migration_json_filename):
"""Parses options from the given JSON file."""
json_filepath = os.path.join(options["directory"], migration_json_filename)
# If a JSON was provided, use its values instead of defaults.
with open(json_filepath, "r") as jsonFile:
# load JSON object as a dictionary
try:
data = json.load(jsonFile)
skipped_fields = ["domain_additional_filename", "organization_adhoc_filename"]
# Iterate over the data from the JSON file. Skip any unused values.
for key, value in data.items():
if value is not None and value.strip() != "":
# If any key in skipped_fields has a value, then
# we override what is specified in the JSON.
if options not in skipped_fields:
options[key] = value
except Exception as err:
logger.error(
f"{TerminalColors.FAIL}"
"There was an error loading "
"the JSON responsible for providing filepaths."
f"{TerminalColors.ENDC}"
)
raise err
return options
def prepare_update_domain_information(self, target_transition_domains: List[TransitionDomain], debug):
"""Returns an array of DomainInformation objects with updated organization data."""
if len(target_transition_domains) == 0:
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.EMPTY_TRANSITION_DOMAIN_TABLE)
# Grab each TransitionDomain we want to change.
transition_domains = TransitionDomain.objects.filter(
username__in=[item.username for item in target_transition_domains],
domain_name__in=[item.domain_name for item in target_transition_domains],
)
# This indicates that some form of data corruption happened.
if len(target_transition_domains) != len(transition_domains):
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND)
# Maps TransitionDomain <--> DomainInformation.
# If any related organization fields have been updated,
# we can assume that they modified this information themselves - thus we should not update it.
domain_informations = DomainInformation.objects.filter(
domain__name__in=[td.domain_name for td in transition_domains],
address_line1__isnull=True,
city__isnull=True,
state_territory__isnull=True,
zipcode__isnull=True,
)
filtered_domain_informations_dict = {di.domain.name: di for di in domain_informations if di.domain is not None}
# === Create DomainInformation objects === #
for item in transition_domains:
self.map_transition_domain_to_domain_information(item, filtered_domain_informations_dict, debug)
# === Log results and return data === #
if len(self.domains_failed_to_update) > 0:
logger.error(
f"""{TerminalColors.FAIL}
Failed to update. An exception was encountered on the following Domains: {self.domains_failed_to_update}
{TerminalColors.ENDC}"""
)
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.UPDATE_DOMAIN_INFO_FAILED)
if debug:
logger.info(f"Updating these DomainInformations: {[item for item in self.domain_information_to_update]}")
if len(self.domains_skipped) > 0:
logger.info(f"Skipped these fields: {self.domains_skipped}")
logger.info(
f"{TerminalColors.YELLOW}"
f"Skipped updating {len(self.domains_skipped)} fields. User-supplied data exists, or there is no data."
f"{TerminalColors.ENDC}"
)
logger.info(f"Ready to update {len(self.domain_information_to_update)} DomainInformations.")
return self.domain_information_to_update
def bulk_update_domain_information(self, debug):
"""Performs a bulk_update operation on a list of DomainInformation objects"""
# Create a Paginator object. Bulk_update on the full dataset
# is too memory intensive for our current app config, so we can chunk this data instead.
batch_size = 1000
paginator = Paginator(self.domain_information_to_update, batch_size)
for page_num in paginator.page_range:
page = paginator.page(page_num)
DomainInformation.objects.bulk_update(page.object_list, self.changed_fields)
if debug:
logger.info(f"Updated these DomainInformations: {[item for item in self.domain_information_to_update]}")
logger.info(
f"{TerminalColors.OKGREEN}"
f"Updated {len(self.domain_information_to_update)} DomainInformations."
f"{TerminalColors.ENDC}"
)
def map_transition_domain_to_domain_information(self, item, domain_informations_dict, debug):
"""Attempts to return a DomainInformation object based on values from TransitionDomain.
Any domains which cannot be updated will be stored in an array.
"""
does_not_exist: bool = self.is_domain_name_missing(item, domain_informations_dict)
all_fields_are_none: bool = self.is_organization_data_missing(item)
if does_not_exist:
self.handle_if_domain_name_missing(item.domain_name)
elif all_fields_are_none:
logger.info(
f"{TerminalColors.YELLOW}"
f"Domain {item.domain_name} has no Organization Data. Cannot update."
f"{TerminalColors.ENDC}"
)
self.domains_skipped.append(item.domain_name)
else:
# Based on the current domain, grab the right DomainInformation object.
current_domain_information = domain_informations_dict[item.domain_name]
if current_domain_information.domain is None or current_domain_information.domain.name is None:
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.DOMAIN_NAME_WAS_NONE)
# Update fields
current_domain_information.address_line1 = item.address_line
current_domain_information.city = item.city
current_domain_information.state_territory = item.state_territory
current_domain_information.zipcode = item.zipcode
self.domain_information_to_update.append(current_domain_information)
if debug:
logger.info(f"Updated {current_domain_information.domain.name}...")
def is_domain_name_missing(self, item: TransitionDomain, domain_informations_dict):
"""Checks if domain_name is in the supplied dictionary"""
return item.domain_name not in domain_informations_dict
def is_organization_data_missing(self, item: TransitionDomain):
"""Checks if all desired Organization fields to update are none"""
fields = [item.address_line, item.city, item.state_territory, item.zipcode]
return all(field is None for field in fields)
def handle_if_domain_name_missing(self, domain_name):
"""
Infers what to log if we can't find a domain_name and updates the relevant lists.
This function performs the following checks:
1. If the domain does not exist, it logs an error and adds the domain name to the `domains_failed_to_update` list.
2. If the domain was updated by a user, it logs an info message and adds the domain name to the `domains_skipped` list.
3. If there are duplicate domains, it logs an error and adds the domain name to the `domains_failed_to_update` list.
Args:
domain_name (str): The name of the domain to check.
""" # noqa - E501 (harder to read)
domains = Domain.objects.filter(name=domain_name)
if domains.count() == 0:
logger.error(f"Could not add {domain_name}. Domain does not exist.")
self.domains_failed_to_update.append(domain_name)
elif domains.count() == 1:
logger.info(
f"{TerminalColors.YELLOW}"
f"Domain {domain_name} was updated by a user. Cannot update."
f"{TerminalColors.ENDC}"
)
self.domains_skipped.append(domain_name)
else:
logger.error(f"Could not add {domain_name}. Duplicate domains exist.")
self.domains_failed_to_update.append(domain_name)

View file

@ -9,9 +9,13 @@ import logging
import os
import sys
from typing import Dict
from typing import Dict, List
from django.core.paginator import Paginator
from registrar.models.transition_domain import TransitionDomain
from registrar.management.commands.utility.load_organization_error import (
LoadOrganizationError,
LoadOrganizationErrorCodes,
)
from .epp_data_containers import (
AgencyAdhoc,
@ -752,6 +756,195 @@ class FileDataHolder:
return (full_filename, can_infer)
class OrganizationDataLoader:
"""Saves organization data onto Transition Domains. Handles file parsing."""
def __init__(self, options: TransitionDomainArguments):
self.debug = options.debug
# We want to data from the domain_additional file and the organization_adhoc file
options.pattern_map_params = [
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
]
# Reads and parses organization data
self.parsed_data = ExtraTransitionDomain(options)
# options.infer_filenames will always be false when not SETTING.DEBUG
self.parsed_data.parse_all_files(options.infer_filenames)
self.tds_to_update: List[TransitionDomain] = []
def update_organization_data_for_all(self):
"""Updates org address data for all TransitionDomains"""
all_transition_domains = TransitionDomain.objects.all()
if len(all_transition_domains) == 0:
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.EMPTY_TRANSITION_DOMAIN_TABLE)
self.prepare_transition_domains(all_transition_domains)
logger.info(f"{TerminalColors.MAGENTA}" "Beginning mass TransitionDomain update..." f"{TerminalColors.ENDC}")
self.bulk_update_transition_domains(self.tds_to_update)
return self.tds_to_update
def prepare_transition_domains(self, transition_domains):
"""Parses org data for each transition domain,
then appends it to the tds_to_update list"""
for item in transition_domains:
updated = self.parse_org_data(item.domain_name, item)
self.tds_to_update.append(updated)
if self.debug:
logger.info(
f"""{TerminalColors.OKCYAN}
Successfully updated:
{item.display_transition_domain()}
{TerminalColors.ENDC}"""
)
if self.debug:
logger.info(f"Updating the following: {[item for item in self.tds_to_update]}")
logger.info(
f"""{TerminalColors.MAGENTA}
Ready to update {len(self.tds_to_update)} TransitionDomains.
{TerminalColors.ENDC}"""
)
def bulk_update_transition_domains(self, update_list):
changed_fields = [
"address_line",
"city",
"state_territory",
"zipcode",
]
batch_size = 1000
# Create a Paginator object. Bulk_update on the full dataset
# is too memory intensive for our current app config, so we can chunk this data instead.
paginator = Paginator(update_list, batch_size)
for page_num in paginator.page_range:
page = paginator.page(page_num)
TransitionDomain.objects.bulk_update(page.object_list, changed_fields)
if self.debug:
logger.info(f"Updated the following: {[item for item in self.tds_to_update]}")
logger.info(
f"{TerminalColors.OKGREEN}" f"Updated {len(self.tds_to_update)} TransitionDomains." f"{TerminalColors.ENDC}"
)
def parse_org_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain:
"""Grabs organization_name from the parsed files and associates it
with a transition_domain object, then updates that transition domain object and returns it"""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
org_info = self.get_org_info(domain_name)
if org_info is None:
logger.error(f"Could not add organization data on {domain_name}, no data exists.")
return transition_domain
# Add street info
transition_domain.address_line = org_info.orgstreet
transition_domain.city = org_info.orgcity
transition_domain.state_territory = org_info.orgstate
transition_domain.zipcode = org_info.orgzip
if self.debug:
# Log what happened to each field. The first value
# is the field name that was updated, second is the value
changed_fields = [
("address_line", transition_domain.address_line),
("city", transition_domain.city),
("state_territory", transition_domain.state_territory),
("zipcode", transition_domain.zipcode),
]
self.log_add_or_changed_values(changed_fields, domain_name)
return transition_domain
def get_org_info(self, domain_name) -> OrganizationAdhoc | None:
"""Maps an id given in get_domain_data to a organization_adhoc
record which has its corresponding definition"""
# Get a row in the domain_additional file. The id is the domain_name.
domain_additional_row = self.retrieve_row_by_id(EnumFilenames.DOMAIN_ADDITIONAL, domain_name)
if domain_additional_row is None:
return None
# Get a row in the organization_adhoc file. The id is the orgid in domain_additional_row.
org_row = self.retrieve_row_by_id(EnumFilenames.ORGANIZATION_ADHOC, domain_additional_row.orgid)
return org_row
def retrieve_row_by_id(self, file_type: EnumFilenames, desired_id):
"""Returns a field in a dictionary based off the type and id.
vars:
file_type: (constant) EnumFilenames -> Which data file to target.
An example would be `EnumFilenames.DOMAIN_ADHOC`.
desired_id: str -> Which id you want to search on.
An example would be `"12"` or `"igorville.gov"`
"""
# Grabs a dict associated with the file_type.
# For example, EnumFilenames.DOMAIN_ADDITIONAL would map to
# whatever data exists on the domain_additional file.
desired_file = self.parsed_data.file_data.get(file_type)
if desired_file is None:
logger.error(f"Type {file_type} does not exist")
return None
# This is essentially a dictionary of rows.
data_in_file = desired_file.data
# Get a row in the given file, based on an id.
# For instance, "igorville.gov" in domain_additional.
row_in_file = data_in_file.get(desired_id)
if row_in_file is None:
logger.error(f"Id {desired_id} does not exist for {file_type.value[0]}")
return row_in_file
def log_add_or_changed_values(self, values_to_check, domain_name):
"""Iterates through a list of fields, and determines if we should log
if the field was added or if the field was updated.
A field is "updated" when it is not None or not "".
A field is "created" when it is either of those things.
"""
for field_name, value in values_to_check:
str_exists = value is not None and value.strip() != ""
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
field_name,
value,
domain_name,
str_exists,
)
def _add_or_change_message(self, field_name, changed_value, domain_name, is_update=False):
"""Creates a log instance when a property
is successfully changed on a given TransitionDomain."""
if not is_update:
logger.info(f"Added {field_name} as '{changed_value}' on {domain_name}")
else:
logger.warning(f"Updated existing {field_name} to '{changed_value}' on {domain_name}")
class ExtraTransitionDomain:
"""Helper class to aid in storing TransitionDomain data spread across
multiple files."""
@ -775,52 +968,47 @@ class ExtraTransitionDomain:
# metadata about each file and associate it with an enum.
# That way if we want the data located at the agency_adhoc file,
# we can just call EnumFilenames.AGENCY_ADHOC.
pattern_map_params = [
(
EnumFilenames.AGENCY_ADHOC,
options.agency_adhoc_filename,
AgencyAdhoc,
"agencyid",
),
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.DOMAIN_ESCROW,
options.domain_escrow_filename,
DomainEscrow,
"domainname",
),
(
EnumFilenames.DOMAIN_ADHOC,
options.domain_adhoc_filename,
DomainTypeAdhoc,
"domaintypeid",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
]
if options.pattern_map_params is None or options.pattern_map_params == []:
options.pattern_map_params = [
(
EnumFilenames.AGENCY_ADHOC,
options.agency_adhoc_filename,
AgencyAdhoc,
"agencyid",
),
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.DOMAIN_ESCROW,
options.domain_escrow_filename,
DomainEscrow,
"domainname",
),
(
EnumFilenames.DOMAIN_ADHOC,
options.domain_adhoc_filename,
DomainTypeAdhoc,
"domaintypeid",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
]
self.file_data = self.populate_file_data(pattern_map_params)
self.file_data = self.populate_file_data(options.pattern_map_params)
# TODO - revise comment
def populate_file_data(self, pattern_map_params):

View file

@ -0,0 +1,39 @@
from enum import IntEnum
class LoadOrganizationErrorCodes(IntEnum):
"""Used when running the load_organization_data script
Overview of error codes:
- 1 TRANSITION_DOMAINS_NOT_FOUND
- 2 UPDATE_DOMAIN_INFO_FAILED
- 3 EMPTY_TRANSITION_DOMAIN_TABLE
"""
TRANSITION_DOMAINS_NOT_FOUND = 1
UPDATE_DOMAIN_INFO_FAILED = 2
EMPTY_TRANSITION_DOMAIN_TABLE = 3
DOMAIN_NAME_WAS_NONE = 4
class LoadOrganizationError(Exception):
"""
Error class used in the load_organization_data script
"""
_error_mapping = {
LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND: (
"Could not find all desired TransitionDomains. " "(Possible data corruption?)"
),
LoadOrganizationErrorCodes.UPDATE_DOMAIN_INFO_FAILED: "Failed to update DomainInformation",
LoadOrganizationErrorCodes.EMPTY_TRANSITION_DOMAIN_TABLE: "No TransitionDomains exist. Cannot update.",
LoadOrganizationErrorCodes.DOMAIN_NAME_WAS_NONE: "DomainInformation was updated, but domain was None",
}
def __init__(self, *args, code=None, **kwargs):
super().__init__(*args, **kwargs)
self.code = code
if self.code in self._error_mapping:
self.message = self._error_mapping.get(self.code)
def __str__(self):
return f"{self.message}"

View file

@ -18,7 +18,7 @@ class TransitionDomainArguments:
# Maintains an internal kwargs list and sets values
# that match the class definition.
def __init__(self, **kwargs):
self.kwargs = kwargs
self.pattern_map_params = kwargs.get("pattern_map_params", [])
for k, v in kwargs.items():
if hasattr(self, k):
setattr(self, k, v)
@ -36,13 +36,13 @@ class TransitionDomainArguments:
limitParse: Optional[int] = field(default=None, repr=True)
# Filenames #
# = Adhocs =#
# = Adhocs = #
agency_adhoc_filename: Optional[str] = field(default=EnumFilenames.AGENCY_ADHOC.value[1], repr=True)
domain_adhoc_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADHOC.value[1], repr=True)
organization_adhoc_filename: Optional[str] = field(default=EnumFilenames.ORGANIZATION_ADHOC.value[1], repr=True)
authority_adhoc_filename: Optional[str] = field(default=EnumFilenames.AUTHORITY_ADHOC.value[1], repr=True)
# = Data files =#
# = Data files = #
domain_escrow_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ESCROW.value[1], repr=True)
domain_additional_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], repr=True)
domain_contacts_filename: Optional[str] = field(default=None, repr=True)

View file

@ -0,0 +1,32 @@
# Generated by Django 4.2.7 on 2023-11-16 19:56
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("registrar", "0046_transitiondomain_email_transitiondomain_first_name_and_more"),
]
operations = [
migrations.AddField(
model_name="transitiondomain",
name="address_line",
field=models.TextField(blank=True, help_text="Street address", null=True),
),
migrations.AddField(
model_name="transitiondomain",
name="city",
field=models.TextField(blank=True, help_text="City", null=True),
),
migrations.AddField(
model_name="transitiondomain",
name="state_territory",
field=models.CharField(blank=True, help_text="State, territory, or military post", max_length=2, null=True),
),
migrations.AddField(
model_name="transitiondomain",
name="zipcode",
field=models.CharField(blank=True, db_index=True, help_text="Zip code", max_length=10, null=True),
),
]

View file

@ -105,6 +105,29 @@ class TransitionDomain(TimeStampedModel):
blank=True,
help_text="Phone",
)
address_line = models.TextField(
null=True,
blank=True,
help_text="Street address",
)
city = models.TextField(
null=True,
blank=True,
help_text="City",
)
state_territory = models.CharField(
max_length=2,
null=True,
blank=True,
help_text="State, territory, or military post",
)
zipcode = models.CharField(
max_length=10,
null=True,
blank=True,
help_text="Zip code",
db_index=True,
)
def __str__(self):
return f"{self.username}, {self.domain_name}"
@ -128,4 +151,8 @@ class TransitionDomain(TimeStampedModel):
f"last_name: {self.last_name}, \n"
f"email: {self.email}, \n"
f"phone: {self.phone}, \n"
f"address_line: {self.address_line}, \n"
f"city: {self.city}, \n"
f"state_territory: {self.state_territory}, \n"
f"zipcode: {self.zipcode}, \n"
)

View file

@ -16,9 +16,316 @@ from registrar.models import (
from django.core.management import call_command
from unittest.mock import patch
from registrar.models.contact import Contact
from .common import less_console_noise
class TestOrganizationMigration(TestCase):
def setUp(self):
"""Defines the file name of migration_json and the folder its contained in"""
self.test_data_file_location = "registrar/tests/data"
self.migration_json_filename = "test_migrationFilepaths.json"
def tearDown(self):
"""Deletes all DB objects related to migrations"""
# Delete domain information
Domain.objects.all().delete()
DomainInformation.objects.all().delete()
DomainInvitation.objects.all().delete()
TransitionDomain.objects.all().delete()
# Delete users
User.objects.all().delete()
UserDomainRole.objects.all().delete()
def run_load_domains(self):
"""
This method executes the load_transition_domain command.
It uses 'unittest.mock.patch' to mock the TerminalHelper.query_yes_no_exit method,
which is a user prompt in the terminal. The mock function always returns True,
allowing the test to proceed without manual user input.
The 'call_command' function from Django's management framework is then used to
execute the load_transition_domain command with the specified arguments.
"""
# noqa here because splitting this up makes it confusing.
# ES501
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
return_value=True,
):
call_command(
"load_transition_domain",
self.migration_json_filename,
directory=self.test_data_file_location,
)
def run_transfer_domains(self):
"""
This method executes the transfer_transition_domains_to_domains command.
The 'call_command' function from Django's management framework is then used to
execute the load_transition_domain command with the specified arguments.
"""
call_command("transfer_transition_domains_to_domains")
def run_load_organization_data(self):
"""
This method executes the load_organization_data command.
It uses 'unittest.mock.patch' to mock the TerminalHelper.query_yes_no_exit method,
which is a user prompt in the terminal. The mock function always returns True,
allowing the test to proceed without manual user input.
The 'call_command' function from Django's management framework is then used to
execute the load_organization_data command with the specified arguments.
"""
# noqa here (E501) because splitting this up makes it
# confusing to read.
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
return_value=True,
):
call_command(
"load_organization_data",
self.migration_json_filename,
directory=self.test_data_file_location,
)
def compare_tables(
self,
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
):
"""Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation.
Verifies that the data loaded correctly."""
missing_domains = []
duplicate_domains = []
missing_domain_informations = []
for transition_domain in TransitionDomain.objects.all():
transition_domain_name = transition_domain.domain_name
# Check Domain table
matching_domains = Domain.objects.filter(name=transition_domain_name)
# Check Domain Information table
matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name)
if len(matching_domains) == 0:
missing_domains.append(transition_domain_name)
elif len(matching_domains) > 1:
duplicate_domains.append(transition_domain_name)
if len(matching_domain_informations) == 0:
missing_domain_informations.append(transition_domain_name)
total_missing_domains = len(missing_domains)
total_duplicate_domains = len(duplicate_domains)
total_missing_domain_informations = len(missing_domain_informations)
total_transition_domains = len(TransitionDomain.objects.all())
total_domains = len(Domain.objects.all())
total_domain_informations = len(DomainInformation.objects.all())
self.assertEqual(total_missing_domains, expected_missing_domains)
self.assertEqual(total_duplicate_domains, expected_duplicate_domains)
self.assertEqual(total_missing_domain_informations, expected_missing_domain_informations)
self.assertEqual(total_transition_domains, expected_total_transition_domains)
self.assertEqual(total_domains, expected_total_domains)
self.assertEqual(total_domain_informations, expected_total_domain_informations)
def test_load_organization_data_transition_domain(self):
"""
This test verifies the functionality of the load_organization_data method for TransitionDomain objects.
The test follows these steps:
1. Parses all existing data by running the load_domains and transfer_domains methods.
2. Attempts to add organization data to the parsed data by running the load_organization_data method.
3. Checks that the data has been loaded as expected.
The expected result is a set of TransitionDomain objects with specific attributes.
The test fetches the actual TransitionDomain objects from the database and compares them with the expected objects.
""" # noqa - E501 (harder to read)
# == First, parse all existing data == #
self.run_load_domains()
self.run_transfer_domains()
# == Second, try adding org data to it == #
self.run_load_organization_data()
# == Third, test that we've loaded data as we expect == #
transition_domains = TransitionDomain.objects.filter(domain_name="fakewebsite2.gov")
# Should return three objects (three unique emails)
self.assertEqual(transition_domains.count(), 3)
# Lets test the first one
transition = transition_domains.first()
expected_transition_domain = TransitionDomain(
username="alexandra.bobbitt5@test.com",
domain_name="fakewebsite2.gov",
status="on hold",
email_sent=True,
organization_type="Federal",
organization_name="Fanoodle",
federal_type="Executive",
federal_agency="Department of Commerce",
epp_creation_date=datetime.date(2004, 5, 7),
epp_expiration_date=datetime.date(2023, 9, 30),
first_name="Seline",
middle_name="testmiddle2",
last_name="Tower",
title=None,
email="stower3@answers.com",
phone="151-539-6028",
address_line="93001 Arizona Drive",
city="Columbus",
state_territory="Oh",
zipcode="43268",
)
expected_transition_domain.id = transition.id
self.assertEqual(transition, expected_transition_domain)
def test_load_organization_data_domain_information(self):
"""
This test verifies the functionality of the load_organization_data method.
The test follows these steps:
1. Parses all existing data by running the load_domains and transfer_domains methods.
2. Attempts to add organization data to the parsed data by running the load_organization_data method.
3. Checks that the data has been loaded as expected.
The expected result is a DomainInformation object with specific attributes.
The test fetches the actual DomainInformation object from the database
and compares it with the expected object.
"""
# == First, parse all existing data == #
self.run_load_domains()
self.run_transfer_domains()
# == Second, try adding org data to it == #
self.run_load_organization_data()
# == Third, test that we've loaded data as we expect == #
_domain = Domain.objects.filter(name="fakewebsite2.gov").get()
domain_information = DomainInformation.objects.filter(domain=_domain).get()
expected_creator = User.objects.filter(username="System").get()
expected_ao = Contact.objects.filter(first_name="Seline", middle_name="testmiddle2", last_name="Tower").get()
expected_domain_information = DomainInformation(
creator=expected_creator,
organization_type="federal",
federal_agency="Department of Commerce",
federal_type="executive",
organization_name="Fanoodle",
address_line1="93001 Arizona Drive",
city="Columbus",
state_territory="Oh",
zipcode="43268",
authorizing_official=expected_ao,
domain=_domain,
)
# Given that these are different objects, this needs to be set
expected_domain_information.id = domain_information.id
self.assertEqual(domain_information, expected_domain_information)
def test_load_organization_data_preserves_existing_data(self):
"""
This test verifies that the load_organization_data method does not overwrite existing data.
The test follows these steps:
1. Parses all existing data by running the load_domains and transfer_domains methods.
2. Adds pre-existing fake data to a DomainInformation object and saves it to the database.
3. Runs the load_organization_data method.
4. Checks that the pre-existing data in the DomainInformation object has not been overwritten.
The expected result is that the DomainInformation object retains its pre-existing data
after the load_organization_data method is run.
"""
# == First, parse all existing data == #
self.run_load_domains()
self.run_transfer_domains()
# == Second, try add prexisting fake data == #
_domain_old = Domain.objects.filter(name="fakewebsite2.gov").get()
domain_information_old = DomainInformation.objects.filter(domain=_domain_old).get()
domain_information_old.address_line1 = "93001 Galactic Way"
domain_information_old.city = "Olympus"
domain_information_old.state_territory = "MA"
domain_information_old.zipcode = "12345"
domain_information_old.save()
# == Third, try running the script == #
self.run_load_organization_data()
# == Fourth, test that no data is overwritten as we expect == #
_domain = Domain.objects.filter(name="fakewebsite2.gov").get()
domain_information = DomainInformation.objects.filter(domain=_domain).get()
expected_creator = User.objects.filter(username="System").get()
expected_ao = Contact.objects.filter(first_name="Seline", middle_name="testmiddle2", last_name="Tower").get()
expected_domain_information = DomainInformation(
creator=expected_creator,
organization_type="federal",
federal_agency="Department of Commerce",
federal_type="executive",
organization_name="Fanoodle",
address_line1="93001 Galactic Way",
city="Olympus",
state_territory="MA",
zipcode="12345",
authorizing_official=expected_ao,
domain=_domain,
)
# Given that these are different objects, this needs to be set
expected_domain_information.id = domain_information.id
self.assertEqual(domain_information, expected_domain_information)
def test_load_organization_data_integrity(self):
"""
This test verifies the data integrity after running the load_organization_data method.
The test follows these steps:
1. Parses all existing data by running the load_domains and transfer_domains methods.
2. Attempts to add organization data to the parsed data by running the load_organization_data method.
3. Checks that the data has not been corrupted by comparing the actual counts of objects in the database
with the expected counts.
The expected result is that the counts of objects in the database
match the expected counts, indicating that the data has not been corrupted.
"""
# First, parse all existing data
self.run_load_domains()
self.run_transfer_domains()
# Second, try adding org data to it
self.run_load_organization_data()
# Third, test that we didn't corrupt any data
expected_total_transition_domains = 9
expected_total_domains = 5
expected_total_domain_informations = 5
expected_missing_domains = 0
expected_duplicate_domains = 0
expected_missing_domain_informations = 0
self.compare_tables(
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
)
class TestMigrations(TestCase):
def setUp(self):
""" """
@ -41,11 +348,12 @@ class TestMigrations(TestCase):
self.migration_json_filename = "test_migrationFilepaths.json"
def tearDown(self):
super().tearDown()
# Delete domain information
TransitionDomain.objects.all().delete()
Domain.objects.all().delete()
DomainInvitation.objects.all().delete()
DomainInformation.objects.all().delete()
DomainInvitation.objects.all().delete()
# Delete users
User.objects.all().delete()