Merge branch 'main' into ky/required-optional-form-fields

This commit is contained in:
Kristina Yin 2023-11-22 09:42:03 -08:00
commit 4e85658d0e
No known key found for this signature in database
GPG key ID: 9BB3845BB3A21584
19 changed files with 1170 additions and 70 deletions

View file

@ -736,7 +736,7 @@ class DomainAdmin(ListHeaderAdmin):
search_help_text = "Search by domain name."
change_form_template = "django/admin/domain_change_form.html"
change_list_template = "django/admin/domain_change_list.html"
readonly_fields = ["state"]
readonly_fields = ["state", "expiration_date"]
def export_data_type(self, request):
# match the CSV example with all the fields

View file

@ -299,11 +299,18 @@ SERVER_EMAIL = "root@get.gov"
# this can be restrictive because we have few external scripts
allowed_sources = ("'self'",)
CSP_DEFAULT_SRC = allowed_sources
# Most things fall back to default-src, but these two do not and should be
# Most things fall back to default-src, but the following do not and should be
# explicitly set
CSP_FRAME_ANCESTORS = allowed_sources
CSP_FORM_ACTION = allowed_sources
# Google analytics requires that we relax our otherwise
# strict CSP by allowing scripts to run from their domain
# and inline with a nonce, as well as allowing connections back to their domain
CSP_SCRIPT_SRC_ELEM = ["'self'", "https://www.googletagmanager.com/"]
CSP_CONNECT_SRC = ["'self'", "https://www.google-analytics.com/"]
CSP_INCLUDE_NONCE_IN = ["script-src-elem"]
# Cross-Origin Resource Sharing (CORS) configuration
# Sets clients that allow access control to manage.get.gov
# TODO: remove :8080 to see if we can have all localhost access

View file

