Merge branch 'main' into za/1075-generate-csvs-to-publish

This commit is contained in:
zandercymatics 2023-11-29 08:38:53 -07:00
commit ff37e510f1
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
41 changed files with 1828 additions and 154 deletions

View file

@ -3,6 +3,9 @@ import os
from django.apps import apps
from django.views.decorators.http import require_http_methods
from django.http import FileResponse, HttpResponse, JsonResponse
from django.utils.safestring import mark_safe
from registrar.templatetags.url_helpers import public_site_url
import requests
from login_required import login_not_required
@ -18,8 +21,13 @@ DOMAIN_API_MESSAGES = {
" For example, if you want www.city.gov, you would enter “city”"
" (without the quotes).",
"extra_dots": "Enter the .gov domain you want without any periods.",
"unavailable": "That domain isnt available. Try entering another one."
" Contact us if you need help coming up with a domain.",
# message below is considered safe; no user input can be inserted into the message
# body; public_site_url() function reads from local app settings and therefore safe
"unavailable": mark_safe( # nosec
"That domain isnt available. "
"<a class='usa-link' href='{}' target='_blank'>"
"Read more about choosing your .gov domain.</a>".format(public_site_url("domains/choosing"))
),
"invalid": "Enter a domain using only letters, numbers, or hyphens (though we don't recommend using hyphens).",
"success": "That domain is available!",
"error": "Error finding domain availability.",

View file

@ -344,6 +344,12 @@ class UserDomainRoleAdmin(ListHeaderAdmin):
class DomainInvitationAdmin(ListHeaderAdmin):
"""Custom domain invitation admin class."""
class Meta:
model = models.DomainInvitation
fields = "__all__"
_meta = Meta()
# Columns
list_display = [
"email",
@ -356,6 +362,10 @@ class DomainInvitationAdmin(ListHeaderAdmin):
"email",
"domain__name",
]
# Filters
list_filter = ("status",)
search_help_text = "Search by email or domain."
# Mark the FSM field 'status' as readonly
@ -736,7 +746,7 @@ class DomainAdmin(ListHeaderAdmin):
search_help_text = "Search by domain name."
change_form_template = "django/admin/domain_change_form.html"
change_list_template = "django/admin/domain_change_list.html"
readonly_fields = ["state"]
readonly_fields = ["state", "expiration_date"]
def export_data_type(self, request):
# match the CSV example with all the fields

View file

@ -115,14 +115,14 @@ function inlineToast(el, id, style, msg) {
toast.className = `usa-alert usa-alert--${style} usa-alert--slim`;
toastBody.classList.add("usa-alert__body");
p.classList.add("usa-alert__text");
p.innerText = msg;
p.innerHTML = msg;
toastBody.appendChild(p);
toast.appendChild(toastBody);
el.parentNode.insertBefore(toast, el.nextSibling);
} else {
// update and show the existing message div
toast.className = `usa-alert usa-alert--${style} usa-alert--slim`;
toast.querySelector("div p").innerText = msg;
toast.querySelector("div p").innerHTML = msg;
makeVisible(toast);
}
} else {

View file

@ -299,11 +299,18 @@ SERVER_EMAIL = "root@get.gov"
# this can be restrictive because we have few external scripts
allowed_sources = ("'self'",)
CSP_DEFAULT_SRC = allowed_sources
# Most things fall back to default-src, but these two do not and should be
# Most things fall back to default-src, but the following do not and should be
# explicitly set
CSP_FRAME_ANCESTORS = allowed_sources
CSP_FORM_ACTION = allowed_sources
# Google analytics requires that we relax our otherwise
# strict CSP by allowing scripts to run from their domain
# and inline with a nonce, as well as allowing connections back to their domain
CSP_SCRIPT_SRC_ELEM = ["'self'", "https://www.googletagmanager.com/"]
CSP_CONNECT_SRC = ["'self'", "https://www.google-analytics.com/"]
CSP_INCLUDE_NONCE_IN = ["script-src-elem"]
# Cross-Origin Resource Sharing (CORS) configuration
# Sets clients that allow access control to manage.get.gov
# TODO: remove :8080 to see if we can have all localhost access
@ -619,6 +626,7 @@ SECURE_SSL_REDIRECT = True
ALLOWED_HOSTS = [
"getgov-stable.app.cloud.gov",
"getgov-staging.app.cloud.gov",
"getgov-development.app.cloud.gov",
"getgov-ky.app.cloud.gov",
"getgov-es.app.cloud.gov",
"getgov-nl.app.cloud.gov",

View file

@ -5,6 +5,7 @@ from .domain import (
DomainSecurityEmailForm,
DomainOrgNameAddressForm,
ContactForm,
AuthorizingOfficialContactForm,
DomainDnssecForm,
DomainDsdataFormset,
DomainDsdataForm,

View file

@ -8,6 +8,10 @@ from phonenumber_field.widgets import RegionalPhoneNumberWidget
from registrar.utility.errors import (
NameserverError,
NameserverErrorCodes as nsErrorCodes,
DsDataError,
DsDataErrorCodes,
SecurityEmailError,
SecurityEmailErrorCodes,
)
from ..models import Contact, DomainInformation, Domain
@ -16,6 +20,8 @@ from .common import (
DIGEST_TYPE_CHOICES,
)
import re
class DomainAddUserForm(forms.Form):
"""Form for adding a user to a domain."""
@ -61,6 +67,7 @@ class DomainNameserverForm(forms.Form):
ip = cleaned_data.get("ip", None)
# remove ANY spaces in the ip field
ip = ip.replace(" ", "")
cleaned_data["ip"] = ip
domain = cleaned_data.get("domain", "")
ip_list = self.extract_ip_list(ip)
@ -111,8 +118,34 @@ class DomainNameserverForm(forms.Form):
self.add_error("ip", str(e))
class BaseNameserverFormset(forms.BaseFormSet):
def clean(self):
"""
Check for duplicate entries in the formset.
"""
if any(self.errors):
# Don't bother validating the formset unless each form is valid on its own
return
data = []
duplicates = []
for form in self.forms:
if form.cleaned_data:
value = form.cleaned_data["server"]
if value in data:
form.add_error(
"server",
NameserverError(code=nsErrorCodes.DUPLICATE_HOST, nameserver=value),
)
duplicates.append(value)
else:
data.append(value)
NameserverFormset = formset_factory(
DomainNameserverForm,
formset=BaseNameserverFormset,
extra=1,
max_num=13,
validate_max=True,
@ -148,11 +181,51 @@ class ContactForm(forms.ModelForm):
for field_name in self.required:
self.fields[field_name].required = True
# Set custom error messages
self.fields["first_name"].error_messages = {"required": "Enter your first name / given name."}
self.fields["last_name"].error_messages = {"required": "Enter your last name / family name."}
self.fields["title"].error_messages = {
"required": "Enter your title or role in your organization (e.g., Chief Information Officer)"
}
self.fields["email"].error_messages = {
"required": "Enter your email address in the required format, like name@example.com."
}
self.fields["phone"].error_messages = {"required": "Enter your phone number."}
class AuthorizingOfficialContactForm(ContactForm):
"""Form for updating authorizing official contacts."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Set custom error messages
self.fields["first_name"].error_messages = {
"required": "Enter the first name / given name of your authorizing official."
}
self.fields["last_name"].error_messages = {
"required": "Enter the last name / family name of your authorizing official."
}
self.fields["title"].error_messages = {
"required": "Enter the title or role your authorizing official has in your \
organization (e.g., Chief Information Officer)."
}
self.fields["email"].error_messages = {
"required": "Enter an email address in the required format, like name@example.com."
}
self.fields["phone"].error_messages = {"required": "Enter a phone number for your authorizing official."}
class DomainSecurityEmailForm(forms.Form):
"""Form for adding or editing a security email to a domain."""
security_email = forms.EmailField(label="Security email", required=False)
security_email = forms.EmailField(
label="Security email",
required=False,
error_messages={
"invalid": str(SecurityEmailError(code=SecurityEmailErrorCodes.BAD_DATA)),
},
)
class DomainOrgNameAddressForm(forms.ModelForm):
@ -228,12 +301,22 @@ class DomainDnssecForm(forms.Form):
class DomainDsdataForm(forms.Form):
"""Form for adding or editing DNSSEC DS Data to a domain."""
def validate_hexadecimal(value):
"""
Tests that string matches all hexadecimal values.
Raise validation error to display error in form
if invalid characters entered
"""
if not re.match(r"^[0-9a-fA-F]+$", value):
raise forms.ValidationError(str(DsDataError(code=DsDataErrorCodes.INVALID_DIGEST_CHARS)))
key_tag = forms.IntegerField(
required=True,
label="Key tag",
validators=[
MinValueValidator(0, message="Value must be between 0 and 65535"),
MaxValueValidator(65535, message="Value must be between 0 and 65535"),
MinValueValidator(0, message=str(DsDataError(code=DsDataErrorCodes.INVALID_KEYTAG_SIZE))),
MaxValueValidator(65535, message=str(DsDataError(code=DsDataErrorCodes.INVALID_KEYTAG_SIZE))),
],
error_messages={"required": ("Key tag is required.")},
)
@ -251,15 +334,38 @@ class DomainDsdataForm(forms.Form):
label="Digest type",
coerce=int, # need to coerce into int so dsData objects can be compared
choices=[(None, "--Select--")] + DIGEST_TYPE_CHOICES, # type: ignore
error_messages={"required": ("Digest Type is required.")},
error_messages={"required": ("Digest type is required.")},
)
digest = forms.CharField(
required=True,
label="Digest",
error_messages={"required": ("Digest is required.")},
validators=[validate_hexadecimal],
error_messages={
"required": "Digest is required.",
},
)
def clean(self):
# clean is called from clean_forms, which is called from is_valid
# after clean_fields. it is used to determine form level errors.
# is_valid is typically called from view during a post
cleaned_data = super().clean()
digest_type = cleaned_data.get("digest_type", 0)
digest = cleaned_data.get("digest", "")
# validate length of digest depending on digest_type
if digest_type == 1 and len(digest) != 40:
self.add_error(
"digest",
DsDataError(code=DsDataErrorCodes.INVALID_DIGEST_SHA1),
)
elif digest_type == 2 and len(digest) != 64:
self.add_error(
"digest",
DsDataError(code=DsDataErrorCodes.INVALID_DIGEST_SHA256),
)
return cleaned_data
DomainDsdataFormset = formset_factory(
DomainDsdataForm,

View file

@ -0,0 +1,283 @@
"""Data migration: Load organization data for TransitionDomain and DomainInformation objects"""
import argparse
import json
import logging
import os
from django.core.management import BaseCommand
from registrar.management.commands.utility.extra_transition_domain_helper import OrganizationDataLoader
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.management.commands.utility.transition_domain_arguments import TransitionDomainArguments
from registrar.models import TransitionDomain, DomainInformation
from django.core.paginator import Paginator
from typing import List
from registrar.models.domain import Domain
from registrar.management.commands.utility.load_organization_error import (
LoadOrganizationError,
LoadOrganizationErrorCodes,
)
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Load organization data on TransitionDomain and DomainInformation objects"
def __init__(self):
super().__init__()
self.domain_information_to_update: List[DomainInformation] = []
# Stores the domain_name for logging purposes
self.domains_failed_to_update: List[str] = []
self.domains_skipped: List[str] = []
self.changed_fields = [
"address_line1",
"city",
"state_territory",
"zipcode",
]
def add_arguments(self, parser):
"""Add command line arguments."""
parser.add_argument(
"migration_json_filename",
help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"),
)
parser.add_argument("--sep", default="|", help="Delimiter character")
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument("--directory", default="migrationdata", help="Desired directory")
def handle(self, migration_json_filename, **options):
"""Load organization address data into the TransitionDomain
and DomainInformation tables by using the organization adhoc file and domain_additional file"""
# Parse JSON file
options = self.load_json_settings(options, migration_json_filename)
org_args = TransitionDomainArguments(**options)
# Will sys.exit() when prompt is "n"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==Master data file==
domain_additional_filename: {org_args.domain_additional_filename}
==Organization data==
organization_adhoc_filename: {org_args.organization_adhoc_filename}
==Containing directory==
directory: {org_args.directory}
""",
prompt_title="Do you wish to load organization data for TransitionDomains?",
)
org_load_helper = OrganizationDataLoader(org_args)
transition_domains = org_load_helper.update_organization_data_for_all()
# Reprompt the user to reinspect before updating DomainInformation
# Will sys.exit() when prompt is "n"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==Master data file==
domain_additional_filename: {org_args.domain_additional_filename}
==Organization name information==
organization_adhoc_filename: {org_args.organization_adhoc_filename}
==Containing directory==
directory: {org_args.directory}
==Proposed Changes==
Number of DomainInformation objects to (potentially) change: {len(transition_domains)}
For each DomainInformation, modify the following fields: {self.changed_fields}
""",
prompt_title="Do you wish to update organization address data for DomainInformation as well?",
)
logger.info(
f"{TerminalColors.MAGENTA}"
"Preparing to load organization data onto DomainInformation tables..."
f"{TerminalColors.ENDC}"
)
self.prepare_update_domain_information(transition_domains, org_args.debug)
logger.info(f"{TerminalColors.MAGENTA}" f"Beginning mass DomainInformation update..." f"{TerminalColors.ENDC}")
self.bulk_update_domain_information(org_args.debug)
def load_json_settings(self, options, migration_json_filename):
"""Parses options from the given JSON file."""
json_filepath = os.path.join(options["directory"], migration_json_filename)
# If a JSON was provided, use its values instead of defaults.
with open(json_filepath, "r") as jsonFile:
# load JSON object as a dictionary
try:
data = json.load(jsonFile)
skipped_fields = ["domain_additional_filename", "organization_adhoc_filename"]
# Iterate over the data from the JSON file. Skip any unused values.
for key, value in data.items():
if value is not None and value.strip() != "":
# If any key in skipped_fields has a value, then
# we override what is specified in the JSON.
if options not in skipped_fields:
options[key] = value
except Exception as err:
logger.error(
f"{TerminalColors.FAIL}"
"There was an error loading "
"the JSON responsible for providing filepaths."
f"{TerminalColors.ENDC}"
)
raise err
return options
def prepare_update_domain_information(self, target_transition_domains: List[TransitionDomain], debug):
"""Returns an array of DomainInformation objects with updated organization data."""
if len(target_transition_domains) == 0:
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.EMPTY_TRANSITION_DOMAIN_TABLE)
# Grab each TransitionDomain we want to change.
transition_domains = TransitionDomain.objects.filter(
username__in=[item.username for item in target_transition_domains],
domain_name__in=[item.domain_name for item in target_transition_domains],
)
# This indicates that some form of data corruption happened.
if len(target_transition_domains) != len(transition_domains):
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND)
# Maps TransitionDomain <--> DomainInformation.
# If any related organization fields have been updated,
# we can assume that they modified this information themselves - thus we should not update it.
domain_informations = DomainInformation.objects.filter(
domain__name__in=[td.domain_name for td in transition_domains],
address_line1__isnull=True,
city__isnull=True,
state_territory__isnull=True,
zipcode__isnull=True,
)
filtered_domain_informations_dict = {di.domain.name: di for di in domain_informations if di.domain is not None}
# === Create DomainInformation objects === #
for item in transition_domains:
self.map_transition_domain_to_domain_information(item, filtered_domain_informations_dict, debug)
# === Log results and return data === #
if len(self.domains_failed_to_update) > 0:
logger.error(
f"""{TerminalColors.FAIL}
Failed to update. An exception was encountered on the following Domains: {self.domains_failed_to_update}
{TerminalColors.ENDC}"""
)
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.UPDATE_DOMAIN_INFO_FAILED)
if debug:
logger.info(f"Updating these DomainInformations: {[item for item in self.domain_information_to_update]}")
if len(self.domains_skipped) > 0:
logger.info(f"Skipped these fields: {self.domains_skipped}")
logger.info(
f"{TerminalColors.YELLOW}"
f"Skipped updating {len(self.domains_skipped)} fields. User-supplied data exists, or there is no data."
f"{TerminalColors.ENDC}"
)
logger.info(f"Ready to update {len(self.domain_information_to_update)} DomainInformations.")
return self.domain_information_to_update
def bulk_update_domain_information(self, debug):
"""Performs a bulk_update operation on a list of DomainInformation objects"""
# Create a Paginator object. Bulk_update on the full dataset
# is too memory intensive for our current app config, so we can chunk this data instead.
batch_size = 1000
paginator = Paginator(self.domain_information_to_update, batch_size)
for page_num in paginator.page_range:
page = paginator.page(page_num)
DomainInformation.objects.bulk_update(page.object_list, self.changed_fields)
if debug:
logger.info(f"Updated these DomainInformations: {[item for item in self.domain_information_to_update]}")
logger.info(
f"{TerminalColors.OKGREEN}"
f"Updated {len(self.domain_information_to_update)} DomainInformations."
f"{TerminalColors.ENDC}"
)
def map_transition_domain_to_domain_information(self, item, domain_informations_dict, debug):
"""Attempts to return a DomainInformation object based on values from TransitionDomain.
Any domains which cannot be updated will be stored in an array.
"""
does_not_exist: bool = self.is_domain_name_missing(item, domain_informations_dict)
all_fields_are_none: bool = self.is_organization_data_missing(item)
if does_not_exist:
self.handle_if_domain_name_missing(item.domain_name)
elif all_fields_are_none:
logger.info(
f"{TerminalColors.YELLOW}"
f"Domain {item.domain_name} has no Organization Data. Cannot update."
f"{TerminalColors.ENDC}"
)
self.domains_skipped.append(item.domain_name)
else:
# Based on the current domain, grab the right DomainInformation object.
current_domain_information = domain_informations_dict[item.domain_name]
if current_domain_information.domain is None or current_domain_information.domain.name is None:
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.DOMAIN_NAME_WAS_NONE)
# Update fields
current_domain_information.address_line1 = item.address_line
current_domain_information.city = item.city
current_domain_information.state_territory = item.state_territory
current_domain_information.zipcode = item.zipcode
self.domain_information_to_update.append(current_domain_information)
if debug:
logger.info(f"Updated {current_domain_information.domain.name}...")
def is_domain_name_missing(self, item: TransitionDomain, domain_informations_dict):
"""Checks if domain_name is in the supplied dictionary"""
return item.domain_name not in domain_informations_dict
def is_organization_data_missing(self, item: TransitionDomain):
"""Checks if all desired Organization fields to update are none"""
fields = [item.address_line, item.city, item.state_territory, item.zipcode]
return all(field is None for field in fields)
def handle_if_domain_name_missing(self, domain_name):
"""
Infers what to log if we can't find a domain_name and updates the relevant lists.
This function performs the following checks:
1. If the domain does not exist, it logs an error and adds the domain name to the `domains_failed_to_update` list.
2. If the domain was updated by a user, it logs an info message and adds the domain name to the `domains_skipped` list.
3. If there are duplicate domains, it logs an error and adds the domain name to the `domains_failed_to_update` list.
Args:
domain_name (str): The name of the domain to check.
""" # noqa - E501 (harder to read)
domains = Domain.objects.filter(name=domain_name)
if domains.count() == 0:
logger.error(f"Could not add {domain_name}. Domain does not exist.")
self.domains_failed_to_update.append(domain_name)
elif domains.count() == 1:
logger.info(
f"{TerminalColors.YELLOW}"
f"Domain {domain_name} was updated by a user. Cannot update."
f"{TerminalColors.ENDC}"
)
self.domains_skipped.append(domain_name)
else:
logger.error(f"Could not add {domain_name}. Duplicate domains exist.")
self.domains_failed_to_update.append(domain_name)

View file

@ -9,9 +9,13 @@ import logging
import os
import sys
from typing import Dict
from typing import Dict, List
from django.core.paginator import Paginator
from registrar.models.transition_domain import TransitionDomain
from registrar.management.commands.utility.load_organization_error import (
LoadOrganizationError,
LoadOrganizationErrorCodes,
)
from .epp_data_containers import (
AgencyAdhoc,
@ -752,6 +756,195 @@ class FileDataHolder:
return (full_filename, can_infer)
class OrganizationDataLoader:
"""Saves organization data onto Transition Domains. Handles file parsing."""
def __init__(self, options: TransitionDomainArguments):
self.debug = options.debug
# We want to data from the domain_additional file and the organization_adhoc file
options.pattern_map_params = [
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
]
# Reads and parses organization data
self.parsed_data = ExtraTransitionDomain(options)
# options.infer_filenames will always be false when not SETTING.DEBUG
self.parsed_data.parse_all_files(options.infer_filenames)
self.tds_to_update: List[TransitionDomain] = []
def update_organization_data_for_all(self):
"""Updates org address data for all TransitionDomains"""
all_transition_domains = TransitionDomain.objects.all()
if len(all_transition_domains) == 0:
raise LoadOrganizationError(code=LoadOrganizationErrorCodes.EMPTY_TRANSITION_DOMAIN_TABLE)
self.prepare_transition_domains(all_transition_domains)
logger.info(f"{TerminalColors.MAGENTA}" "Beginning mass TransitionDomain update..." f"{TerminalColors.ENDC}")
self.bulk_update_transition_domains(self.tds_to_update)
return self.tds_to_update
def prepare_transition_domains(self, transition_domains):
"""Parses org data for each transition domain,
then appends it to the tds_to_update list"""
for item in transition_domains:
updated = self.parse_org_data(item.domain_name, item)
self.tds_to_update.append(updated)
if self.debug:
logger.info(
f"""{TerminalColors.OKCYAN}
Successfully updated:
{item.display_transition_domain()}
{TerminalColors.ENDC}"""
)
if self.debug:
logger.info(f"Updating the following: {[item for item in self.tds_to_update]}")
logger.info(
f"""{TerminalColors.MAGENTA}
Ready to update {len(self.tds_to_update)} TransitionDomains.
{TerminalColors.ENDC}"""
)
def bulk_update_transition_domains(self, update_list):
changed_fields = [
"address_line",
"city",
"state_territory",
"zipcode",
]
batch_size = 1000
# Create a Paginator object. Bulk_update on the full dataset
# is too memory intensive for our current app config, so we can chunk this data instead.
paginator = Paginator(update_list, batch_size)
for page_num in paginator.page_range:
page = paginator.page(page_num)
TransitionDomain.objects.bulk_update(page.object_list, changed_fields)
if self.debug:
logger.info(f"Updated the following: {[item for item in self.tds_to_update]}")
logger.info(
f"{TerminalColors.OKGREEN}" f"Updated {len(self.tds_to_update)} TransitionDomains." f"{TerminalColors.ENDC}"
)
def parse_org_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain:
"""Grabs organization_name from the parsed files and associates it
with a transition_domain object, then updates that transition domain object and returns it"""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
org_info = self.get_org_info(domain_name)
if org_info is None:
logger.error(f"Could not add organization data on {domain_name}, no data exists.")
return transition_domain
# Add street info
transition_domain.address_line = org_info.orgstreet
transition_domain.city = org_info.orgcity
transition_domain.state_territory = org_info.orgstate
transition_domain.zipcode = org_info.orgzip
if self.debug:
# Log what happened to each field. The first value
# is the field name that was updated, second is the value
changed_fields = [
("address_line", transition_domain.address_line),
("city", transition_domain.city),
("state_territory", transition_domain.state_territory),
("zipcode", transition_domain.zipcode),
]
self.log_add_or_changed_values(changed_fields, domain_name)
return transition_domain
def get_org_info(self, domain_name) -> OrganizationAdhoc | None:
"""Maps an id given in get_domain_data to a organization_adhoc
record which has its corresponding definition"""
# Get a row in the domain_additional file. The id is the domain_name.
domain_additional_row = self.retrieve_row_by_id(EnumFilenames.DOMAIN_ADDITIONAL, domain_name)
if domain_additional_row is None:
return None
# Get a row in the organization_adhoc file. The id is the orgid in domain_additional_row.
org_row = self.retrieve_row_by_id(EnumFilenames.ORGANIZATION_ADHOC, domain_additional_row.orgid)
return org_row
def retrieve_row_by_id(self, file_type: EnumFilenames, desired_id):
"""Returns a field in a dictionary based off the type and id.
vars:
file_type: (constant) EnumFilenames -> Which data file to target.
An example would be `EnumFilenames.DOMAIN_ADHOC`.
desired_id: str -> Which id you want to search on.
An example would be `"12"` or `"igorville.gov"`
"""
# Grabs a dict associated with the file_type.
# For example, EnumFilenames.DOMAIN_ADDITIONAL would map to
# whatever data exists on the domain_additional file.
desired_file = self.parsed_data.file_data.get(file_type)
if desired_file is None:
logger.error(f"Type {file_type} does not exist")
return None
# This is essentially a dictionary of rows.
data_in_file = desired_file.data
# Get a row in the given file, based on an id.
# For instance, "igorville.gov" in domain_additional.
row_in_file = data_in_file.get(desired_id)
if row_in_file is None:
logger.error(f"Id {desired_id} does not exist for {file_type.value[0]}")
return row_in_file
def log_add_or_changed_values(self, values_to_check, domain_name):
"""Iterates through a list of fields, and determines if we should log
if the field was added or if the field was updated.
A field is "updated" when it is not None or not "".
A field is "created" when it is either of those things.
"""
for field_name, value in values_to_check:
str_exists = value is not None and value.strip() != ""
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
field_name,
value,
domain_name,
str_exists,
)
def _add_or_change_message(self, field_name, changed_value, domain_name, is_update=False):
"""Creates a log instance when a property
is successfully changed on a given TransitionDomain."""
if not is_update:
logger.info(f"Added {field_name} as '{changed_value}' on {domain_name}")
else:
logger.warning(f"Updated existing {field_name} to '{changed_value}' on {domain_name}")
class ExtraTransitionDomain:
"""Helper class to aid in storing TransitionDomain data spread across
multiple files."""
@ -775,52 +968,47 @@ class ExtraTransitionDomain:
# metadata about each file and associate it with an enum.
# That way if we want the data located at the agency_adhoc file,
# we can just call EnumFilenames.AGENCY_ADHOC.
pattern_map_params = [
(
EnumFilenames.AGENCY_ADHOC,
options.agency_adhoc_filename,
AgencyAdhoc,
"agencyid",
),
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.DOMAIN_ESCROW,
options.domain_escrow_filename,
DomainEscrow,
"domainname",
),
(
EnumFilenames.DOMAIN_ADHOC,
options.domain_adhoc_filename,
DomainTypeAdhoc,
"domaintypeid",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
]
if options.pattern_map_params is None or options.pattern_map_params == []:
options.pattern_map_params = [
(
EnumFilenames.AGENCY_ADHOC,
options.agency_adhoc_filename,
AgencyAdhoc,
"agencyid",
),
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.DOMAIN_ESCROW,
options.domain_escrow_filename,
DomainEscrow,
"domainname",
),
(
EnumFilenames.DOMAIN_ADHOC,
options.domain_adhoc_filename,
DomainTypeAdhoc,
"domaintypeid",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
]
self.file_data = self.populate_file_data(pattern_map_params)
self.file_data = self.populate_file_data(options.pattern_map_params)
# TODO - revise comment
def populate_file_data(self, pattern_map_params):

View file

@ -0,0 +1,39 @@
from enum import IntEnum
class LoadOrganizationErrorCodes(IntEnum):
"""Used when running the load_organization_data script
Overview of error codes:
- 1 TRANSITION_DOMAINS_NOT_FOUND
- 2 UPDATE_DOMAIN_INFO_FAILED
- 3 EMPTY_TRANSITION_DOMAIN_TABLE
"""
TRANSITION_DOMAINS_NOT_FOUND = 1
UPDATE_DOMAIN_INFO_FAILED = 2
EMPTY_TRANSITION_DOMAIN_TABLE = 3
DOMAIN_NAME_WAS_NONE = 4
class LoadOrganizationError(Exception):
"""
Error class used in the load_organization_data script
"""
_error_mapping = {
LoadOrganizationErrorCodes.TRANSITION_DOMAINS_NOT_FOUND: (
"Could not find all desired TransitionDomains. " "(Possible data corruption?)"
),
LoadOrganizationErrorCodes.UPDATE_DOMAIN_INFO_FAILED: "Failed to update DomainInformation",
LoadOrganizationErrorCodes.EMPTY_TRANSITION_DOMAIN_TABLE: "No TransitionDomains exist. Cannot update.",
LoadOrganizationErrorCodes.DOMAIN_NAME_WAS_NONE: "DomainInformation was updated, but domain was None",
}
def __init__(self, *args, code=None, **kwargs):
super().__init__(*args, **kwargs)
self.code = code
if self.code in self._error_mapping:
self.message = self._error_mapping.get(self.code)
def __str__(self):
return f"{self.message}"

View file

@ -18,7 +18,7 @@ class TransitionDomainArguments:
# Maintains an internal kwargs list and sets values
# that match the class definition.
def __init__(self, **kwargs):
self.kwargs = kwargs
self.pattern_map_params = kwargs.get("pattern_map_params", [])
for k, v in kwargs.items():
if hasattr(self, k):
setattr(self, k, v)
@ -36,13 +36,13 @@ class TransitionDomainArguments:
limitParse: Optional[int] = field(default=None, repr=True)
# Filenames #
# = Adhocs =#
# = Adhocs = #
agency_adhoc_filename: Optional[str] = field(default=EnumFilenames.AGENCY_ADHOC.value[1], repr=True)
domain_adhoc_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADHOC.value[1], repr=True)
organization_adhoc_filename: Optional[str] = field(default=EnumFilenames.ORGANIZATION_ADHOC.value[1], repr=True)
authority_adhoc_filename: Optional[str] = field(default=EnumFilenames.AUTHORITY_ADHOC.value[1], repr=True)
# = Data files =#
# = Data files = #
domain_escrow_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ESCROW.value[1], repr=True)
domain_additional_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], repr=True)
domain_contacts_filename: Optional[str] = field(default=None, repr=True)

View file

@ -0,0 +1,32 @@
# Generated by Django 4.2.7 on 2023-11-16 19:56
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("registrar", "0046_transitiondomain_email_transitiondomain_first_name_and_more"),
]
operations = [
migrations.AddField(
model_name="transitiondomain",
name="address_line",
field=models.TextField(blank=True, help_text="Street address", null=True),
),
migrations.AddField(
model_name="transitiondomain",
name="city",
field=models.TextField(blank=True, help_text="City", null=True),
),
migrations.AddField(
model_name="transitiondomain",
name="state_territory",
field=models.CharField(blank=True, help_text="State, territory, or military post", max_length=2, null=True),
),
migrations.AddField(
model_name="transitiondomain",
name="zipcode",
field=models.CharField(blank=True, db_index=True, help_text="Zip code", max_length=10, null=True),
),
]

View file

@ -211,12 +211,54 @@ class Domain(TimeStampedModel, DomainHelper):
@Cache
def registry_expiration_date(self) -> date:
"""Get or set the `ex_date` element from the registry."""
return self._get_property("ex_date")
"""Get or set the `ex_date` element from the registry.
Additionally, _get_property updates the expiration date in the registrar"""
try:
return self._get_property("ex_date")
except Exception as e:
# exception raised during the save to registrar
logger.error(f"error updating expiration date in registrar: {e}")
raise (e)
@registry_expiration_date.setter # type: ignore
def registry_expiration_date(self, ex_date: date):
pass
"""
Direct setting of the expiration date in the registry is not implemented.
To update the expiration date, use renew_domain method."""
raise NotImplementedError()
def renew_domain(self, length: int = 1, unit: epp.Unit = epp.Unit.YEAR):
"""
Renew the domain to a length and unit of time relative to the current
expiration date.
Default length and unit of time are 1 year.
"""
# if no expiration date from registry, set to today
try:
cur_exp_date = self.registry_expiration_date
except KeyError:
logger.warning("current expiration date not set; setting to today")
cur_exp_date = date.today()
# create RenewDomain request
request = commands.RenewDomain(name=self.name, cur_exp_date=cur_exp_date, period=epp.Period(length, unit))
try:
# update expiration date in registry, and set the updated
# expiration date in the registrar, and in the cache
self._cache["ex_date"] = registry.send(request, cleaned=True).res_data[0].ex_date
self.expiration_date = self._cache["ex_date"]
self.save()
except RegistryError as err:
# if registry error occurs, log the error, and raise it as well
logger.error(f"registry error renewing domain: {err}")
raise (err)
except Exception as e:
# exception raised during the save to registrar
logger.error(f"error updating expiration date in registrar: {e}")
raise (e)
@Cache
def password(self) -> str:
@ -598,7 +640,7 @@ class Domain(TimeStampedModel, DomainHelper):
# if unable to update domain raise error and stop
if responseCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
raise NameserverError(code=nsErrorCodes.UNABLE_TO_UPDATE_DOMAIN)
raise NameserverError(code=nsErrorCodes.BAD_DATA)
successTotalNameservers = len(oldNameservers) - deleteCount + addToDomainCount
@ -836,6 +878,14 @@ class Domain(TimeStampedModel, DomainHelper):
"""
return self.state == self.State.READY
def is_editable(self) -> bool:
"""domain is editable unless state is on hold or deleted"""
return self.state in [
self.State.UNKNOWN,
self.State.DNS_NEEDED,
self.State.READY,
]
def transfer(self):
"""Going somewhere. Not implemented."""
raise NotImplementedError()
@ -1144,7 +1194,7 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error(e)
logger.error(e.code)
raise e
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST and self.state != Domain.State.DELETED:
# avoid infinite loop
already_tried_to_create = True
self.dns_needed_from_unknown()
@ -1558,6 +1608,12 @@ class Domain(TimeStampedModel, DomainHelper):
if old_cache_contacts is not None:
cleaned["contacts"] = old_cache_contacts
# if expiration date from registry does not match what is in db,
# update the db
if "ex_date" in cleaned and cleaned["ex_date"] != self.expiration_date:
self.expiration_date = cleaned["ex_date"]
self.save()
self._cache = cleaned
except RegistryError as e:

View file

@ -105,6 +105,29 @@ class TransitionDomain(TimeStampedModel):
blank=True,
help_text="Phone",
)
address_line = models.TextField(
null=True,
blank=True,
help_text="Street address",
)
city = models.TextField(
null=True,
blank=True,
help_text="City",
)
state_territory = models.CharField(
max_length=2,
null=True,
blank=True,
help_text="State, territory, or military post",
)
zipcode = models.CharField(
max_length=10,
null=True,
blank=True,
help_text="Zip code",
db_index=True,
)
def __str__(self):
return f"{self.username}, {self.domain_name}"
@ -128,4 +151,8 @@ class TransitionDomain(TimeStampedModel):
f"last_name: {self.last_name}, \n"
f"email: {self.email}, \n"
f"phone: {self.phone}, \n"
f"address_line: {self.address_line}, \n"
f"city: {self.city}, \n"
f"state_territory: {self.state_territory}, \n"
f"zipcode: {self.zipcode}, \n"
)

View file

@ -6,7 +6,7 @@
{% if IS_PRODUCTION %}
<!-- Google tag (gtag.js) -->
<script async src="https://www.googletagmanager.com/gtag/js?id=G-PZ5QSP6QPL"></script>
<script>
<script type="text/javascript" nonce="{{request.csp_nonce}}">
window.dataLayer = window.dataLayer || []; function gtag(){dataLayer.push(arguments);} gtag('js', new Date()); gtag('config', 'G-PZ5QSP6QPL');
</script>
{% endif %}

View file

@ -4,10 +4,10 @@
{% block title %}Add another user{% endblock %}
{% block domain_content %}
<h1>Add another user</h1>
<h1>Add a domain manager</h1>
<p>You can add another user to help manage your domain. They will need to sign
into the .gov registrar with their Login.gov account.
in to the .gov registrar with their Login.gov account.
</p>
<form class="usa-form usa-form--large" method="post" novalidate>

View file

@ -29,30 +29,34 @@
{% url 'domain-dns-nameservers' pk=domain.id as url %}
{% if domain.nameservers|length > 0 %}
{% include "includes/summary_item.html" with title='DNS name servers' domains='true' value=domain.nameservers list='true' edit_link=url %}
{% include "includes/summary_item.html" with title='DNS name servers' domains='true' value=domain.nameservers list='true' edit_link=url editable=domain.is_editable %}
{% else %}
{% if domain.is_editable %}
<h2 class="margin-top-neg-1"> DNS name servers </h2>
<p> No DNS name servers have been added yet. Before your domain can be used well need information about your domain name servers.</p>
<a class="usa-button margin-bottom-1" href="{{url}}"> Add DNS name servers </a>
<a class="usa-button margin-bottom-1" href="{{url}}"> Add DNS name servers </a>
{% else %}
{% include "includes/summary_item.html" with title='DNS name servers' domains='true' value='' edit_link=url editable=domain.is_editable %}
{% endif %}
{% endif %}
{% url 'domain-org-name-address' pk=domain.id as url %}
{% include "includes/summary_item.html" with title='Organization name and mailing address' value=domain.domain_info address='true' edit_link=url %}
{% include "includes/summary_item.html" with title='Organization name and mailing address' value=domain.domain_info address='true' edit_link=url editable=domain.is_editable %}
{% url 'domain-authorizing-official' pk=domain.id as url %}
{% include "includes/summary_item.html" with title='Authorizing official' value=domain.domain_info.authorizing_official contact='true' edit_link=url %}
{% include "includes/summary_item.html" with title='Authorizing official' value=domain.domain_info.authorizing_official contact='true' edit_link=url editable=domain.is_editable %}
{% url 'domain-your-contact-information' pk=domain.id as url %}
{% include "includes/summary_item.html" with title='Your contact information' value=request.user.contact contact='true' edit_link=url %}
{% include "includes/summary_item.html" with title='Your contact information' value=request.user.contact contact='true' edit_link=url editable=domain.is_editable %}
{% url 'domain-security-email' pk=domain.id as url %}
{% if security_email is not None and security_email != default_security_email%}
{% include "includes/summary_item.html" with title='Security email' value=security_email edit_link=url %}
{% include "includes/summary_item.html" with title='Security email' value=security_email edit_link=url editable=domain.is_editable %}
{% else %}
{% include "includes/summary_item.html" with title='Security email' value='None provided' edit_link=url %}
{% include "includes/summary_item.html" with title='Security email' value='None provided' edit_link=url editable=domain.is_editable %}
{% endif %}
{% url 'domain-users' pk=domain.id as url %}
{% include "includes/summary_item.html" with title='Domain managers' users='true' list=True value=domain.permissions.all edit_link=url %}
{% include "includes/summary_item.html" with title='Domain managers' users='true' list=True value=domain.permissions.all edit_link=url editable=domain.is_editable %}
</div>
{% endblock %} {# domain_content #}

View file

@ -114,7 +114,7 @@
aria-describedby="Your DNSSEC records will be deleted from the registry."
data-force-action
>
{% include 'includes/modal.html' with cancel_button_resets_ds_form=True modal_heading="Warning: You are about to delete all DS records on your domain" modal_description="To fully disable DNSSEC: In addition to deleting your DS records here youll also need to delete the DS records at your DNS host. To avoid causing your domain to appear offline you should wait to delete your DS records at your DNS host until the Time to Live (TTL) expires. This is often less than 24 hours, but confirm with your provider." modal_button=modal_button|safe %}
{% include 'includes/modal.html' with cancel_button_resets_ds_form=True modal_heading="Warning: You are about to remove all DS records on your domain" modal_description="To fully disable DNSSEC: In addition to removing your DS records here youll also need to delete the DS records at your DNS host. To avoid causing your domain to appear offline you should wait to delete your DS records at your DNS host until the Time to Live (TTL) expires. This is often less than 24 hours, but confirm with your provider." modal_button=modal_button|safe %}
</div>
{% endblock %} {# domain_content #}

View file

@ -12,11 +12,13 @@
</a>
</li>
{% if domain.is_editable %}
<li class="usa-sidenav__item">
{% url 'domain-dns' pk=domain.id as url %}
<a href="{{ url }}" {% if request.path|startswith:url %}class="usa-current"{% endif %}>
DNS
</a>
{% if request.path|startswith:url %}
<ul class="usa-sidenav__sublist">
<li class="usa-sidenav__item">
{% url 'domain-dns-nameservers' pk=domain.id as url %}
@ -34,20 +36,23 @@
>
DNSSEC
</a>
{% if domain.dnssecdata is not None or request.path|startswith:url and request.path|endswith:'dsdata' %}
<ul class="usa-sidenav__sublist">
<li class="usa-sidenav__item">
{% url 'domain-dns-dnssec-dsdata' pk=domain.id as url %}
<a href="{{ url }}"
{% if request.path == url %}class="usa-current"{% endif %}
>
DS Data
</a>
</li>
</ul>
{% if request.path|startswith:url %}
{% if domain.dnssecdata is not None or request.path|startswith:url and request.path|endswith:'dsdata' %}
<ul class="usa-sidenav__sublist">
<li class="usa-sidenav__item">
{% url 'domain-dns-dnssec-dsdata' pk=domain.id as url %}
<a href="{{ url }}"
{% if request.path == url %}class="usa-current"{% endif %}
>
DS Data
</a>
</li>
</ul>
{% endif %}
{% endif %}
</li>
</ul>
{% endif %}
</li>
<li class="usa-sidenav__item">
@ -94,6 +99,7 @@
Domain managers
</a>
</li>
{% endif %}
</ul>
</nav>
</div>

View file

@ -25,8 +25,8 @@
{% if domain.permissions %}
<section class="section--outlined">
<table class="usa-table usa-table--borderless usa-table--stacked dotgov-table--stacked dotgov-table">
<h2 class> Active users </h2>
<caption class="sr-only">Domain users</caption>
<h2 class> Domain managers </h2>
<caption class="sr-only">Domain managers</caption>
<thead>
<tr>
<th data-sortable scope="col" role="columnheader">Email</th>
@ -53,7 +53,7 @@
<a class="usa-button usa-button--unstyled" href="{% url 'domain-users-add' pk=domain.id %}">
<svg class="usa-icon" aria-hidden="true" focusable="false" role="img" width="24" height="24">
<use xlink:href="{%static 'img/sprite.svg'%}#add_circle"></use>
</svg><span class="margin-left-05">Add another user</span>
</svg><span class="margin-left-05">Add a domain manager</span>
</a>
</section>

View file

@ -85,7 +85,7 @@
{% endif %}
</div>
{% if edit_link %}
{% if editable and edit_link %}
<div class="text-right">
<a
href="{{ edit_link }}"

View file

@ -556,6 +556,7 @@ class MockEppLib(TestCase):
avail=...,
addrs=...,
registrant=...,
ex_date=...,
):
self.auth_info = auth_info
self.cr_date = cr_date
@ -565,6 +566,7 @@ class MockEppLib(TestCase):
self.avail = avail # use for CheckDomain
self.addrs = addrs
self.registrant = registrant
self.ex_date = ex_date
def dummyInfoContactResultData(
self,
@ -615,6 +617,7 @@ class MockEppLib(TestCase):
common.Status(state="serverTransferProhibited", description="", lang="en"),
common.Status(state="inactive", description="", lang="en"),
],
ex_date=datetime.date(2023, 5, 25),
)
mockDataInfoContact = mockDataInfoDomain.dummyInfoContactResultData(
"123", "123@mail.gov", datetime.datetime(2023, 5, 25, 19, 45, 35), "lastPw"
@ -811,6 +814,11 @@ class MockEppLib(TestCase):
],
)
mockRenewedDomainExpDate = fakedEppObject(
"fake.gov",
ex_date=datetime.date(2023, 5, 25),
)
def _mockDomainName(self, _name, _avail=False):
return MagicMock(
res_data=[
@ -852,15 +860,9 @@ class MockEppLib(TestCase):
case commands.UpdateDomain:
return self.mockUpdateDomainCommands(_request, cleaned)
case commands.CreateHost:
return MagicMock(
res_data=[self.mockDataHostChange],
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
)
return self.mockCreateHostCommands(_request, cleaned)
case commands.UpdateHost:
return MagicMock(
res_data=[self.mockDataHostChange],
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
)
return self.mockUpdateHostCommands(_request, cleaned)
case commands.DeleteHost:
return MagicMock(
res_data=[self.mockDataHostChange],
@ -870,9 +872,33 @@ class MockEppLib(TestCase):
return self.mockCheckDomainCommand(_request, cleaned)
case commands.DeleteDomain:
return self.mockDeleteDomainCommands(_request, cleaned)
case commands.RenewDomain:
return self.mockRenewDomainCommand(_request, cleaned)
case _:
return MagicMock(res_data=[self.mockDataInfoHosts])
def mockCreateHostCommands(self, _request, cleaned):
test_ws_ip = common.Ip(addr="1.1. 1.1")
addrs_submitted = getattr(_request, "addrs", [])
if test_ws_ip in addrs_submitted:
raise RegistryError(code=ErrorCode.PARAMETER_VALUE_RANGE_ERROR)
else:
return MagicMock(
res_data=[self.mockDataHostChange],
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
)
def mockUpdateHostCommands(self, _request, cleaned):
test_ws_ip = common.Ip(addr="1.1. 1.1")
addrs_submitted = getattr(_request, "addrs", [])
if test_ws_ip in addrs_submitted:
raise RegistryError(code=ErrorCode.PARAMETER_VALUE_RANGE_ERROR)
else:
return MagicMock(
res_data=[self.mockDataHostChange],
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
)
def mockUpdateDomainCommands(self, _request, cleaned):
if getattr(_request, "name", None) == "dnssec-invalid.gov":
raise RegistryError(code=ErrorCode.PARAMETER_VALUE_RANGE_ERROR)
@ -890,6 +916,15 @@ class MockEppLib(TestCase):
raise RegistryError(code=ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION)
return None
def mockRenewDomainCommand(self, _request, cleaned):
if getattr(_request, "name", None) == "fake-error.gov":
raise RegistryError(code=ErrorCode.PARAMETER_VALUE_RANGE_ERROR)
else:
return MagicMock(
res_data=[self.mockRenewedDomainExpDate],
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
)
def mockInfoDomainCommands(self, _request, cleaned):
request_name = getattr(_request, "name", None)

View file

@ -8,6 +8,7 @@ from registrar.admin import (
DomainAdmin,
DomainApplicationAdmin,
DomainApplicationAdminForm,
DomainInvitationAdmin,
ListHeaderAdmin,
MyUserAdmin,
AuditedAdmin,
@ -847,6 +848,44 @@ class TestDomainApplicationAdmin(MockEppLib):
User.objects.all().delete()
class DomainInvitationAdminTest(TestCase):
"""Tests for the DomainInvitation page"""
def setUp(self):
"""Create a client object"""
self.client = Client(HTTP_HOST="localhost:8080")
self.factory = RequestFactory()
self.admin = ListHeaderAdmin(model=DomainInvitationAdmin, admin_site=AdminSite())
self.superuser = create_superuser()
def tearDown(self):
"""Delete all DomainInvitation objects"""
DomainInvitation.objects.all().delete()
def test_get_filters(self):
"""Ensures that our filters are displaying correctly"""
# Have to get creative to get past linter
p = "adminpass"
self.client.login(username="superuser", password=p)
response = self.client.get(
"/admin/registrar/domaininvitation/",
{},
follow=True,
)
# Assert that the filters are added
self.assertContains(response, "invited", count=4)
self.assertContains(response, "retrieved", count=4)
# Check for the HTML context specificially
invited_html = '<a href="?status__exact=invited">invited</a>'
retrieved_html = '<a href="?status__exact=retrieved">retrieved</a>'
self.assertContains(response, invited_html, count=1)
self.assertContains(response, retrieved_html, count=1)
class ListHeaderAdminTest(TestCase):
def setUp(self):
self.site = AdminSite()

View file

@ -56,7 +56,7 @@ class TestDomainCache(MockEppLib):
self.assertFalse("avail" in domain._cache.keys())
# using a setter should clear the cache
domain.registry_expiration_date = datetime.date.today()
domain.dnssecdata = []
self.assertEquals(domain._cache, {})
# send should have been called only once
@ -1953,6 +1953,48 @@ class TestRegistrantDNSSEC(MockEppLib):
self.assertTrue(err.is_client_error() or err.is_session_error() or err.is_server_error())
class TestExpirationDate(MockEppLib):
"""User may renew expiration date by a number of units of time"""
def setUp(self):
"""
Domain exists in registry
"""
super().setUp()
# for the tests, need a domain in the ready state
self.domain, _ = Domain.objects.get_or_create(name="fake.gov", state=Domain.State.READY)
# for the test, need a domain that will raise an exception
self.domain_w_error, _ = Domain.objects.get_or_create(name="fake-error.gov", state=Domain.State.READY)
def tearDown(self):
Domain.objects.all().delete()
super().tearDown()
def test_expiration_date_setter_not_implemented(self):
"""assert that the setter for expiration date is not implemented and will raise error"""
with self.assertRaises(NotImplementedError):
self.domain.registry_expiration_date = datetime.date.today()
def test_renew_domain(self):
"""assert that the renew_domain sets new expiration date in cache and saves to registrar"""
self.domain.renew_domain()
test_date = datetime.date(2023, 5, 25)
self.assertEquals(self.domain._cache["ex_date"], test_date)
self.assertEquals(self.domain.expiration_date, test_date)
def test_renew_domain_error(self):
"""assert that the renew_domain raises an exception when registry raises error"""
with self.assertRaises(RegistryError):
self.domain_w_error.renew_domain()
def test_expiration_date_updated_on_info_domain_call(self):
"""assert that expiration date in db is updated on info domain call"""
# force fetch_cache to be called
self.domain.statuses
test_date = datetime.date(2023, 5, 25)
self.assertEquals(self.domain.expiration_date, test_date)
class TestAnalystClientHold(MockEppLib):
"""Rule: Analysts may suspend or restore a domain by using client hold"""

View file

@ -212,6 +212,7 @@ class ExportDataTest(TestCase):
)
def tearDown(self):
# Dummy push - will remove
Domain.objects.all().delete()
DomainInformation.objects.all().delete()
User.objects.all().delete()

View file

@ -16,9 +16,316 @@ from registrar.models import (
from django.core.management import call_command
from unittest.mock import patch
from registrar.models.contact import Contact
from .common import less_console_noise
class TestOrganizationMigration(TestCase):
def setUp(self):
"""Defines the file name of migration_json and the folder its contained in"""
self.test_data_file_location = "registrar/tests/data"
self.migration_json_filename = "test_migrationFilepaths.json"
def tearDown(self):
"""Deletes all DB objects related to migrations"""
# Delete domain information
Domain.objects.all().delete()
DomainInformation.objects.all().delete()
DomainInvitation.objects.all().delete()
TransitionDomain.objects.all().delete()
# Delete users
User.objects.all().delete()
UserDomainRole.objects.all().delete()
def run_load_domains(self):
"""
This method executes the load_transition_domain command.
It uses 'unittest.mock.patch' to mock the TerminalHelper.query_yes_no_exit method,
which is a user prompt in the terminal. The mock function always returns True,
allowing the test to proceed without manual user input.
The 'call_command' function from Django's management framework is then used to
execute the load_transition_domain command with the specified arguments.
"""
# noqa here because splitting this up makes it confusing.
# ES501
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
return_value=True,
):
call_command(
"load_transition_domain",
self.migration_json_filename,
directory=self.test_data_file_location,
)
def run_transfer_domains(self):
"""
This method executes the transfer_transition_domains_to_domains command.
The 'call_command' function from Django's management framework is then used to
execute the load_transition_domain command with the specified arguments.
"""
call_command("transfer_transition_domains_to_domains")
def run_load_organization_data(self):
"""
This method executes the load_organization_data command.
It uses 'unittest.mock.patch' to mock the TerminalHelper.query_yes_no_exit method,
which is a user prompt in the terminal. The mock function always returns True,
allowing the test to proceed without manual user input.
The 'call_command' function from Django's management framework is then used to
execute the load_organization_data command with the specified arguments.
"""
# noqa here (E501) because splitting this up makes it
# confusing to read.
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
return_value=True,
):
call_command(
"load_organization_data",
self.migration_json_filename,
directory=self.test_data_file_location,
)
def compare_tables(
self,
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
):
"""Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation.
Verifies that the data loaded correctly."""
missing_domains = []
duplicate_domains = []
missing_domain_informations = []
for transition_domain in TransitionDomain.objects.all():
transition_domain_name = transition_domain.domain_name
# Check Domain table
matching_domains = Domain.objects.filter(name=transition_domain_name)
# Check Domain Information table
matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name)
if len(matching_domains) == 0:
missing_domains.append(transition_domain_name)
elif len(matching_domains) > 1:
duplicate_domains.append(transition_domain_name)
if len(matching_domain_informations) == 0:
missing_domain_informations.append(transition_domain_name)
total_missing_domains = len(missing_domains)
total_duplicate_domains = len(duplicate_domains)
total_missing_domain_informations = len(missing_domain_informations)
total_transition_domains = len(TransitionDomain.objects.all())
total_domains = len(Domain.objects.all())
total_domain_informations = len(DomainInformation.objects.all())
self.assertEqual(total_missing_domains, expected_missing_domains)
self.assertEqual(total_duplicate_domains, expected_duplicate_domains)
self.assertEqual(total_missing_domain_informations, expected_missing_domain_informations)
self.assertEqual(total_transition_domains, expected_total_transition_domains)
self.assertEqual(total_domains, expected_total_domains)
self.assertEqual(total_domain_informations, expected_total_domain_informations)
def test_load_organization_data_transition_domain(self):
"""
This test verifies the functionality of the load_organization_data method for TransitionDomain objects.
The test follows these steps:
1. Parses all existing data by running the load_domains and transfer_domains methods.
2. Attempts to add organization data to the parsed data by running the load_organization_data method.
3. Checks that the data has been loaded as expected.
The expected result is a set of TransitionDomain objects with specific attributes.
The test fetches the actual TransitionDomain objects from the database and compares them with the expected objects.
""" # noqa - E501 (harder to read)
# == First, parse all existing data == #
self.run_load_domains()
self.run_transfer_domains()
# == Second, try adding org data to it == #
self.run_load_organization_data()
# == Third, test that we've loaded data as we expect == #
transition_domains = TransitionDomain.objects.filter(domain_name="fakewebsite2.gov")
# Should return three objects (three unique emails)
self.assertEqual(transition_domains.count(), 3)
# Lets test the first one
transition = transition_domains.first()
expected_transition_domain = TransitionDomain(
username="alexandra.bobbitt5@test.com",
domain_name="fakewebsite2.gov",
status="on hold",
email_sent=True,
organization_type="Federal",
organization_name="Fanoodle",
federal_type="Executive",
federal_agency="Department of Commerce",
epp_creation_date=datetime.date(2004, 5, 7),
epp_expiration_date=datetime.date(2023, 9, 30),
first_name="Seline",
middle_name="testmiddle2",
last_name="Tower",
title=None,
email="stower3@answers.com",
phone="151-539-6028",
address_line="93001 Arizona Drive",
city="Columbus",
state_territory="Oh",
zipcode="43268",
)
expected_transition_domain.id = transition.id
self.assertEqual(transition, expected_transition_domain)
def test_load_organization_data_domain_information(self):
"""
This test verifies the functionality of the load_organization_data method.
The test follows these steps:
1. Parses all existing data by running the load_domains and transfer_domains methods.
2. Attempts to add organization data to the parsed data by running the load_organization_data method.
3. Checks that the data has been loaded as expected.
The expected result is a DomainInformation object with specific attributes.
The test fetches the actual DomainInformation object from the database
and compares it with the expected object.
"""
# == First, parse all existing data == #
self.run_load_domains()
self.run_transfer_domains()
# == Second, try adding org data to it == #
self.run_load_organization_data()
# == Third, test that we've loaded data as we expect == #
_domain = Domain.objects.filter(name="fakewebsite2.gov").get()
domain_information = DomainInformation.objects.filter(domain=_domain).get()
expected_creator = User.objects.filter(username="System").get()
expected_ao = Contact.objects.filter(first_name="Seline", middle_name="testmiddle2", last_name="Tower").get()
expected_domain_information = DomainInformation(
creator=expected_creator,
organization_type="federal",
federal_agency="Department of Commerce",
federal_type="executive",
organization_name="Fanoodle",
address_line1="93001 Arizona Drive",
city="Columbus",
state_territory="Oh",
zipcode="43268",
authorizing_official=expected_ao,
domain=_domain,
)
# Given that these are different objects, this needs to be set
expected_domain_information.id = domain_information.id
self.assertEqual(domain_information, expected_domain_information)
def test_load_organization_data_preserves_existing_data(self):
"""
This test verifies that the load_organization_data method does not overwrite existing data.
The test follows these steps:
1. Parses all existing data by running the load_domains and transfer_domains methods.
2. Adds pre-existing fake data to a DomainInformation object and saves it to the database.
3. Runs the load_organization_data method.
4. Checks that the pre-existing data in the DomainInformation object has not been overwritten.
The expected result is that the DomainInformation object retains its pre-existing data
after the load_organization_data method is run.
"""
# == First, parse all existing data == #
self.run_load_domains()
self.run_transfer_domains()
# == Second, try add prexisting fake data == #
_domain_old = Domain.objects.filter(name="fakewebsite2.gov").get()
domain_information_old = DomainInformation.objects.filter(domain=_domain_old).get()
domain_information_old.address_line1 = "93001 Galactic Way"
domain_information_old.city = "Olympus"
domain_information_old.state_territory = "MA"
domain_information_old.zipcode = "12345"
domain_information_old.save()
# == Third, try running the script == #
self.run_load_organization_data()
# == Fourth, test that no data is overwritten as we expect == #
_domain = Domain.objects.filter(name="fakewebsite2.gov").get()
domain_information = DomainInformation.objects.filter(domain=_domain).get()
expected_creator = User.objects.filter(username="System").get()
expected_ao = Contact.objects.filter(first_name="Seline", middle_name="testmiddle2", last_name="Tower").get()
expected_domain_information = DomainInformation(
creator=expected_creator,
organization_type="federal",
federal_agency="Department of Commerce",
federal_type="executive",
organization_name="Fanoodle",
address_line1="93001 Galactic Way",
city="Olympus",
state_territory="MA",
zipcode="12345",
authorizing_official=expected_ao,
domain=_domain,
)
# Given that these are different objects, this needs to be set
expected_domain_information.id = domain_information.id
self.assertEqual(domain_information, expected_domain_information)
def test_load_organization_data_integrity(self):
"""
This test verifies the data integrity after running the load_organization_data method.
The test follows these steps:
1. Parses all existing data by running the load_domains and transfer_domains methods.
2. Attempts to add organization data to the parsed data by running the load_organization_data method.
3. Checks that the data has not been corrupted by comparing the actual counts of objects in the database
with the expected counts.
The expected result is that the counts of objects in the database
match the expected counts, indicating that the data has not been corrupted.
"""
# First, parse all existing data
self.run_load_domains()
self.run_transfer_domains()
# Second, try adding org data to it
self.run_load_organization_data()
# Third, test that we didn't corrupt any data
expected_total_transition_domains = 9
expected_total_domains = 5
expected_total_domain_informations = 5
expected_missing_domains = 0
expected_duplicate_domains = 0
expected_missing_domain_informations = 0
self.compare_tables(
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
)
class TestMigrations(TestCase):
def setUp(self):
""" """
@ -41,11 +348,12 @@ class TestMigrations(TestCase):
self.migration_json_filename = "test_migrationFilepaths.json"
def tearDown(self):
super().tearDown()
# Delete domain information
TransitionDomain.objects.all().delete()
Domain.objects.all().delete()
DomainInvitation.objects.all().delete()
DomainInformation.objects.all().delete()
DomainInvitation.objects.all().delete()
# Delete users
User.objects.all().delete()

View file

@ -13,6 +13,12 @@ import boto3_mocking # type: ignore
from registrar.utility.errors import (
NameserverError,
NameserverErrorCodes,
SecurityEmailError,
SecurityEmailErrorCodes,
GenericError,
GenericErrorCodes,
DsDataError,
DsDataErrorCodes,
)
from registrar.models import (
@ -1076,6 +1082,8 @@ class TestWithDomainPermissions(TestWithUser):
self.domain_with_ip, _ = Domain.objects.get_or_create(name="nameserverwithip.gov")
self.domain_just_nameserver, _ = Domain.objects.get_or_create(name="justnameserver.com")
self.domain_no_information, _ = Domain.objects.get_or_create(name="noinformation.gov")
self.domain_on_hold, _ = Domain.objects.get_or_create(name="on-hold.gov", state=Domain.State.ON_HOLD)
self.domain_deleted, _ = Domain.objects.get_or_create(name="deleted.gov", state=Domain.State.DELETED)
self.domain_dsdata, _ = Domain.objects.get_or_create(name="dnssec-dsdata.gov")
self.domain_multdsdata, _ = Domain.objects.get_or_create(name="dnssec-multdsdata.gov")
@ -1090,6 +1098,8 @@ class TestWithDomainPermissions(TestWithUser):
DomainInformation.objects.get_or_create(creator=self.user, domain=self.domain_dnssec_none)
DomainInformation.objects.get_or_create(creator=self.user, domain=self.domain_with_ip)
DomainInformation.objects.get_or_create(creator=self.user, domain=self.domain_just_nameserver)
DomainInformation.objects.get_or_create(creator=self.user, domain=self.domain_on_hold)
DomainInformation.objects.get_or_create(creator=self.user, domain=self.domain_deleted)
self.role, _ = UserDomainRole.objects.get_or_create(
user=self.user, domain=self.domain, role=UserDomainRole.Roles.MANAGER
@ -1118,6 +1128,12 @@ class TestWithDomainPermissions(TestWithUser):
domain=self.domain_just_nameserver,
role=UserDomainRole.Roles.MANAGER,
)
UserDomainRole.objects.get_or_create(
user=self.user, domain=self.domain_on_hold, role=UserDomainRole.Roles.MANAGER
)
UserDomainRole.objects.get_or_create(
user=self.user, domain=self.domain_deleted, role=UserDomainRole.Roles.MANAGER
)
def tearDown(self):
try:
@ -1171,6 +1187,31 @@ class TestDomainPermissions(TestWithDomainPermissions):
response = self.client.get(reverse(view_name, kwargs={"pk": self.domain.id}))
self.assertEqual(response.status_code, 403)
def test_domain_pages_blocked_for_on_hold_and_deleted(self):
"""Test that the domain pages are blocked for on hold and deleted domains"""
self.client.force_login(self.user)
for view_name in [
"domain-users",
"domain-users-add",
"domain-dns",
"domain-dns-nameservers",
"domain-dns-dnssec",
"domain-dns-dnssec-dsdata",
"domain-org-name-address",
"domain-authorizing-official",
"domain-your-contact-information",
"domain-security-email",
]:
for domain in [
self.domain_on_hold,
self.domain_deleted,
]:
with self.subTest(view_name=view_name, domain=domain):
with less_console_noise():
response = self.client.get(reverse(view_name, kwargs={"pk": domain.id}))
self.assertEqual(response.status_code, 403)
class TestDomainOverview(TestWithDomainPermissions, WebTest):
def setUp(self):
@ -1178,6 +1219,8 @@ class TestDomainOverview(TestWithDomainPermissions, WebTest):
self.app.set_user(self.user.username)
self.client.force_login(self.user)
class TestDomainDetail(TestDomainOverview):
def test_domain_detail_link_works(self):
home_page = self.app.get("/")
self.assertContains(home_page, "igorville.gov")
@ -1186,7 +1229,7 @@ class TestDomainOverview(TestWithDomainPermissions, WebTest):
self.assertContains(detail_page, "igorville.gov")
self.assertContains(detail_page, "Status")
def test_domain_overview_blocked_for_ineligible_user(self):
def test_domain_detail_blocked_for_ineligible_user(self):
"""We could easily duplicate this test for all domain management
views, but a single url test should be solid enough since all domain
management pages share the same permissions class"""
@ -1198,7 +1241,16 @@ class TestDomainOverview(TestWithDomainPermissions, WebTest):
response = self.client.get(reverse("domain", kwargs={"pk": self.domain.id}))
self.assertEqual(response.status_code, 403)
def test_domain_see_just_nameserver(self):
def test_domain_detail_allowed_for_on_hold(self):
"""Test that the domain overview page displays for on hold domain"""
home_page = self.app.get("/")
self.assertContains(home_page, "on-hold.gov")
# View domain overview page
detail_page = self.client.get(reverse("domain", kwargs={"pk": self.domain_on_hold.id}))
self.assertNotContains(detail_page, "Edit")
def test_domain_detail_see_just_nameserver(self):
home_page = self.app.get("/")
self.assertContains(home_page, "justnameserver.com")
@ -1209,7 +1261,7 @@ class TestDomainOverview(TestWithDomainPermissions, WebTest):
self.assertContains(detail_page, "ns1.justnameserver.com")
self.assertContains(detail_page, "ns2.justnameserver.com")
def test_domain_see_nameserver_and_ip(self):
def test_domain_detail_see_nameserver_and_ip(self):
home_page = self.app.get("/")
self.assertContains(home_page, "nameserverwithip.gov")
@ -1225,7 +1277,7 @@ class TestDomainOverview(TestWithDomainPermissions, WebTest):
self.assertContains(detail_page, "(1.2.3.4,")
self.assertContains(detail_page, "2.3.4.5)")
def test_domain_with_no_information_or_application(self):
def test_domain_detail_with_no_information_or_application(self):
"""Test that domain management page returns 200 and displays error
when no domain information or domain application exist"""
# have to use staff user for this test
@ -1255,12 +1307,12 @@ class TestDomainManagers(TestDomainOverview):
def test_domain_managers_add_link(self):
"""Button to get to user add page works."""
management_page = self.app.get(reverse("domain-users", kwargs={"pk": self.domain.id}))
add_page = management_page.click("Add another user")
self.assertContains(add_page, "Add another user")
add_page = management_page.click("Add a domain manager")
self.assertContains(add_page, "Add a domain manager")
def test_domain_user_add(self):
response = self.client.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
self.assertContains(response, "Add another user")
self.assertContains(response, "Add a domain manager")
def test_domain_user_add_form(self):
"""Adding an existing user works."""
@ -1456,6 +1508,62 @@ class TestDomainNameservers(TestDomainOverview):
status_code=200,
)
def test_domain_nameservers_form_submit_duplicate_host(self):
"""Nameserver form catches error when host is duplicated.
Uses self.app WebTest because we need to interact with forms.
"""
# initial nameservers page has one server with two ips
nameservers_page = self.app.get(reverse("domain-dns-nameservers", kwargs={"pk": self.domain.id}))
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
# attempt to submit the form with duplicate host names of fake.host.com
nameservers_page.form["form-0-ip"] = ""
nameservers_page.form["form-1-server"] = "fake.host.com"
with less_console_noise(): # swallow log warning message
result = nameservers_page.form.submit()
# form submission was a post with an error, response should be a 200
# error text appears twice, once at the top of the page, once around
# the required field. remove duplicate entry
self.assertContains(
result,
str(NameserverError(code=NameserverErrorCodes.DUPLICATE_HOST)),
count=2,
status_code=200,
)
def test_domain_nameservers_form_submit_whitespace(self):
"""Nameserver form removes whitespace from ip.
Uses self.app WebTest because we need to interact with forms.
"""
nameserver1 = "ns1.igorville.gov"
nameserver2 = "ns2.igorville.gov"
valid_ip = "1.1. 1.1"
# initial nameservers page has one server with two ips
# have to throw an error in order to test that the whitespace has been stripped from ip
nameservers_page = self.app.get(reverse("domain-dns-nameservers", kwargs={"pk": self.domain.id}))
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
# attempt to submit the form without one host and an ip with whitespace
nameservers_page.form["form-0-server"] = nameserver1
nameservers_page.form["form-1-ip"] = valid_ip
nameservers_page.form["form-1-server"] = nameserver2
with less_console_noise(): # swallow log warning message
result = nameservers_page.form.submit()
# form submission was a post with an ip address which has been stripped of whitespace,
# response should be a 302 to success page
self.assertEqual(result.status_code, 302)
self.assertEqual(
result["Location"],
reverse("domain-dns-nameservers", kwargs={"pk": self.domain.id}),
)
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
page = result.follow()
# in the event of a generic nameserver error from registry error, there will be a 302
# with an error message displayed, so need to follow 302 and test for success message
self.assertContains(page, "The name servers for this domain have been updated")
def test_domain_nameservers_form_submit_glue_record_not_allowed(self):
"""Nameserver form catches error when IP is present
but host not subdomain.
@ -1547,7 +1655,7 @@ class TestDomainNameservers(TestDomainOverview):
"""
nameserver1 = "ns1.igorville.gov"
nameserver2 = "ns2.igorville.gov"
invalid_ip = "127.0.0.1"
valid_ip = "127.0.0.1"
# initial nameservers page has one server with two ips
nameservers_page = self.app.get(reverse("domain-dns-nameservers", kwargs={"pk": self.domain.id}))
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
@ -1556,7 +1664,7 @@ class TestDomainNameservers(TestDomainOverview):
# only one has ips
nameservers_page.form["form-0-server"] = nameserver1
nameservers_page.form["form-1-server"] = nameserver2
nameservers_page.form["form-1-ip"] = invalid_ip
nameservers_page.form["form-1-ip"] = valid_ip
with less_console_noise(): # swallow log warning message
result = nameservers_page.form.submit()
# form submission was a successful post, response should be a 302
@ -1734,13 +1842,13 @@ class TestDomainSecurityEmail(TestDomainOverview):
(
"RegistryError",
form_data_registry_error,
"""
Were experiencing a system connection error. Please wait a few minutes
and try again. If you continue to receive this error after a few tries,
contact help@get.gov
""",
str(GenericError(code=GenericErrorCodes.CANNOT_CONTACT_REGISTRY)),
),
(
"ContactError",
form_data_contact_error,
str(SecurityEmailError(code=SecurityEmailErrorCodes.BAD_DATA)),
),
("ContactError", form_data_contact_error, "Value entered was wrong."),
(
"RegistrySuccess",
form_data_success,
@ -1874,7 +1982,30 @@ class TestDomainDNSSEC(TestDomainOverview):
self.assertContains(page, "The DS Data records for this domain have been updated.")
def test_ds_data_form_invalid(self):
"""DS Data form errors with invalid data
"""DS Data form errors with invalid data (missing required fields)
Uses self.app WebTest because we need to interact with forms.
"""
add_data_page = self.app.get(reverse("domain-dns-dnssec-dsdata", kwargs={"pk": self.domain_dsdata.id}))
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
# all four form fields are required, so will test with each blank
add_data_page.forms[0]["form-0-key_tag"] = ""
add_data_page.forms[0]["form-0-algorithm"] = ""
add_data_page.forms[0]["form-0-digest_type"] = ""
add_data_page.forms[0]["form-0-digest"] = ""
with less_console_noise(): # swallow logged warning message
result = add_data_page.forms[0].submit()
# form submission was a post with an error, response should be a 200
# error text appears twice, once at the top of the page, once around
# the field.
self.assertContains(result, "Key tag is required", count=2, status_code=200)
self.assertContains(result, "Algorithm is required", count=2, status_code=200)
self.assertContains(result, "Digest type is required", count=2, status_code=200)
self.assertContains(result, "Digest is required", count=2, status_code=200)
def test_ds_data_form_invalid_keytag(self):
"""DS Data form errors with invalid data (key tag too large)
Uses self.app WebTest because we need to interact with forms.
"""
@ -1883,13 +2014,87 @@ class TestDomainDNSSEC(TestDomainOverview):
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
# first two nameservers are required, so if we empty one out we should
# get a form error
add_data_page.forms[0]["form-0-key_tag"] = ""
add_data_page.forms[0]["form-0-key_tag"] = "65536" # > 65535
add_data_page.forms[0]["form-0-algorithm"] = ""
add_data_page.forms[0]["form-0-digest_type"] = ""
add_data_page.forms[0]["form-0-digest"] = ""
with less_console_noise(): # swallow logged warning message
result = add_data_page.forms[0].submit()
# form submission was a post with an error, response should be a 200
# error text appears twice, once at the top of the page, once around
# the field.
self.assertContains(result, "Key tag is required", count=2, status_code=200)
self.assertContains(
result, str(DsDataError(code=DsDataErrorCodes.INVALID_KEYTAG_SIZE)), count=2, status_code=200
)
def test_ds_data_form_invalid_digest_chars(self):
"""DS Data form errors with invalid data (digest contains non hexadecimal chars)
Uses self.app WebTest because we need to interact with forms.
"""
add_data_page = self.app.get(reverse("domain-dns-dnssec-dsdata", kwargs={"pk": self.domain_dsdata.id}))
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
# first two nameservers are required, so if we empty one out we should
# get a form error
add_data_page.forms[0]["form-0-key_tag"] = "1234"
add_data_page.forms[0]["form-0-algorithm"] = "3"
add_data_page.forms[0]["form-0-digest_type"] = "1"
add_data_page.forms[0]["form-0-digest"] = "GG1234"
with less_console_noise(): # swallow logged warning message
result = add_data_page.forms[0].submit()
# form submission was a post with an error, response should be a 200
# error text appears twice, once at the top of the page, once around
# the field.
self.assertContains(
result, str(DsDataError(code=DsDataErrorCodes.INVALID_DIGEST_CHARS)), count=2, status_code=200
)
def test_ds_data_form_invalid_digest_sha1(self):
"""DS Data form errors with invalid data (digest is invalid sha-1)
Uses self.app WebTest because we need to interact with forms.
"""
add_data_page = self.app.get(reverse("domain-dns-dnssec-dsdata", kwargs={"pk": self.domain_dsdata.id}))
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
# first two nameservers are required, so if we empty one out we should
# get a form error
add_data_page.forms[0]["form-0-key_tag"] = "1234"
add_data_page.forms[0]["form-0-algorithm"] = "3"
add_data_page.forms[0]["form-0-digest_type"] = "1" # SHA-1
add_data_page.forms[0]["form-0-digest"] = "A123"
with less_console_noise(): # swallow logged warning message
result = add_data_page.forms[0].submit()
# form submission was a post with an error, response should be a 200
# error text appears twice, once at the top of the page, once around
# the field.
self.assertContains(
result, str(DsDataError(code=DsDataErrorCodes.INVALID_DIGEST_SHA1)), count=2, status_code=200
)
def test_ds_data_form_invalid_digest_sha256(self):
"""DS Data form errors with invalid data (digest is invalid sha-256)
Uses self.app WebTest because we need to interact with forms.
"""
add_data_page = self.app.get(reverse("domain-dns-dnssec-dsdata", kwargs={"pk": self.domain_dsdata.id}))
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
# first two nameservers are required, so if we empty one out we should
# get a form error
add_data_page.forms[0]["form-0-key_tag"] = "1234"
add_data_page.forms[0]["form-0-algorithm"] = "3"
add_data_page.forms[0]["form-0-digest_type"] = "2" # SHA-256
add_data_page.forms[0]["form-0-digest"] = "GG1234"
with less_console_noise(): # swallow logged warning message
result = add_data_page.forms[0].submit()
# form submission was a post with an error, response should be a 200
# error text appears twice, once at the top of the page, once around
# the field.
self.assertContains(
result, str(DsDataError(code=DsDataErrorCodes.INVALID_DIGEST_SHA256)), count=2, status_code=200
)
class TestApplicationStatus(TestWithUser, WebTest):

View file

@ -66,18 +66,20 @@ class NameserverErrorCodes(IntEnum):
value but is not a subdomain
- 3 INVALID_IP invalid ip address format or invalid version
- 4 TOO_MANY_HOSTS more than the max allowed host values
- 5 UNABLE_TO_UPDATE_DOMAIN unable to update the domain
- 6 MISSING_HOST host is missing for a nameserver
- 7 INVALID_HOST host is invalid for a nameserver
- 5 MISSING_HOST host is missing for a nameserver
- 6 INVALID_HOST host is invalid for a nameserver
- 7 DUPLICATE_HOST host is a duplicate
- 8 BAD_DATA bad data input for nameserver
"""
MISSING_IP = 1
GLUE_RECORD_NOT_ALLOWED = 2
INVALID_IP = 3
TOO_MANY_HOSTS = 4
UNABLE_TO_UPDATE_DOMAIN = 5
MISSING_HOST = 6
INVALID_HOST = 7
MISSING_HOST = 5
INVALID_HOST = 6
DUPLICATE_HOST = 7
BAD_DATA = 8
class NameserverError(Exception):
@ -91,11 +93,13 @@ class NameserverError(Exception):
NameserverErrorCodes.GLUE_RECORD_NOT_ALLOWED: ("Name server address does not match domain name"),
NameserverErrorCodes.INVALID_IP: ("{}: Enter an IP address in the required format."),
NameserverErrorCodes.TOO_MANY_HOSTS: ("Too many hosts provided, you may not have more than 13 nameservers."),
NameserverErrorCodes.UNABLE_TO_UPDATE_DOMAIN: (
"Unable to update domain, changes were not applied. Check logs as a Registry Error is the likely cause"
),
NameserverErrorCodes.MISSING_HOST: ("Name server must be provided to enter IP address."),
NameserverErrorCodes.INVALID_HOST: ("Enter a name server in the required format, like ns1.example.com"),
NameserverErrorCodes.DUPLICATE_HOST: ("Remove duplicate entry"),
NameserverErrorCodes.BAD_DATA: (
"Theres something wrong with the name server information you provided. "
"If you need help email us at help@get.gov."
),
}
def __init__(self, *args, code=None, nameserver=None, ip=None, **kwargs):
@ -112,3 +116,77 @@ class NameserverError(Exception):
def __str__(self):
return f"{self.message}"
class DsDataErrorCodes(IntEnum):
"""Used in the DsDataError class for
error mapping.
Overview of ds data error codes:
- 1 BAD_DATA bad data input in ds data
- 2 INVALID_DIGEST_SHA1 invalid digest for digest type SHA-1
- 3 INVALID_DIGEST_SHA256 invalid digest for digest type SHA-256
- 4 INVALID_DIGEST_CHARS invalid chars in digest
- 5 INVALID_KEYTAG_SIZE invalid key tag size > 65535
"""
BAD_DATA = 1
INVALID_DIGEST_SHA1 = 2
INVALID_DIGEST_SHA256 = 3
INVALID_DIGEST_CHARS = 4
INVALID_KEYTAG_SIZE = 5
class DsDataError(Exception):
"""
DsDataError class used to raise exceptions on
the ds data getter
"""
_error_mapping = {
DsDataErrorCodes.BAD_DATA: (
"Theres something wrong with the DS data you provided. If you need help email us at help@get.gov."
),
DsDataErrorCodes.INVALID_DIGEST_SHA1: ("SHA-1 digest must be exactly 40 characters."),
DsDataErrorCodes.INVALID_DIGEST_SHA256: ("SHA-256 digest must be exactly 64 characters."),
DsDataErrorCodes.INVALID_DIGEST_CHARS: ("Digest must contain only alphanumeric characters [0-9,a-f]."),
DsDataErrorCodes.INVALID_KEYTAG_SIZE: ("Key tag must be less than 65535"),
}
def __init__(self, *args, code=None, **kwargs):
super().__init__(*args, **kwargs)
self.code = code
if self.code in self._error_mapping:
self.message = self._error_mapping.get(self.code)
def __str__(self):
return f"{self.message}"
class SecurityEmailErrorCodes(IntEnum):
"""Used in the SecurityEmailError class for
error mapping.
Overview of security email error codes:
- 1 BAD_DATA bad data input in security email
"""
BAD_DATA = 1
class SecurityEmailError(Exception):
"""
SecurityEmailError class used to raise exceptions on
the security email form
"""
_error_mapping = {
SecurityEmailErrorCodes.BAD_DATA: ("Enter an email address in the required format, like name@example.com.")
}
def __init__(self, *args, code=None, **kwargs):
super().__init__(*args, **kwargs)
self.code = code
if self.code in self._error_mapping:
self.message = self._error_mapping.get(self.code)
def __str__(self):
return f"{self.message}"

View file

@ -28,11 +28,16 @@ from registrar.utility.errors import (
GenericErrorCodes,
NameserverError,
NameserverErrorCodes as nsErrorCodes,
DsDataError,
DsDataErrorCodes,
SecurityEmailError,
SecurityEmailErrorCodes,
)
from registrar.models.utility.contact_error import ContactError
from ..forms import (
ContactForm,
AuthorizingOfficialContactForm,
DomainOrgNameAddressForm,
DomainAddUserForm,
DomainSecurityEmailForm,
@ -147,6 +152,28 @@ class DomainView(DomainBaseView):
context["security_email"] = security_email
return context
def in_editable_state(self, pk):
"""Override in_editable_state from DomainPermission
Allow detail page to be viewable"""
requested_domain = None
if Domain.objects.filter(id=pk).exists():
requested_domain = Domain.objects.get(id=pk)
# return true if the domain exists, this will allow the detail page to load
if requested_domain:
return True
return False
def _get_domain(self, request):
"""
override get_domain for this view so that domain overview
always resets the cache for the domain object
"""
self.session = request.session
self.object = self.get_object()
self._update_session_with_domain()
class DomainOrgNameAddressView(DomainFormBaseView):
"""Organization name and mailing address view"""
@ -170,7 +197,7 @@ class DomainOrgNameAddressView(DomainFormBaseView):
"""The form is valid, save the organization name and mailing address."""
form.save()
messages.success(self.request, "The organization name and mailing address has been updated.")
messages.success(self.request, "The organization information has been updated.")
# superclass has the redirect
return super().form_valid(form)
@ -182,7 +209,7 @@ class DomainAuthorizingOfficialView(DomainFormBaseView):
model = Domain
template_name = "domain_authorizing_official.html"
context_object_name = "domain"
form_class = ContactForm
form_class = AuthorizingOfficialContactForm
def get_form_kwargs(self, *args, **kwargs):
"""Add domain_info.authorizing_official instance to make a bound form."""
@ -304,7 +331,7 @@ class DomainNameserversView(DomainFormBaseView):
except NameserverError as Err:
# NamserverErrors *should* be caught in form; if reached here,
# there was an uncaught error in submission (through EPP)
messages.error(self.request, NameserverError(code=nsErrorCodes.UNABLE_TO_UPDATE_DOMAIN))
messages.error(self.request, NameserverError(code=nsErrorCodes.BAD_DATA))
logger.error(f"Nameservers error: {Err}")
# TODO: registry is not throwing an error when no connection
except RegistryError as Err:
@ -315,7 +342,7 @@ class DomainNameserversView(DomainFormBaseView):
)
logger.error(f"Registry connection error: {Err}")
else:
messages.error(self.request, GenericError(code=GenericErrorCodes.GENERIC_ERROR))
messages.error(self.request, NameserverError(code=nsErrorCodes.BAD_DATA))
logger.error(f"Registry error: {Err}")
else:
messages.success(
@ -445,7 +472,7 @@ class DomainDsDataView(DomainFormBaseView):
modal_button = (
'<button type="submit" '
'class="usa-button usa-button--secondary" '
'name="disable-override-click">Delete all records</button>'
'name="disable-override-click">Remove all DS Data</button>'
)
# context to back out of a broken form on all fields delete
@ -491,7 +518,7 @@ class DomainDsDataView(DomainFormBaseView):
)
logger.error(f"Registry connection error: {err}")
else:
messages.error(self.request, GenericError(code=GenericErrorCodes.GENERIC_ERROR))
messages.error(self.request, DsDataError(code=DsDataErrorCodes.BAD_DATA))
logger.error(f"Registry error: {err}")
return self.form_invalid(formset)
else:
@ -581,10 +608,10 @@ class DomainSecurityEmailView(DomainFormBaseView):
)
logger.error(f"Registry connection error: {Err}")
else:
messages.error(self.request, GenericError(code=GenericErrorCodes.GENERIC_ERROR))
messages.error(self.request, SecurityEmailError(code=SecurityEmailErrorCodes.BAD_DATA))
logger.error(f"Registry error: {Err}")
except ContactError as Err:
messages.error(self.request, GenericError(code=GenericErrorCodes.GENERIC_ERROR))
messages.error(self.request, SecurityEmailError(code=SecurityEmailErrorCodes.BAD_DATA))
logger.error(f"Generic registry error: {Err}")
else:
messages.success(self.request, "The security email for this domain has been updated.")

View file

@ -3,6 +3,7 @@
from django.contrib.auth.mixins import PermissionRequiredMixin
from registrar.models import (
Domain,
DomainApplication,
DomainInvitation,
DomainInformation,
@ -45,6 +46,10 @@ class DomainPermission(PermissionsLoginMixin):
if pk is None:
raise ValueError("Primary key is None")
# test if domain in editable state
if not self.in_editable_state(pk):
return False
if self.can_access_other_user_domains(pk):
return True
@ -55,6 +60,18 @@ class DomainPermission(PermissionsLoginMixin):
# if we need to check more about the nature of role, do it here.
return True
def in_editable_state(self, pk):
"""Is the domain in an editable state"""
requested_domain = None
if Domain.objects.filter(id=pk).exists():
requested_domain = Domain.objects.get(id=pk)
# if domain is editable return true
if requested_domain and requested_domain.is_editable():
return True
return False
def can_access_other_user_domains(self, pk):
"""Checks to see if an authorized user (staff or superuser)
can access a domain that they did not create or was invited to.