@ -5,6 +5,7 @@ from .domain import (
DomainSecurityEmailForm,
DomainOrgNameAddressForm,
ContactForm,
AuthorizingOfficialContactForm,
DomainDnssecForm,
DomainDsdataFormset,
DomainDsdataForm,

View file

@ -157,6 +157,40 @@ class ContactForm(forms.ModelForm):
# Set custom form label
self.fields["middle_name"].label = "Middle name (optional)"
# Set custom error messages
self.fields["first_name"].error_messages = {"required": "Enter your first name / given name."}
self.fields["last_name"].error_messages = {"required": "Enter your last name / family name."}
self.fields["title"].error_messages = {
"required": "Enter your title or role in your organization (e.g., Chief Information Officer)"
}
self.fields["email"].error_messages = {
"required": "Enter your email address in the required format, like name@example.com."
}
self.fields["phone"].error_messages = {"required": "Enter your phone number."}
class AuthorizingOfficialContactForm(ContactForm):
"""Form for updating authorizing official contacts."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Set custom error messages
self.fields["first_name"].error_messages = {
"required": "Enter the first name / given name of your authorizing official."
}
self.fields["last_name"].error_messages = {
"required": "Enter the last name / family name of your authorizing official."
}
self.fields["title"].error_messages = {
"required": "Enter the title or role your authorizing official has in your \
organization (e.g., Chief Information Officer)."
}
self.fields["email"].error_messages = {
"required": "Enter an email address in the required format, like name@example.com."
}
self.fields["phone"].error_messages = {"required": "Enter a phone number for your authorizing official."}
class DomainSecurityEmailForm(forms.Form):
"""Form for adding or editing a security email to a domain."""

View file

@ -0,0 +1,283 @@
"""Data migration: Load organization data for TransitionDomain and DomainInformation objects"""
import argparse
import json
import logging
import os
from django.core.management import BaseCommand
from registrar.management.commands.utility.extra_transition_domain_helper import OrganizationDataLoader
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.management.commands.utility.transition_domain_arguments import TransitionDomainArguments
from registrar.models import TransitionDomain, DomainInformation
from django.core.paginator import Paginator
from typing import List
from registrar.models.domain import Domain
from registrar.management.commands.utility.load_organization_error import (
LoadOrganizationError,
LoadOrganizationErrorCodes,
)
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Load organization data on TransitionDomain and DomainInformation objects"
def __init__(self):
super().__init__()
self.domain_information_to_update: List[DomainInformation] = []
# Stores the domain_name for logging purposes
self.domains_failed_to_update: List[str] = []
self.domains_skipped: List[str] = []
self.changed_fields = [
"address_line1",
"city",
"state_territory",
"zipcode",
]
def add_arguments(self, parser):
"""Add command line arguments."""
parser.add_argument(
"migration_json_filename",
help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"),
)
parser.add_argument("--sep", default="|", help="Delimiter character")
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument("--directory", default="migrationdata", help="Desired directory")
def handle(self, migration_json_filename, **options):
"""Load organization address data into the TransitionDomain
and DomainInformation tables by using the organization adhoc file and domain_additional file"""
# Parse JSON file
options = self.load_json_settings(options, migration_json_filename)
org_args = TransitionDomainArguments(**options)
# Will sys.exit() when prompt is "n"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==Master data file==
domain_additional_filename: {org_args.domain_additional_filename}
==Organization data==
organization_adhoc_filename: {org_args.organization_adhoc_filename}
==Containing directory==
directory: {org_args.directory}
""",
prompt_title="Do you wish to load organization data for TransitionDomains?",
)
org_load_helper = OrganizationDataLoader(org_args)
transition_domains = org_load_helper.update_organization_data_for_all()
# Reprompt the user to reinspect before updating DomainInformation
# Will sys.exit() when prompt is "n"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==Master data file==
domain_additional_filename: {org_args.domain_additional_filename}
==Organization name information==
organization_adhoc_filename: {org_args.organization_adhoc_filename}
==Containing directory==
directory: {org_args.directory}
==Proposed Changes==
Number of DomainInformation objects to (potentially) change: {len(transition_domains)}
For each DomainInformation, modify the following fields: {self.changed_fields}
""",
prompt_title="Do you wish to update organization address data for DomainInformation as well?",
)
logger.info(
f"{TerminalColors.MAGENTA}"
"Preparing to load organization data onto DomainInformation tables..."
f"{TerminalColors.ENDC}"
)
self.prepare_update_domain_information(transition_domains, org_args.debug)
logger.info(f"{TerminalColors.MAGENTA}" f"Beginning mass DomainInformation update..." f"{TerminalColors.ENDC}")
self.bulk_update_domain_information(org_args.debug)
def load_json_settings(self, options, migration_json_filename):
"""Parses options from the given JSON file."""
json_filepath = os.path.join(options["directory"], migration_json_filename)
# If a JSON was provided, use its values instead of defaults.
with open(json_filepath, "r") as jsonFile:
# load JSON object as a dictionary
try:
data = json.load(jsonFile)
skipped_fields = ["domain_additional_filename", "organization_adhoc_filename"]
# Iterate over the data from the JSON file. Skip any unused values.
for key, value in data.items():
if value is not None and value.strip() != "":
# If any key in skipped_fields has a value, then
# we override what is specified in the JSON.
if options not in skipped_fields:
options[key] = value
except Exception as err:
logger.error(
f"{TerminalColors.FAIL}"
"There was an error loading "
"the JSON responsible for providing filepaths."
f"{TerminalColors.ENDC}"
)
raise err
return options
def prepare_update_domain_information(self, target_transition_domains: List[TransitionDomain], debug):
"""Returns an array of DomainInformation objects with updated organization data."""
if len(target_transition_domains) == 0:
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.EMPTY_TRANSITION_DOMAIN_TABLE)
# Grab each TransitionDomain we want to change.
transition_domains = TransitionDomain.objects.filter(
username__in=[item.username for item in target_transition_domains],
domain_name__in=[item.domain_name for item in target_transition_domains],
)
# This indicates that some form of data corruption happened.
if len(target_transition_domains) != len(transition_domains):
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND)
# Maps TransitionDomain <--> DomainInformation.
# If any related organization fields have been updated,
# we can assume that they modified this information themselves - thus we should not update it.
domain_informations = DomainInformation.objects.filter(
domain__name__in=[td.domain_name for td in transition_domains],
address_line1__isnull=True,
city__isnull=True,
state_territory__isnull=True,
zipcode__isnull=True,
)
filtered_domain_informations_dict = {di.domain.name: di for di in domain_informations if di.domain is not None}
# === Create DomainInformation objects === #
for item in transition_domains:
self.map_transition_domain_to_domain_information(item, filtered_domain_informations_dict, debug)
# === Log results and return data === #
if len(self.domains_failed_to_update) > 0:
logger.error(
f"""{TerminalColors.FAIL}
Failed to update. An exception was encountered on the following Domains: {self.domains_failed_to_update}
{TerminalColors.ENDC}"""
)
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.UPDATE_DOMAIN_INFO_FAILED)
if debug:
logger.info(f"Updating these DomainInformations: {[item for item in self.domain_information_to_update]}")
if len(self.domains_skipped) > 0:
logger.info(f"Skipped these fields: {self.domains_skipped}")
logger.info(
f"{TerminalColors.YELLOW}"
f"Skipped updating {len(self.domains_skipped)} fields. User-supplied data exists, or there is no data."
f"{TerminalColors.ENDC}"
)
logger.info(f"Ready to update {len(self.domain_information_to_update)} DomainInformations.")
return self.domain_information_to_update
def bulk_update_domain_information(self, debug):
"""Performs a bulk_update operation on a list of DomainInformation objects"""
# Create a Paginator object. Bulk_update on the full dataset
# is too memory intensive for our current app config, so we can chunk this data instead.
batch_size = 1000
paginator = Paginator(self.domain_information_to_update, batch_size)
for page_num in paginator.page_range:
page = paginator.page(page_num)
DomainInformation.objects.bulk_update(page.object_list, self.changed_fields)
if debug:
logger.info(f"Updated these DomainInformations: {[item for item in self.domain_information_to_update]}")
logger.info(
f"{TerminalColors.OKGREEN}"
f"Updated {len(self.domain_information_to_update)} DomainInformations."
f"{TerminalColors.ENDC}"
)
def map_transition_domain_to_domain_information(self, item, domain_informations_dict, debug):
"""Attempts to return a DomainInformation object based on values from TransitionDomain.
Any domains which cannot be updated will be stored in an array.
"""
does_not_exist: bool = self.is_domain_name_missing(item, domain_informations_dict)
all_fields_are_none: bool = self.is_organization_data_missing(item)
if does_not_exist:
self.handle_if_domain_name_missing(item.domain_name)
elif all_fields_are_none:
logger.info(
f"{TerminalColors.YELLOW}"
f"Domain {item.domain_name} has no Organization Data. Cannot update."
f"{TerminalColors.ENDC}"
)
self.domains_skipped.append(item.domain_name)
else:
# Based on the current domain, grab the right DomainInformation object.
current_domain_information = domain_informations_dict[item.domain_name]
if current_domain_information.domain is None or current_domain_information.domain.name is None:
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.DOMAIN_NAME_WAS_NONE)
# Update fields
current_domain_information.address_line1 = item.address_line
current_domain_information.city = item.city
current_domain_information.state_territory = item.state_territory
current_domain_information.zipcode = item.zipcode
self.domain_information_to_update.append(current_domain_information)
if debug:
logger.info(f"Updated {current_domain_information.domain.name}...")
def is_domain_name_missing(self, item: TransitionDomain, domain_informations_dict):
"""Checks if domain_name is in the supplied dictionary"""
return item.domain_name not in domain_informations_dict
def is_organization_data_missing(self, item: TransitionDomain):
"""Checks if all desired Organization fields to update are none"""
fields = [item.address_line, item.city, item.state_territory, item.zipcode]
return all(field is None for field in fields)
def handle_if_domain_name_missing(self, domain_name):
"""
Infers what to log if we can't find a domain_name and updates the relevant lists.
This function performs the following checks:
1. If the domain does not exist, it logs an error and adds the domain name to the `domains_failed_to_update` list.
2. If the domain was updated by a user, it logs an info message and adds the domain name to the `domains_skipped` list.
3. If there are duplicate domains, it logs an error and adds the domain name to the `domains_failed_to_update` list.
Args:
domain_name (str): The name of the domain to check.
""" # noqa - E501 (harder to read)
domains = Domain.objects.filter(name=domain_name)
if domains.count() == 0:
logger.error(f"Could not add {domain_name}. Domain does not exist.")
self.domains_failed_to_update.append(domain_name)
elif domains.count() == 1:
logger.info(
f"{TerminalColors.YELLOW}"
f"Domain {domain_name} was updated by a user. Cannot update."
f"{TerminalColors.ENDC}"
)
self.domains_skipped.append(domain_name)
else:
logger.error(f"Could not add {domain_name}. Duplicate domains exist.")
self.domains_failed_to_update.append(domain_name)

View file

@ -9,9 +9,13 @@ import logging
import os
import sys
from typing import Dict
from typing import Dict, List
from django.core.paginator import Paginator
from registrar.models.transition_domain import TransitionDomain
from registrar.management.commands.utility.load_organization_error import (
LoadOrganizationError,
LoadOrganizationErrorCodes,
)
from .epp_data_containers import (
AgencyAdhoc,
@ -752,6 +756,195 @@ class FileDataHolder:
return (full_filename, can_infer)
class OrganizationDataLoader:
"""Saves organization data onto Transition Domains. Handles file parsing."""
def __init__(self, options: TransitionDomainArguments):
self.debug = options.debug
# We want to data from the domain_additional file and the organization_adhoc file
options.pattern_map_params = [
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
]
# Reads and parses organization data
self.parsed_data = ExtraTransitionDomain(options)
# options.infer_filenames will always be false when not SETTING.DEBUG
self.parsed_data.parse_all_files(options.infer_filenames)
self.tds_to_update: List[TransitionDomain] = []
def update_organization_data_for_all(self):
"""Updates org address data for all TransitionDomains"""
all_transition_domains = TransitionDomain.objects.all()
if len(all_transition_domains) == 0:
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.EMPTY_TRANSITION_DOMAIN_TABLE)
self.prepare_transition_domains(all_transition_domains)
logger.info(f"{TerminalColors.MAGENTA}" "Beginning mass TransitionDomain update..." f"{TerminalColors.ENDC}")
self.bulk_update_transition_domains(self.tds_to_update)
return self.tds_to_update
def prepare_transition_domains(self, transition_domains):
"""Parses org data for each transition domain,
then appends it to the tds_to_update list"""
for item in transition_domains:
updated = self.parse_org_data(item.domain_name, item)
self.tds_to_update.append(updated)
if self.debug:
logger.info(
f"""{TerminalColors.OKCYAN}
Successfully updated:
{item.display_transition_domain()}
{TerminalColors.ENDC}"""
)
if self.debug:
logger.info(f"Updating the following: {[item for item in self.tds_to_update]}")
logger.info(
f"""{TerminalColors.MAGENTA}
Ready to update {len(self.tds_to_update)} TransitionDomains.
{TerminalColors.ENDC}"""
)
def bulk_update_transition_domains(self, update_list):
changed_fields = [
"address_line",
"city",
"state_territory",
"zipcode",
]
batch_size = 1000
# Create a Paginator object. Bulk_update on the full dataset
# is too memory intensive for our current app config, so we can chunk this data instead.
paginator = Paginator(update_list, batch_size)
for page_num in paginator.page_range:
page = paginator.page(page_num)
TransitionDomain.objects.bulk_update(page.object_list, changed_fields)
if self.debug:
logger.info(f"Updated the following: {[item for item in self.tds_to_update]}")
logger.info(
f"{TerminalColors.OKGREEN}" f"Updated {len(self.tds_to_update)} TransitionDomains." f"{TerminalColors.ENDC}"
)
def parse_org_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain:
"""Grabs organization_name from the parsed files and associates it
with a transition_domain object, then updates that transition domain object and returns it"""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
org_info = self.get_org_info(domain_name)
if org_info is None:
logger.error(f"Could not add organization data on {domain_name}, no data exists.")
return transition_domain
# Add street info
transition_domain.address_line = org_info.orgstreet
transition_domain.city = org_info.orgcity
transition_domain.state_territory = org_info.orgstate
transition_domain.zipcode = org_info.orgzip
if self.debug:
# Log what happened to each field. The first value
# is the field name that was updated, second is the value
changed_fields = [
("address_line", transition_domain.address_line),
("city", transition_domain.city),
("state_territory", transition_domain.state_territory),
("zipcode", transition_domain.zipcode),
]
self.log_add_or_changed_values(changed_fields, domain_name)
return transition_domain
def get_org_info(self, domain_name) -> OrganizationAdhoc | None:
"""Maps an id given in get_domain_data to a organization_adhoc
record which has its corresponding definition"""
# Get a row in the domain_additional file. The id is the domain_name.
domain_additional_row = self.retrieve_row_by_id(EnumFilenames.DOMAIN_ADDITIONAL, domain_name)
if domain_additional_row is None:
return None
# Get a row in the organization_adhoc file. The id is the orgid in domain_additional_row.
org_row = self.retrieve_row_by_id(EnumFilenames.ORGANIZATION_ADHOC, domain_additional_row.orgid)
return org_row
def retrieve_row_by_id(self, file_type: EnumFilenames, desired_id):
"""Returns a field in a dictionary based off the type and id.
vars:
file_type: (constant) EnumFilenames -> Which data file to target.
An example would be `EnumFilenames.DOMAIN_ADHOC`.
desired_id: str -> Which id you want to search on.
An example would be `"12"` or `"igorville.gov"`
"""
# Grabs a dict associated with the file_type.
# For example, EnumFilenames.DOMAIN_ADDITIONAL would map to
# whatever data exists on the domain_additional file.
desired_file = self.parsed_data.file_data.get(file_type)
if desired_file is None:
logger.error(f"Type {file_type} does not exist")
return None
# This is essentially a dictionary of rows.
data_in_file = desired_file.data
# Get a row in the given file, based on an id.
# For instance, "igorville.gov" in domain_additional.
row_in_file = data_in_file.get(desired_id)
if row_in_file is None:
logger.error(f"Id {desired_id} does not exist for {file_type.value[0]}")
return row_in_file
def log_add_or_changed_values(self, values_to_check, domain_name):
"""Iterates through a list of fields, and determines if we should log
if the field was added or if the field was updated.
A field is "updated" when it is not None or not "".
A field is "created" when it is either of those things.
"""
for field_name, value in values_to_check:
str_exists = value is not None and value.strip() != ""
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
field_name,
value,
domain_name,
str_exists,
)
def _add_or_change_message(self, field_name, changed_value, domain_name, is_update=False):
"""Creates a log instance when a property
is successfully changed on a given TransitionDomain."""
if not is_update:
logger.info(f"Added {field_name} as '{changed_value}' on {domain_name}")
else:
logger.warning(f"Updated existing {field_name} to '{changed_value}' on {domain_name}")
class ExtraTransitionDomain:
"""Helper class to aid in storing TransitionDomain data spread across
multiple files."""
@ -775,52 +968,47 @@ class ExtraTransitionDomain:
# metadata about each file and associate it with an enum.
# That way if we want the data located at the agency_adhoc file,
# we can just call EnumFilenames.AGENCY_ADHOC.
pattern_map_params = [
(
EnumFilenames.AGENCY_ADHOC,
options.agency_adhoc_filename,
AgencyAdhoc,
"agencyid",
),
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.DOMAIN_ESCROW,
options.domain_escrow_filename,
DomainEscrow,
"domainname",
),
(
EnumFilenames.DOMAIN_ADHOC,
options.domain_adhoc_filename,
DomainTypeAdhoc,
"domaintypeid",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
]
if options.pattern_map_params is None or options.pattern_map_params == []:
options.pattern_map_params = [
(
EnumFilenames.AGENCY_ADHOC,
options.agency_adhoc_filename,
AgencyAdhoc,
"agencyid",
),
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.DOMAIN_ESCROW,
options.domain_escrow_filename,
DomainEscrow,
"domainname",
),
(
EnumFilenames.DOMAIN_ADHOC,
options.domain_adhoc_filename,
DomainTypeAdhoc,
"domaintypeid",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
]
self.file_data = self.populate_file_data(pattern_map_params)
self.file_data = self.populate_file_data(options.pattern_map_params)
# TODO - revise comment
def populate_file_data(self, pattern_map_params):

View file

@ -0,0 +1,39 @@
from enum import IntEnum
class LoadOrganizationErrorCodes(IntEnum):
"""Used when running the load_organization_data script
Overview of error codes:
- 1 TRANSITION_DOMAINS_NOT_FOUND
- 2 UPDATE_DOMAIN_INFO_FAILED
- 3 EMPTY_TRANSITION_DOMAIN_TABLE
"""
TRANSITION_DOMAINS_NOT_FOUND = 1
UPDATE_DOMAIN_INFO_FAILED = 2
EMPTY_TRANSITION_DOMAIN_TABLE = 3
DOMAIN_NAME_WAS_NONE = 4
class LoadOrganizationError(Exception):
"""
Error class used in the load_organization_data script
"""
_error_mapping = {
LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND: (
"Could not find all desired TransitionDomains. " "(Possible data corruption?)"
),
LoadOrganizationErrorCodes.UPDATE_DOMAIN_INFO_FAILED: "Failed to update DomainInformation",
LoadOrganizationErrorCodes.EMPTY_TRANSITION_DOMAIN_TABLE: "No TransitionDomains exist. Cannot update.",
LoadOrganizationErrorCodes.DOMAIN_NAME_WAS_NONE: "DomainInformation was updated, but domain was None",
}
def __init__(self, *args, code=None, **kwargs):
super().__init__(*args, **kwargs)
self.code = code
if self.code in self._error_mapping:
self.message = self._error_mapping.get(self.code)
def __str__(self):
return f"{self.message}"

View file

@ -18,7 +18,7 @@ class TransitionDomainArguments:
# Maintains an internal kwargs list and sets values
# that match the class definition.
def __init__(self, **kwargs):
self.kwargs = kwargs
self.pattern_map_params = kwargs.get("pattern_map_params", [])
for k, v in kwargs.items():
if hasattr(self, k):
setattr(self, k, v)
@ -36,13 +36,13 @@ class TransitionDomainArguments:
limitParse: Optional[int] = field(default=None, repr=True)
# Filenames #
# = Adhocs =#
# = Adhocs = #
agency_adhoc_filename: Optional[str] = field(default=EnumFilenames.AGENCY_ADHOC.value[1], repr=True)
domain_adhoc_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADHOC.value[1], repr=True)
organization_adhoc_filename: Optional[str] = field(default=EnumFilenames.ORGANIZATION_ADHOC.value[1], repr=True)
authority_adhoc_filename: Optional[str] = field(default=EnumFilenames.AUTHORITY_ADHOC.value[1], repr=True)
# = Data files =#
# = Data files = #
domain_escrow_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ESCROW.value[1], repr=True)
domain_additional_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], repr=True)
domain_contacts_filename: Optional[str] = field(default=None, repr=True)

View file

@ -0,0 +1,32 @@
# Generated by Django 4.2.7 on 2023-11-16 19:56
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("registrar", "0046_transitiondomain_email_transitiondomain_first_name_and_more"),
]
operations = [
migrations.AddField(
model_name="transitiondomain",
name="address_line",
field=models.TextField(blank=True, help_text="Street address", null=True),
),
migrations.AddField(
model_name="transitiondomain",
name="city",
field=models.TextField(blank=True, help_text="City", null=True),
),
migrations.AddField(
model_name="transitiondomain",
name="state_territory",
field=models.CharField(blank=True, help_text="State, territory, or military post", max_length=2, null=True),
),
migrations.AddField(
model_name="transitiondomain",
name="zipcode",
field=models.CharField(blank=True, db_index=True, help_text="Zip code", max_length=10, null=True),
),
]

View file

@ -211,12 +211,56 @@ class Domain(TimeStampedModel, DomainHelper):
@Cache
def registry_expiration_date(self) -> date:
"""Get or set the `ex_date` element from the registry."""
return self._get_property("ex_date")
"""Get or set the `ex_date` element from the registry.
Additionally, update the expiration date in the registrar"""
try:
self.expiration_date = self._get_property("ex_date")
self.save()
return self.expiration_date
except Exception as e:
# exception raised during the save to registrar
logger.error(f"error updating expiration date in registrar: {e}")
raise (e)
@registry_expiration_date.setter # type: ignore
def registry_expiration_date(self, ex_date: date):
pass
"""
Direct setting of the expiration date in the registry is not implemented.
To update the expiration date, use renew_domain method."""
raise NotImplementedError()
def renew_domain(self, length: int = 1, unit: epp.Unit = epp.Unit.YEAR):
"""
Renew the domain to a length and unit of time relative to the current
expiration date.
Default length and unit of time are 1 year.
"""
# if no expiration date from registry, set to today
try:
cur_exp_date = self.registry_expiration_date
except KeyError:
logger.warning("current expiration date not set; setting to today")
cur_exp_date = date.today()
# create RenewDomain request
request = commands.RenewDomain(name=self.name, cur_exp_date=cur_exp_date, period=epp.Period(length, unit))
try:
# update expiration date in registry, and set the updated
# expiration date in the registrar, and in the cache
self._cache["ex_date"] = registry.send(request, cleaned=True).res_data[0].ex_date
self.expiration_date = self._cache["ex_date"]
self.save()
except RegistryError as err:
# if registry error occurs, log the error, and raise it as well
logger.error(f"registry error renewing domain: {err}")
raise (err)
except Exception as e:
# exception raised during the save to registrar
logger.error(f"error updating expiration date in registrar: {e}")
raise (e)
@Cache
def password(self) -> str:

View file

@ -105,6 +105,29 @@ class TransitionDomain(TimeStampedModel):
blank=True,
help_text="Phone",
)
address_line = models.TextField(
null=True,
blank=True,
help_text="Street address",
)
city = models.TextField(
null=True,
blank=True,
help_text="City",
)
state_territory = models.CharField(
max_length=2,
null=True,
blank=True,
help_text="State, territory, or military post",
)
zipcode = models.CharField(
max_length=10,
null=True,
blank=True,
help_text="Zip code",
db_index=True,
)
def __str__(self):
return f"{self.username}, {self.domain_name}"
@ -128,4 +151,8 @@ class TransitionDomain(TimeStampedModel):
f"last_name: {self.last_name}, \n"
f"email: {self.email}, \n"
f"phone: {self.phone}, \n"
f"address_line: {self.address_line}, \n"
f"city: {self.city}, \n"
f"state_territory: {self.state_territory}, \n"
f"zipcode: {self.zipcode}, \n"
)

View file

@ -6,7 +6,7 @@
{% if IS_PRODUCTION %}
<!-- Google tag (gtag.js) -->
<script async src="https://www.googletagmanager.com/gtag/js?id=G-PZ5QSP6QPL"></script>
<script>
<script type="text/javascript" nonce="{{request.csp_nonce}}">
window.dataLayer = window.dataLayer || []; function gtag(){dataLayer.push(arguments);} gtag('js', new Date()); gtag('config', 'G-PZ5QSP6QPL');
</script>
{% endif %}

View file

@ -17,6 +17,7 @@
<a href="{{ url }}" {% if request.path|startswith:url %}class="usa-current"{% endif %}>
DNS
</a>
{% if request.path|startswith:url %}
<ul class="usa-sidenav__sublist">
<li class="usa-sidenav__item">
{% url 'domain-dns-nameservers' pk=domain.id as url %}
@ -34,20 +35,23 @@
>
DNSSEC
</a>
{% if domain.dnssecdata is not None or request.path|startswith:url and request.path|endswith:'dsdata' %}
<ul class="usa-sidenav__sublist">
<li class="usa-sidenav__item">
{% url 'domain-dns-dnssec-dsdata' pk=domain.id as url %}
<a href="{{ url }}"
{% if request.path == url %}class="usa-current"{% endif %}
>
DS Data
</a>
</li>
</ul>
{% if request.path|startswith:url %}
{% if domain.dnssecdata is not None or request.path|startswith:url and request.path|endswith:'dsdata' %}
<ul class="usa-sidenav__sublist">
<li class="usa-sidenav__item">
{% url 'domain-dns-dnssec-dsdata' pk=domain.id as url %}
<a href="{{ url }}"
{% if request.path == url %}class="usa-current"{% endif %}
>
DS Data
</a>
</li>
</ul>
{% endif %}
{% endif %}
</li>
</ul>
{% endif %}
</li>
<li class="usa-sidenav__item">

View file

@ -556,6 +556,7 @@ class MockEppLib(TestCase):
avail=...,
addrs=...,
registrant=...,
ex_date=...,
):
self.auth_info = auth_info
self.cr_date = cr_date
@ -565,6 +566,7 @@ class MockEppLib(TestCase):
self.avail = avail # use for CheckDomain
self.addrs = addrs
self.registrant = registrant
self.ex_date = ex_date
def dummyInfoContactResultData(
self,
@ -811,6 +813,11 @@ class MockEppLib(TestCase):
],
)
mockRenewedDomainExpDate = fakedEppObject(
"fake.gov",
ex_date=datetime.date(2023, 5, 25),
)
def _mockDomainName(self, _name, _avail=False):
return MagicMock(
res_data=[
@ -870,6 +877,8 @@ class MockEppLib(TestCase):
return self.mockCheckDomainCommand(_request, cleaned)
case commands.DeleteDomain:
return self.mockDeleteDomainCommands(_request, cleaned)
case commands.RenewDomain:
return self.mockRenewDomainCommand(_request, cleaned)
case _:
return MagicMock(res_data=[self.mockDataInfoHosts])
@ -890,6 +899,15 @@ class MockEppLib(TestCase):
raise RegistryError(code=ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION)
return None
def mockRenewDomainCommand(self, _request, cleaned):
if getattr(_request, "name", None) == "fake-error.gov":
raise RegistryError(code=ErrorCode.PARAMETER_VALUE_RANGE_ERROR)
else:
return MagicMock(
res_data=[self.mockRenewedDomainExpDate],
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
)
def mockInfoDomainCommands(self, _request, cleaned):
request_name = getattr(_request, "name", None)

View file

@ -56,7 +56,7 @@ class TestDomainCache(MockEppLib):
self.assertFalse("avail" in domain._cache.keys())
# using a setter should clear the cache
domain.registry_expiration_date = datetime.date.today()
domain.dnssecdata = []
self.assertEquals(domain._cache, {})
# send should have been called only once
@ -1953,6 +1953,41 @@ class TestRegistrantDNSSEC(MockEppLib):
self.assertTrue(err.is_client_error() or err.is_session_error() or err.is_server_error())
class TestExpirationDate(MockEppLib):
"""User may renew expiration date by a number of units of time"""
def setUp(self):
"""
Domain exists in registry
"""
super().setUp()
# for the tests, need a domain in the ready state
self.domain, _ = Domain.objects.get_or_create(name="fake.gov", state=Domain.State.READY)
# for the test, need a domain that will raise an exception
self.domain_w_error, _ = Domain.objects.get_or_create(name="fake-error.gov", state=Domain.State.READY)
def tearDown(self):
Domain.objects.all().delete()
super().tearDown()
def test_expiration_date_setter_not_implemented(self):
"""assert that the setter for expiration date is not implemented and will raise error"""
with self.assertRaises(NotImplementedError):
self.domain.registry_expiration_date = datetime.date.today()
def test_renew_domain(self):
"""assert that the renew_domain sets new expiration date in cache and saves to registrar"""
self.domain.renew_domain()
test_date = datetime.date(2023, 5, 25)
self.assertEquals(self.domain._cache["ex_date"], test_date)
self.assertEquals(self.domain.expiration_date, test_date)
def test_renew_domain_error(self):
"""assert that the renew_domain raises an exception when registry raises error"""
with self.assertRaises(RegistryError):
self.domain_w_error.renew_domain()
class TestAnalystClientHold(MockEppLib):
"""Rule: Analysts may suspend or restore a domain by using client hold"""

View file

@ -16,9 +16,316 @@ from registrar.models import (
from django.core.management import call_command
from unittest.mock import patch
from registrar.models.contact import Contact
from .common import less_console_noise
class TestOrganizationMigration(TestCase):
def setUp(self):
"""Defines the file name of migration_json and the folder its contained in"""
self.test_data_file_location = "registrar/tests/data"
self.migration_json_filename = "test_migrationFilepaths.json"
def tearDown(self):
"""Deletes all DB objects related to migrations"""
# Delete domain information
Domain.objects.all().delete()
DomainInformation.objects.all().delete()
DomainInvitation.objects.all().delete()
TransitionDomain.objects.all().delete()
# Delete users
User.objects.all().delete()
UserDomainRole.objects.all().delete()
def run_load_domains(self):
"""
This method executes the load_transition_domain command.
It uses 'unittest.mock.patch' to mock the TerminalHelper.query_yes_no_exit method,
which is a user prompt in the terminal. The mock function always returns True,
allowing the test to proceed without manual user input.
The 'call_command' function from Django's management framework is then used to
execute the load_transition_domain command with the specified arguments.
"""
# noqa here because splitting this up makes it confusing.
# ES501
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
return_value=True,
):
call_command(
"load_transition_domain",
self.migration_json_filename,
directory=self.test_data_file_location,
)
def run_transfer_domains(self):
"""
This method executes the transfer_transition_domains_to_domains command.
The 'call_command' function from Django's management framework is then used to
execute the load_transition_domain command with the specified arguments.
"""
call_command("transfer_transition_domains_to_domains")
def run_load_organization_data(self):
"""
This method executes the load_organization_data command.
It uses 'unittest.mock.patch' to mock the TerminalHelper.query_yes_no_exit method,
which is a user prompt in the terminal. The mock function always returns True,
allowing the test to proceed without manual user input.
The 'call_command' function from Django's management framework is then used to
execute the load_organization_data command with the specified arguments.
"""
# noqa here (E501) because splitting this up makes it
# confusing to read.
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
return_value=True,
):
call_command(
"load_organization_data",
self.migration_json_filename,
directory=self.test_data_file_location,
)
def compare_tables(
self,
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
):
"""Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation.
Verifies that the data loaded correctly."""
missing_domains = []
duplicate_domains = []
missing_domain_informations = []
for transition_domain in TransitionDomain.objects.all():
transition_domain_name = transition_domain.domain_name
# Check Domain table
matching_domains = Domain.objects.filter(name=transition_domain_name)
# Check Domain Information table
matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name)
if len(matching_domains) == 0:
missing_domains.append(transition_domain_name)
elif len(matching_domains) > 1:
duplicate_domains.append(transition_domain_name)
if len(matching_domain_informations) == 0:
missing_domain_informations.append(transition_domain_name)
total_missing_domains = len(missing_domains)
total_duplicate_domains = len(duplicate_domains)
total_missing_domain_informations = len(missing_domain_informations)
total_transition_domains = len(TransitionDomain.objects.all())
total_domains = len(Domain.objects.all())
total_domain_informations = len(DomainInformation.objects.all())
self.assertEqual(total_missing_domains, expected_missing_domains)
self.assertEqual(total_duplicate_domains, expected_duplicate_domains)
self.assertEqual(total_missing_domain_informations, expected_missing_domain_informations)
self.assertEqual(total_transition_domains, expected_total_transition_domains)
self.assertEqual(total_domains, expected_total_domains)
self.assertEqual(total_domain_informations, expected_total_domain_informations)
def test_load_organization_data_transition_domain(self):
"""
This test verifies the functionality of the load_organization_data method for TransitionDomain objects.
The test follows these steps:
1. Parses all existing data by running the load_domains and transfer_domains methods.
2. Attempts to add organization data to the parsed data by running the load_organization_data method.
3. Checks that the data has been loaded as expected.
The expected result is a set of TransitionDomain objects with specific attributes.
The test fetches the actual TransitionDomain objects from the database and compares them with the expected objects.
""" # noqa - E501 (harder to read)
# == First, parse all existing data == #
self.run_load_domains()
self.run_transfer_domains()
# == Second, try adding org data to it == #
self.run_load_organization_data()
# == Third, test that we've loaded data as we expect == #
transition_domains = TransitionDomain.objects.filter(domain_name="fakewebsite2.gov")
# Should return three objects (three unique emails)
self.assertEqual(transition_domains.count(), 3)
# Lets test the first one
transition = transition_domains.first()
expected_transition_domain = TransitionDomain(
username="alexandra.bobbitt5@test.com",
domain_name="fakewebsite2.gov",
status="on hold",
email_sent=True,
organization_type="Federal",
organization_name="Fanoodle",
federal_type="Executive",
federal_agency="Department of Commerce",
epp_creation_date=datetime.date(2004, 5, 7),
epp_expiration_date=datetime.date(2023, 9, 30),
first_name="Seline",
middle_name="testmiddle2",
last_name="Tower",
title=None,
email="stower3@answers.com",
phone="151-539-6028",
address_line="93001 Arizona Drive",
city="Columbus",
state_territory="Oh",
zipcode="43268",
)
expected_transition_domain.id = transition.id
self.assertEqual(transition, expected_transition_domain)
def test_load_organization_data_domain_information(self):
"""
This test verifies the functionality of the load_organization_data method.
The test follows these steps:
1. Parses all existing data by running the load_domains and transfer_domains methods.
2. Attempts to add organization data to the parsed data by running the load_organization_data method.
3. Checks that the data has been loaded as expected.
The expected result is a DomainInformation object with specific attributes.
The test fetches the actual DomainInformation object from the database
and compares it with the expected object.
"""
# == First, parse all existing data == #
self.run_load_domains()
self.run_transfer_domains()
# == Second, try adding org data to it == #
self.run_load_organization_data()
# == Third, test that we've loaded data as we expect == #
_domain = Domain.objects.filter(name="fakewebsite2.gov").get()
domain_information = DomainInformation.objects.filter(domain=_domain).get()
expected_creator = User.objects.filter(username="System").get()
expected_ao = Contact.objects.filter(first_name="Seline", middle_name="testmiddle2", last_name="Tower").get()
expected_domain_information = DomainInformation(
creator=expected_creator,
organization_type="federal",
federal_agency="Department of Commerce",
federal_type="executive",
organization_name="Fanoodle",
address_line1="93001 Arizona Drive",
city="Columbus",
state_territory="Oh",
zipcode="43268",
authorizing_official=expected_ao,
domain=_domain,
)
# Given that these are different objects, this needs to be set
expected_domain_information.id = domain_information.id
self.assertEqual(domain_information, expected_domain_information)
def test_load_organization_data_preserves_existing_data(self):
"""
This test verifies that the load_organization_data method does not overwrite existing data.
The test follows these steps:
1. Parses all existing data by running the load_domains and transfer_domains methods.
2. Adds pre-existing fake data to a DomainInformation object and saves it to the database.
3. Runs the load_organization_data method.
4. Checks that the pre-existing data in the DomainInformation object has not been overwritten.
The expected result is that the DomainInformation object retains its pre-existing data
after the load_organization_data method is run.
"""
# == First, parse all existing data == #
self.run_load_domains()
self.run_transfer_domains()
# == Second, try add prexisting fake data == #
_domain_old = Domain.objects.filter(name="fakewebsite2.gov").get()
domain_information_old = DomainInformation.objects.filter(domain=_domain_old).get()
domain_information_old.address_line1 = "93001 Galactic Way"
domain_information_old.city = "Olympus"
domain_information_old.state_territory = "MA"
domain_information_old.zipcode = "12345"
domain_information_old.save()
# == Third, try running the script == #
self.run_load_organization_data()
# == Fourth, test that no data is overwritten as we expect == #
_domain = Domain.objects.filter(name="fakewebsite2.gov").get()
domain_information = DomainInformation.objects.filter(domain=_domain).get()
expected_creator = User.objects.filter(username="System").get()
expected_ao = Contact.objects.filter(first_name="Seline", middle_name="testmiddle2", last_name="Tower").get()
expected_domain_information = DomainInformation(
creator=expected_creator,
organization_type="federal",
federal_agency="Department of Commerce",
federal_type="executive",
organization_name="Fanoodle",
address_line1="93001 Galactic Way",
city="Olympus",
state_territory="MA",
zipcode="12345",
authorizing_official=expected_ao,
domain=_domain,
)
# Given that these are different objects, this needs to be set
expected_domain_information.id = domain_information.id
self.assertEqual(domain_information, expected_domain_information)
def test_load_organization_data_integrity(self):
"""
This test verifies the data integrity after running the load_organization_data method.
The test follows these steps:
1. Parses all existing data by running the load_domains and transfer_domains methods.
2. Attempts to add organization data to the parsed data by running the load_organization_data method.
3. Checks that the data has not been corrupted by comparing the actual counts of objects in the database
with the expected counts.
The expected result is that the counts of objects in the database
match the expected counts, indicating that the data has not been corrupted.
"""
# First, parse all existing data
self.run_load_domains()
self.run_transfer_domains()
# Second, try adding org data to it
self.run_load_organization_data()
# Third, test that we didn't corrupt any data
expected_total_transition_domains = 9
expected_total_domains = 5
expected_total_domain_informations = 5
expected_missing_domains = 0
expected_duplicate_domains = 0
expected_missing_domain_informations = 0
self.compare_tables(
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
)
class TestMigrations(TestCase):
def setUp(self):
""" """
@ -41,11 +348,12 @@ class TestMigrations(TestCase):
self.migration_json_filename = "test_migrationFilepaths.json"
def tearDown(self):
super().tearDown()
# Delete domain information
TransitionDomain.objects.all().delete()
Domain.objects.all().delete()
DomainInvitation.objects.all().delete()
DomainInformation.objects.all().delete()
DomainInvitation.objects.all().delete()
# Delete users
User.objects.all().delete()

View file

@ -37,6 +37,7 @@ from registrar.models.utility.contact_error import ContactError
from ..forms import (
ContactForm,
AuthorizingOfficialContactForm,
DomainOrgNameAddressForm,
DomainAddUserForm,
DomainSecurityEmailForm,
@ -186,7 +187,7 @@ class DomainAuthorizingOfficialView(DomainFormBaseView):
model = Domain
template_name = "domain_authorizing_official.html"
context_object_name = "domain"
form_class = ContactForm
form_class = AuthorizingOfficialContactForm
def get_form_kwargs(self, *args, **kwargs):
"""Add domain_info.authorizing_official instance to make a bound form."""