Merge branch 'main' into za/1102-epp-contact-disclose-update

This commit is contained in:
zandercymatics 2023-10-04 10:50:21 -06:00
commit 89a1390ec7
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
14 changed files with 1140 additions and 141 deletions

2
src/Pipfile.lock generated
View file

@ -353,7 +353,7 @@
},
"fred-epplib": {
"git": "https://github.com/cisagov/epplib.git",
"ref": "f818cbf0b069a12f03e1d72e4b9f4900924b832d"
"ref": "d56d183f1664f34c40ca9716a3a9a345f0ef561c"
},
"furl": {
"hashes": [

View file

@ -11,6 +11,7 @@ logger = logging.getLogger(__name__)
NAMESPACE = SimpleNamespace(
EPP="urn:ietf:params:xml:ns:epp-1.0",
SEC_DNS="urn:ietf:params:xml:ns:secDNS-1.1",
XSI="http://www.w3.org/2001/XMLSchema-instance",
FRED="noop",
NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0",
@ -25,6 +26,7 @@ NAMESPACE = SimpleNamespace(
SCHEMA_LOCATION = SimpleNamespace(
XSI="urn:ietf:params:xml:ns:epp-1.0 epp-1.0.xsd",
FRED="noop fred-1.5.0.xsd",
SEC_DNS="urn:ietf:params:xml:ns:secDNS-1.1 secDNS-1.1.xsd",
NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0 contact-1.0.xsd",
NIC_DOMAIN="urn:ietf:params:xml:ns:domain-1.0 domain-1.0.xsd",
NIC_ENUMVAL="noop enumval-1.2.0.xsd",
@ -44,7 +46,8 @@ except NameError:
try:
from .client import CLIENT, commands
from .errors import RegistryError, ErrorCode
from epplib.models import common
from epplib.models import common, info
from epplib.responses import extensions
from epplib import responses
except ImportError:
pass
@ -53,7 +56,9 @@ __all__ = [
"CLIENT",
"commands",
"common",
"extensions",
"responses",
"info",
"ErrorCode",
"RegistryError",
]

View file

@ -87,6 +87,11 @@ class UserFixture:
"first_name": "Erin",
"last_name": "Song",
},
{
"username": "e0ea8b94-6e53-4430-814a-849a7ca45f21",
"first_name": "Kristina",
"last_name": "Yin",
},
]
STAFF = [
@ -145,6 +150,12 @@ class UserFixture:
"last_name": "Song-Analyst",
"email": "erin.song+1@gsa.gov",
},
{
"username": "9a98e4c9-9409-479d-964e-4aec7799107f",
"first_name": "Kristina-Analyst",
"last_name": "Yin-Analyst",
"email": "kristina.yin+1@gsa.gov",
},
]
STAFF_PERMISSIONS = [

View file

@ -0,0 +1,17 @@
# Generated by Django 4.2.1 on 2023-10-02 22:07
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("registrar", "0032_alter_transitiondomain_status"),
]
operations = [
migrations.AlterField(
model_name="userdomainrole",
name="role",
field=models.TextField(choices=[("manager", "Admin")]),
),
]

View file

@ -1,5 +1,5 @@
from itertools import zip_longest
import logging
from datetime import date
from string import digits
from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore
@ -10,9 +10,12 @@ from epplibwrapper import (
CLIENT as registry,
commands,
common as epp,
extensions,
info as eppInfo,
RegistryError,
ErrorCode,
)
from registrar.models.utility.contact_error import ContactError
from .utility.domain_field import DomainField
from .utility.domain_helper import DomainHelper
@ -279,6 +282,27 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error("Error _create_host, code was %s error was %s" % (e.code, e))
return e.code
@Cache
def dnssecdata(self) -> extensions.DNSSECExtension:
return self._get_property("dnssecdata")
@dnssecdata.setter # type: ignore
def dnssecdata(self, _dnssecdata: extensions.DNSSECExtension):
updateParams = {
"maxSigLife": _dnssecdata.get("maxSigLife", None),
"dsData": _dnssecdata.get("dsData", None),
"keyData": _dnssecdata.get("keyData", None),
"remAllDsKeyData": True,
}
request = commands.UpdateDomain(name=self.name)
extension = commands.UpdateDomainDNSSECExtension(**updateParams)
request.add_extension(extension)
try:
registry.send(request, cleaned=True)
except RegistryError as e:
logger.error("Error adding DNSSEC, code was %s error was %s" % (e.code, e))
raise e
@nameservers.setter # type: ignore
def nameservers(self, hosts: list[tuple[str]]):
"""host should be a tuple of type str, str,... where the elements are
@ -352,9 +376,9 @@ class Domain(TimeStampedModel, DomainHelper):
raise NotImplementedError()
@Cache
def registrant_contact(self) -> PublicContact:
"""Get or set the registrant for this domain."""
raise NotImplementedError()
def registrant_contact(self) -> PublicContact | None:
registrant = PublicContact.ContactTypeChoices.REGISTRANT
return self.generic_contact_getter(registrant)
@registrant_contact.setter # type: ignore
def registrant_contact(self, contact: PublicContact):
@ -367,9 +391,10 @@ class Domain(TimeStampedModel, DomainHelper):
)
@Cache
def administrative_contact(self) -> PublicContact:
"""Get or set the admin contact for this domain."""
raise NotImplementedError()
def administrative_contact(self) -> PublicContact | None:
"""Get the admin contact for this domain."""
admin = PublicContact.ContactTypeChoices.ADMINISTRATIVE
return self.generic_contact_getter(admin)
@administrative_contact.setter # type: ignore
def administrative_contact(self, contact: PublicContact):
@ -381,12 +406,6 @@ class Domain(TimeStampedModel, DomainHelper):
self._make_contact_in_registry(contact=contact)
self._update_domain_with_contact(contact, rem=False)
def get_default_security_contact(self):
logger.info("getting default sec contact")
contact = PublicContact.get_default_security()
contact.domain = self
return contact
def _update_epp_contact(self, contact: PublicContact):
"""Sends UpdateContact to update the actual contact object,
domain object remains unaffected
@ -440,26 +459,10 @@ class Domain(TimeStampedModel, DomainHelper):
)
@Cache
def security_contact(self) -> PublicContact:
def security_contact(self) -> PublicContact | None:
"""Get or set the security contact for this domain."""
try:
contacts = self._get_property("contacts")
for contact in contacts:
if (
"type" in contact.keys()
and contact["type"] == PublicContact.ContactTypeChoices.SECURITY
):
tempContact = self.get_default_security_contact()
tempContact.email = contact["email"]
return tempContact
except Exception as err: # use better error handling
logger.info("Couldn't get contact %s" % err)
# TODO - remove this ideally it should return None,
# but error handling needs to be
# added on the security email page so that it can handle it being none
return self.get_default_security_contact()
security = PublicContact.ContactTypeChoices.SECURITY
return self.generic_contact_getter(security)
def _add_registrant_to_existing_domain(self, contact: PublicContact):
"""Used to change the registrant contact on an existing domain"""
@ -533,6 +536,7 @@ class Domain(TimeStampedModel, DomainHelper):
.filter(domain=self, contact_type=contact.contact_type)
.get()
)
if isRegistrant:
# send update domain only for registant contacts
existing_contact.delete()
@ -589,9 +593,10 @@ class Domain(TimeStampedModel, DomainHelper):
)
@Cache
def technical_contact(self) -> PublicContact:
def technical_contact(self) -> PublicContact | None:
"""Get or set the tech contact for this domain."""
raise NotImplementedError()
tech = PublicContact.ContactTypeChoices.TECHNICAL
return self.generic_contact_getter(tech)
@technical_contact.setter # type: ignore
def technical_contact(self, contact: PublicContact):
@ -674,6 +679,231 @@ class Domain(TimeStampedModel, DomainHelper):
help_text="Very basic info about the lifecycle of this domain object",
)
def isActive(self):
return self.state == Domain.State.CREATED
def map_epp_contact_to_public_contact(
self, contact: eppInfo.InfoContactResultData, contact_id, contact_type
):
"""Maps the Epp contact representation to a PublicContact object.
contact -> eppInfo.InfoContactResultData: The converted contact object
contact_id -> str: The given registry_id of the object (i.e "cheese@cia.gov")
contact_type -> str: The given contact type, (i.e. "tech" or "registrant")
"""
if contact is None:
return None
if contact_type is None:
raise ContactError("contact_type is None")
if contact_id is None:
raise ContactError("contact_id is None")
# Since contact_id is registry_id,
# check that its the right length
contact_id_length = len(contact_id)
if (
contact_id_length > PublicContact.get_max_id_length()
or contact_id_length < 1
):
raise ContactError(
"contact_id is of invalid length. "
"Cannot exceed 16 characters, "
f"got {contact_id} with a length of {contact_id_length}"
)
if not isinstance(contact, eppInfo.InfoContactResultData):
raise ContactError("Contact must be of type InfoContactResultData")
auth_info = contact.auth_info
postal_info = contact.postal_info
addr = postal_info.addr
streets = None
if addr is not None:
streets = addr.street
streets_kwargs = self._convert_streets_to_dict(streets)
desired_contact = PublicContact(
domain=self,
contact_type=contact_type,
registry_id=contact_id,
email=contact.email or "",
voice=contact.voice or "",
fax=contact.fax,
name=postal_info.name or "",
org=postal_info.org,
# For linter - default to "" instead of None
pw=getattr(auth_info, "pw", ""),
city=getattr(addr, "city", ""),
pc=getattr(addr, "pc", ""),
cc=getattr(addr, "cc", ""),
sp=getattr(addr, "sp", ""),
**streets_kwargs,
) # type: ignore
return desired_contact
def _convert_streets_to_dict(self, streets):
"""
Converts EPPLibs street representation
to PublicContacts.
Args:
streets (Sequence[str]): Streets from EPPLib.
Returns:
dict: {
"street1": str or "",
"street2": str or None,
"street3": str or None,
}
EPPLib returns 'street' as an sequence of strings.
Meanwhile, PublicContact has this split into three
seperate properties: street1, street2, street3.
Handles this disparity.
"""
# 'zips' two lists together.
# For instance, (('street1', 'some_value_here'),
# ('street2', 'some_value_here'))
# Dict then converts this to a useable kwarg which we can pass in
street_dict = dict(
zip_longest(
["street1", "street2", "street3"],
streets if streets is not None else [""],
fillvalue=None,
)
)
return street_dict
def _request_contact_info(self, contact: PublicContact):
try:
req = commands.InfoContact(id=contact.registry_id)
return registry.send(req, cleaned=True).res_data[0]
except RegistryError as error:
logger.error(
"Registry threw error for contact id %s contact type is %s, error code is\n %s full error is %s", # noqa
contact.registry_id,
contact.contact_type,
error.code,
error,
)
raise error
def generic_contact_getter(
self, contact_type_choice: PublicContact.ContactTypeChoices
) -> PublicContact | None:
"""Retrieves the desired PublicContact from the registry.
This abstracts the caching and EPP retrieval for
all contact items and thus may result in EPP calls being sent.
contact_type_choice is a literal in PublicContact.ContactTypeChoices,
for instance: PublicContact.ContactTypeChoices.SECURITY.
If you wanted to setup getter logic for Security, you would call:
cache_contact_helper(PublicContact.ContactTypeChoices.SECURITY),
or cache_contact_helper("security").
"""
# registrant_contact(s) are an edge case. They exist on
# the "registrant" property as opposed to contacts.
desired_property = "contacts"
if contact_type_choice == PublicContact.ContactTypeChoices.REGISTRANT:
desired_property = "registrant"
try:
# Grab from cache
contacts = self._get_property(desired_property)
except KeyError as error:
logger.error(f"Could not find {contact_type_choice}: {error}")
return None
else:
cached_contact = self.get_contact_in_keys(contacts, contact_type_choice)
if cached_contact is None:
# TODO - #1103
raise ContactError("No contact was found in cache or the registry")
return cached_contact
def get_default_security_contact(self):
"""Gets the default security contact."""
contact = PublicContact.get_default_security()
contact.domain = self
return contact
def get_default_administrative_contact(self):
"""Gets the default administrative contact."""
contact = PublicContact.get_default_administrative()
contact.domain = self
return contact
def get_default_technical_contact(self):
"""Gets the default technical contact."""
contact = PublicContact.get_default_technical()
contact.domain = self
return contact
def get_default_registrant_contact(self):
"""Gets the default registrant contact."""
contact = PublicContact.get_default_registrant()
contact.domain = self
return contact
def get_contact_in_keys(self, contacts, contact_type):
"""Gets a contact object.
Args:
contacts ([PublicContact]): List of PublicContacts
contact_type (literal): Which PublicContact to get
Returns:
PublicContact | None
"""
# Registrant doesn't exist as an array, and is of
# a special data type, so we need to handle that.
if contact_type == PublicContact.ContactTypeChoices.REGISTRANT:
desired_contact = None
if isinstance(contacts, str):
desired_contact = self._registrant_to_public_contact(
self._cache["registrant"]
)
# Set the cache with the updated object
# for performance reasons.
if "registrant" in self._cache:
self._cache["registrant"] = desired_contact
elif isinstance(contacts, PublicContact):
desired_contact = contacts
return self._handle_registrant_contact(desired_contact)
_registry_id: str
if contact_type in contacts:
_registry_id = contacts.get(contact_type)
desired = PublicContact.objects.filter(
registry_id=_registry_id, domain=self, contact_type=contact_type
)
if desired.count() == 1:
return desired.get()
logger.info(f"Requested contact {_registry_id} does not exist in cache.")
return None
def _handle_registrant_contact(self, contact):
if (
contact.contact_type is not None
and contact.contact_type == PublicContact.ContactTypeChoices.REGISTRANT
):
return contact
else:
raise ValueError("Invalid contact object for registrant_contact")
# ForeignKey on UserDomainRole creates a "permissions" member for
# all of the user-roles that are in place for this domain
@ -720,9 +950,9 @@ class Domain(TimeStampedModel, DomainHelper):
try:
logger.info("Getting domain info from epp")
req = commands.InfoDomain(name=self.name)
domainInfo = registry.send(req, cleaned=True).res_data[0]
domainInfoResponse = registry.send(req, cleaned=True)
exitEarly = True
return domainInfo
return domainInfoResponse
except RegistryError as e:
count += 1
@ -772,12 +1002,10 @@ class Domain(TimeStampedModel, DomainHelper):
security_contact = self.get_default_security_contact()
security_contact.save()
technical_contact = PublicContact.get_default_technical()
technical_contact.domain = self
technical_contact = self.get_default_technical_contact()
technical_contact.save()
administrative_contact = PublicContact.get_default_administrative()
administrative_contact.domain = self
administrative_contact = self.get_default_administrative_contact()
administrative_contact.save()
@transition(
@ -919,16 +1147,34 @@ class Domain(TimeStampedModel, DomainHelper):
)
return err.code
def _request_contact_info(self, contact: PublicContact):
req = commands.InfoContact(id=contact.registry_id)
return registry.send(req, cleaned=True).res_data[0]
def _fetch_contacts(self, contact_data):
"""Fetch contact info."""
choices = PublicContact.ContactTypeChoices
# We expect that all these fields get populated,
# so we can create these early, rather than waiting.
contacts_dict = {
choices.ADMINISTRATIVE: None,
choices.SECURITY: None,
choices.TECHNICAL: None,
}
for domainContact in contact_data:
req = commands.InfoContact(id=domainContact.contact)
data = registry.send(req, cleaned=True).res_data[0]
# Map the object we recieved from EPP to a PublicContact
mapped_object = self.map_epp_contact_to_public_contact(
data, domainContact.contact, domainContact.type
)
# Find/create it in the DB
in_db = self._get_or_create_public_contact(mapped_object)
contacts_dict[in_db.contact_type] = in_db.registry_id
return contacts_dict
def _get_or_create_contact(self, contact: PublicContact):
"""Try to fetch info about a contact. Create it if it does not exist."""
try:
return self._request_contact_info(contact)
except RegistryError as e:
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
logger.info(
@ -951,6 +1197,23 @@ class Domain(TimeStampedModel, DomainHelper):
raise e
def _fetch_hosts(self, host_data):
"""Fetch host info."""
hosts = []
for name in host_data:
req = commands.InfoHost(name=name)
data = registry.send(req, cleaned=True).res_data[0]
host = {
"name": name,
"addrs": getattr(data, "addrs", ...),
"cr_date": getattr(data, "cr_date", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
}
hosts.append({k: v for k, v in host.items() if v is not ...})
return hosts
def _update_or_create_host(self, host):
raise NotImplementedError()
@ -961,7 +1224,8 @@ class Domain(TimeStampedModel, DomainHelper):
"""Contact registry for info about a domain."""
try:
# get info from registry
data = self._get_or_create_domain()
dataResponse = self._get_or_create_domain()
data = dataResponse.res_data[0]
# extract properties from response
# (Ellipsis is used to mean "null")
cache = {
@ -976,14 +1240,21 @@ class Domain(TimeStampedModel, DomainHelper):
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
}
# remove null properties (to distinguish between "a value of None" and null)
cleaned = {k: v for k, v in cache.items() if v is not ...}
# statuses can just be a list no need to keep the epp object
if "statuses" in cleaned.keys():
if "statuses" in cleaned:
cleaned["statuses"] = [status.state for status in cleaned["statuses"]]
# get extensions info, if there is any
# DNSSECExtension is one possible extension, make sure to handle
# only DNSSECExtension and not other type extensions
returned_extensions = dataResponse.extensions
cleaned["dnssecdata"] = None
for extension in returned_extensions:
if isinstance(extension, extensions.DNSSECExtension):
cleaned["dnssecdata"] = extension
# Capture and store old hosts and contacts from cache if they exist
old_cache_hosts = self._cache.get("hosts")
old_cache_contacts = self._cache.get("contacts")
@ -993,7 +1264,7 @@ class Domain(TimeStampedModel, DomainHelper):
fetch_contacts
and "_contacts" in cleaned
and isinstance(cleaned["_contacts"], list)
and len(cleaned["_contacts"])
and len(cleaned["_contacts"]) > 0
):
cleaned["contacts"] = self._fetch_contacts(cleaned["_contacts"])
# We're only getting contacts, so retain the old
@ -1015,52 +1286,69 @@ class Domain(TimeStampedModel, DomainHelper):
# and pass them along.
if old_cache_contacts is not None:
cleaned["contacts"] = old_cache_contacts
# replace the prior cache with new data
self._cache = cleaned
except RegistryError as e:
logger.error(e)
def _fetch_contacts(self, contact_data):
"""Fetch contact info."""
contacts = []
for domainContact in contact_data:
req = commands.InfoContact(id=domainContact.contact)
data = registry.send(req, cleaned=True).res_data[0]
contact = {
"id": domainContact.contact,
"type": domainContact.type,
"auth_info": getattr(data, "auth_info", ...),
"cr_date": getattr(data, "cr_date", ...),
"disclose": getattr(data, "disclose", ...),
"email": getattr(data, "email", ...),
"fax": getattr(data, "fax", ...),
"postal_info": getattr(data, "postal_info", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
"voice": getattr(data, "voice", ...),
}
contacts.append({k: v for k, v in contact.items() if v is not ...})
return contacts
def _get_or_create_public_contact(self, public_contact: PublicContact):
"""Tries to find a PublicContact object in our DB.
If it can't, it'll create it. Returns PublicContact"""
db_contact = PublicContact.objects.filter(
registry_id=public_contact.registry_id,
contact_type=public_contact.contact_type,
domain=self,
)
def _fetch_hosts(self, host_data):
"""Fetch host info."""
hosts = []
for name in host_data:
req = commands.InfoHost(name=name)
data = registry.send(req, cleaned=True).res_data[0]
host = {
"name": name,
"addrs": getattr(data, "addrs", ...),
"cr_date": getattr(data, "cr_date", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
}
hosts.append({k: v for k, v in host.items() if v is not ...})
return hosts
# Raise an error if we find duplicates.
# This should not occur
if db_contact.count() > 1:
raise Exception(
f"Multiple contacts found for {public_contact.contact_type}"
)
# Save to DB if it doesn't exist already.
if db_contact.count() == 0:
# Doesn't run custom save logic, just saves to DB
public_contact.save(skip_epp_save=True)
logger.info(f"Created a new PublicContact: {public_contact}")
# Append the item we just created
return public_contact
existing_contact = db_contact.get()
# Does the item we're grabbing match
# what we have in our DB?
if (
existing_contact.email != public_contact.email
or existing_contact.registry_id != public_contact.registry_id
):
existing_contact.delete()
public_contact.save()
logger.warning("Requested PublicContact is out of sync " "with DB.")
return public_contact
# If it already exists, we can
# assume that the DB instance was updated
# during set, so we should just use that.
return existing_contact
def _registrant_to_public_contact(self, registry_id: str):
"""EPPLib returns the registrant as a string,
which is the registrants associated registry_id. This function is used to
convert that id to a useable object by calling commands.InfoContact
on that ID, then mapping that object to type PublicContact."""
contact = PublicContact(
registry_id=registry_id,
contact_type=PublicContact.ContactTypeChoices.REGISTRANT,
)
# Grabs the expanded contact
full_object = self._request_contact_info(contact)
# Maps it to type PublicContact
mapped_object = self.map_epp_contact_to_public_contact(
full_object, contact.registry_id, contact.contact_type
)
return self._get_or_create_public_contact(mapped_object)
def _invalidate_cache(self):
"""Remove cache data when updates are made."""

View file

@ -29,7 +29,8 @@ class PublicContact(TimeStampedModel):
def save(self, *args, **kwargs):
"""Save to the registry and also locally in the registrar database."""
if hasattr(self, "domain"):
skip_epp_save = kwargs.pop("skip_epp_save", False)
if hasattr(self, "domain") and not skip_epp_save:
match self.contact_type:
case PublicContact.ContactTypeChoices.REGISTRANT:
self.domain.registrant_contact = self
@ -148,6 +149,10 @@ class PublicContact(TimeStampedModel):
pw="thisisnotapassword",
)
@classmethod
def get_max_id_length(cls):
return cls._meta.get_field("registry_id").max_length
def __str__(self):
return (
f"{self.name} <{self.email}>"

View file

@ -15,7 +15,7 @@ class UserDomainRole(TimeStampedModel):
elsewhere.
"""
ADMIN = "admin"
ADMIN = "manager"
user = models.ForeignKey(
"registrar.User",

View file

@ -0,0 +1,2 @@
class ContactError(Exception):
...

View file

@ -21,7 +21,7 @@
<button
type="submit"
class="usa-button"
>Add security email</button>
>{% if form.security_email.value is None or form.security_email.value == "dotgov@cisa.dhs.gov"%}Add security email{% else %}Save{% endif %}</button>
</form>
{% endblock %} {# domain_content #}

View file

@ -26,6 +26,7 @@ from registrar.models import (
from epplibwrapper import (
commands,
common,
info,
RegistryError,
ErrorCode,
)
@ -555,32 +556,117 @@ class MockEppLib(TestCase):
contacts=...,
hosts=...,
statuses=...,
registrant=...,
):
self.auth_info = auth_info
self.cr_date = cr_date
self.contacts = contacts
self.hosts = hosts
self.statuses = statuses
self.registrant = registrant
def dummyInfoContactResultData(
self,
id,
email,
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
pw="thisisnotapassword",
):
fake = info.InfoContactResultData(
id=id,
postal_info=common.PostalInfo(
name="Registry Customer Service",
addr=common.ContactAddr(
street=["4200 Wilson Blvd."],
city="Arlington",
pc="22201",
cc="US",
sp="VA",
),
org="Cybersecurity and Infrastructure Security Agency",
type="type",
),
voice="+1.8882820870",
fax="+1-212-9876543",
email=email,
auth_info=common.ContactAuthInfo(pw=pw),
roid=...,
statuses=[],
cl_id=...,
cr_id=...,
cr_date=cr_date,
up_id=...,
up_date=...,
tr_date=...,
disclose=...,
vat=...,
ident=...,
notify_email=...,
)
return fake
mockDataInfoDomain = fakedEppObject(
"fakepw",
"fakePw",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[common.DomainContact(contact="123", type="security")],
contacts=[
common.DomainContact(
contact="123", type=PublicContact.ContactTypeChoices.SECURITY
)
],
hosts=["fake.host.com"],
statuses=[
common.Status(state="serverTransferProhibited", description="", lang="en"),
common.Status(state="inactive", description="", lang="en"),
],
)
mockDataInfoContact = mockDataInfoDomain.dummyInfoContactResultData(
"123", "123@mail.gov", datetime.datetime(2023, 5, 25, 19, 45, 35), "lastPw"
)
InfoDomainWithContacts = fakedEppObject(
"fakepw",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[
common.DomainContact(
contact="securityContact",
type=PublicContact.ContactTypeChoices.SECURITY,
),
common.DomainContact(
contact="technicalContact",
type=PublicContact.ContactTypeChoices.TECHNICAL,
),
common.DomainContact(
contact="adminContact",
type=PublicContact.ContactTypeChoices.ADMINISTRATIVE,
),
],
hosts=["fake.host.com"],
statuses=[
common.Status(state="serverTransferProhibited", description="", lang="en"),
common.Status(state="inactive", description="", lang="en"),
],
registrant="regContact",
)
mockSecurityContact = InfoDomainWithContacts.dummyInfoContactResultData(
"securityContact", "security@mail.gov"
)
mockTechnicalContact = InfoDomainWithContacts.dummyInfoContactResultData(
"technicalContact", "tech@mail.gov"
)
mockAdministrativeContact = InfoDomainWithContacts.dummyInfoContactResultData(
"adminContact", "admin@mail.gov"
)
mockRegistrantContact = InfoDomainWithContacts.dummyInfoContactResultData(
"regContact", "registrant@mail.gov"
)
infoDomainNoContact = fakedEppObject(
"security",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[],
hosts=["fake.host.com"],
)
mockDataInfoContact = fakedEppObject(
"anotherPw", cr_date=datetime.datetime(2023, 7, 25, 19, 45, 35)
)
mockDataInfoHosts = fakedEppObject(
"lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35)
)
@ -593,9 +679,28 @@ class MockEppLib(TestCase):
if isinstance(_request, commands.InfoDomain):
if getattr(_request, "name", None) == "security.gov":
return MagicMock(res_data=[self.infoDomainNoContact])
return MagicMock(res_data=[self.mockDataInfoDomain])
elif getattr(_request, "name", None) == "freeman.gov":
return MagicMock(res_data=[self.InfoDomainWithContacts])
else:
return MagicMock(res_data=[self.mockDataInfoDomain])
elif isinstance(_request, commands.InfoContact):
return MagicMock(res_data=[self.mockDataInfoContact])
mocked_result: info.InfoContactResultData
# For testing contact types
match getattr(_request, "id", None):
case "securityContact":
mocked_result = self.mockSecurityContact
case "technicalContact":
mocked_result = self.mockTechnicalContact
case "adminContact":
mocked_result = self.mockAdministrativeContact
case "regContact":
mocked_result = self.mockRegistrantContact
case _:
# Default contact return
mocked_result = self.mockDataInfoContact
return MagicMock(res_data=[mocked_result])
elif (
isinstance(_request, commands.CreateContact)
and getattr(_request, "id", None) == "fail"

View file

@ -3,6 +3,7 @@ Feature being tested: Registry Integration
This file tests the various ways in which the registrar interacts with the registry.
"""
from typing import Mapping, Any
from django.test import TestCase
from django.db.utils import IntegrityError
from unittest.mock import MagicMock, patch, call
@ -20,19 +21,28 @@ from django_fsm import TransitionNotAllowed # type: ignore
from epplibwrapper import (
commands,
common,
extensions,
responses,
RegistryError,
ErrorCode,
)
import logging
logger = logging.getLogger(__name__)
class TestDomainCache(MockEppLib):
def tearDown(self):
PublicContact.objects.all().delete()
Domain.objects.all().delete()
super().tearDown()
def test_cache_sets_resets(self):
"""Cache should be set on getter and reset on setter calls"""
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
# trigger getter
_ = domain.creation_date
domain._get_property("contacts")
# getter should set the domain cache with a InfoDomain object
# (see InfoDomainResult)
self.assertEquals(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info)
@ -80,13 +90,16 @@ class TestDomainCache(MockEppLib):
def test_cache_nested_elements(self):
"""Cache works correctly with the nested objects cache and hosts"""
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
# the cached contacts and hosts should be dictionaries of what is passed to them
# The contact list will initially contain objects of type 'DomainContact'
# this is then transformed into PublicContact, and cache should NOT
# hold onto the DomainContact object
expectedUnfurledContactsList = [
common.DomainContact(contact="123", type="security"),
]
expectedContactsDict = {
"id": self.mockDataInfoDomain.contacts[0].contact,
"type": self.mockDataInfoDomain.contacts[0].type,
"auth_info": self.mockDataInfoContact.auth_info,
"cr_date": self.mockDataInfoContact.cr_date,
PublicContact.ContactTypeChoices.ADMINISTRATIVE: None,
PublicContact.ContactTypeChoices.SECURITY: "123",
PublicContact.ContactTypeChoices.TECHNICAL: None,
}
expectedHostsDict = {
"name": self.mockDataInfoDomain.hosts[0],
@ -102,13 +115,15 @@ class TestDomainCache(MockEppLib):
# check contacts
self.assertEqual(domain._cache["_contacts"], self.mockDataInfoDomain.contacts)
self.assertEqual(domain._cache["contacts"], [expectedContactsDict])
# The contact list should not contain what is sent by the registry by default,
# as _fetch_cache will transform the type to PublicContact
self.assertNotEqual(domain._cache["contacts"], expectedUnfurledContactsList)
self.assertEqual(domain._cache["contacts"], expectedContactsDict)
# get and check hosts is set correctly
domain._get_property("hosts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
self.assertEqual(domain._cache["contacts"], [expectedContactsDict])
self.assertEqual(domain._cache["contacts"], expectedContactsDict)
# invalidate cache
domain._cache = {}
@ -119,11 +134,64 @@ class TestDomainCache(MockEppLib):
# get contacts
domain._get_property("contacts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
self.assertEqual(domain._cache["contacts"], [expectedContactsDict])
self.assertEqual(domain._cache["contacts"], expectedContactsDict)
def tearDown(self) -> None:
Domain.objects.all().delete()
super().tearDown()
def test_map_epp_contact_to_public_contact(self):
# Tests that the mapper is working how we expect
domain, _ = Domain.objects.get_or_create(name="registry.gov")
security = PublicContact.ContactTypeChoices.SECURITY
mapped = domain.map_epp_contact_to_public_contact(
self.mockDataInfoContact,
self.mockDataInfoContact.id,
security,
)
expected_contact = PublicContact(
domain=domain,
contact_type=security,
registry_id="123",
email="123@mail.gov",
voice="+1.8882820870",
fax="+1-212-9876543",
pw="lastPw",
name="Registry Customer Service",
org="Cybersecurity and Infrastructure Security Agency",
city="Arlington",
pc="22201",
cc="US",
sp="VA",
street1="4200 Wilson Blvd.",
)
# Test purposes only, since we're comparing
# two duplicate objects. We would expect
# these not to have the same state.
expected_contact._state = mapped._state
# Mapped object is what we expect
self.assertEqual(mapped.__dict__, expected_contact.__dict__)
# The mapped object should correctly translate to a DB
# object. If not, something else went wrong.
db_object = domain._get_or_create_public_contact(mapped)
in_db = PublicContact.objects.filter(
registry_id=domain.security_contact.registry_id,
contact_type=security,
).get()
# DB Object is the same as the mapped object
self.assertEqual(db_object, in_db)
domain.security_contact = in_db
# Trigger the getter
_ = domain.security_contact
# Check to see that changes made
# to DB objects persist in cache correctly
in_db.email = "123test@mail.gov"
in_db.save()
cached_contact = domain._cache["contacts"].get(security)
self.assertEqual(cached_contact, in_db.registry_id)
self.assertEqual(domain.security_contact.email, "123test@mail.gov")
class TestDomainCreation(MockEppLib):
@ -207,7 +275,10 @@ class TestDomainCreation(MockEppLib):
def tearDown(self) -> None:
DomainInformation.objects.all().delete()
DomainApplication.objects.all().delete()
PublicContact.objects.all().delete()
Domain.objects.all().delete()
User.objects.all().delete()
DraftDomain.objects.all().delete()
super().tearDown()
@ -221,7 +292,6 @@ class TestDomainStatuses(MockEppLib):
_ = domain.statuses
status_list = [status.state for status in self.mockDataInfoDomain.statuses]
self.assertEquals(domain._cache["statuses"], status_list)
# Called in _fetch_cache
self.mockedSendFunction.assert_has_calls(
[
@ -265,6 +335,7 @@ class TestDomainStatuses(MockEppLib):
raise
def tearDown(self) -> None:
PublicContact.objects.all().delete()
Domain.objects.all().delete()
super().tearDown()
@ -387,12 +458,17 @@ class TestRegistrantContacts(MockEppLib):
And the registrant is the admin on a domain
"""
super().setUp()
# Creates a domain with no contact associated to it
self.domain, _ = Domain.objects.get_or_create(name="security.gov")
# Creates a domain with an associated contact
self.domain_contact, _ = Domain.objects.get_or_create(name="freeman.gov")
def tearDown(self):
super().tearDown()
# self.contactMailingAddressPatch.stop()
# self.createContactPatch.stop()
self.domain._invalidate_cache()
self.domain_contact._invalidate_cache()
PublicContact.objects.all().delete()
Domain.objects.all().delete()
def test_no_security_email(self):
"""
@ -734,6 +810,133 @@ class TestRegistrantContacts(MockEppLib):
"""
raise
def test_contact_getter_security(self):
security = PublicContact.ContactTypeChoices.SECURITY
# Create prexisting object
expected_contact = self.domain.map_epp_contact_to_public_contact(
self.mockSecurityContact,
contact_id="securityContact",
contact_type=security,
)
# Checks if we grabbed the correct PublicContact
self.assertEqual(
self.domain_contact.security_contact.email, expected_contact.email
)
expected_contact_db = PublicContact.objects.filter(
registry_id=self.domain_contact.security_contact.registry_id,
contact_type=security,
).get()
self.assertEqual(self.domain_contact.security_contact, expected_contact_db)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoContact(id="securityContact", auth_info=None),
cleaned=True,
),
]
)
# Checks if we are receiving the cache we expect
cache = self.domain_contact._cache["contacts"]
self.assertEqual(cache.get(security), "securityContact")
def test_contact_getter_technical(self):
technical = PublicContact.ContactTypeChoices.TECHNICAL
expected_contact = self.domain.map_epp_contact_to_public_contact(
self.mockTechnicalContact,
contact_id="technicalContact",
contact_type=technical,
)
self.assertEqual(
self.domain_contact.technical_contact.email, expected_contact.email
)
# Checks if we grab the correct PublicContact
expected_contact_db = PublicContact.objects.filter(
registry_id=self.domain_contact.technical_contact.registry_id,
contact_type=technical,
).get()
# Checks if we grab the correct PublicContact
self.assertEqual(self.domain_contact.technical_contact, expected_contact_db)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoContact(id="technicalContact", auth_info=None),
cleaned=True,
),
]
)
# Checks if we are receiving the cache we expect
cache = self.domain_contact._cache["contacts"]
self.assertEqual(cache.get(technical), "technicalContact")
def test_contact_getter_administrative(self):
administrative = PublicContact.ContactTypeChoices.ADMINISTRATIVE
expected_contact = self.domain.map_epp_contact_to_public_contact(
self.mockAdministrativeContact,
contact_id="adminContact",
contact_type=administrative,
)
self.assertEqual(
self.domain_contact.administrative_contact.email, expected_contact.email
)
expected_contact_db = PublicContact.objects.filter(
registry_id=self.domain_contact.administrative_contact.registry_id,
contact_type=administrative,
).get()
# Checks if we grab the correct PublicContact
self.assertEqual(
self.domain_contact.administrative_contact, expected_contact_db
)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoContact(id="adminContact", auth_info=None),
cleaned=True,
),
]
)
# Checks if we are receiving the cache we expect
cache = self.domain_contact._cache["contacts"]
self.assertEqual(cache.get(administrative), "adminContact")
def test_contact_getter_registrant(self):
expected_contact = self.domain.map_epp_contact_to_public_contact(
self.mockRegistrantContact,
contact_id="regContact",
contact_type=PublicContact.ContactTypeChoices.REGISTRANT,
)
self.assertEqual(
self.domain_contact.registrant_contact.email, expected_contact.email
)
expected_contact_db = PublicContact.objects.filter(
registry_id=self.domain_contact.registrant_contact.registry_id,
contact_type=PublicContact.ContactTypeChoices.REGISTRANT,
).get()
# Checks if we grab the correct PublicContact
self.assertEqual(self.domain_contact.registrant_contact, expected_contact_db)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoContact(id="regContact", auth_info=None),
cleaned=True,
),
]
)
# Checks if we are receiving the cache we expect.
self.assertEqual(self.domain_contact._cache["registrant"], expected_contact_db)
class TestRegistrantNameservers(TestCase):
"""Rule: Registrants may modify their nameservers"""
@ -874,44 +1077,372 @@ class TestRegistrantNameservers(TestCase):
raise
class TestRegistrantDNSSEC(TestCase):
class TestRegistrantDNSSEC(MockEppLib):
"""Rule: Registrants may modify their secure DNS data"""
# helper function to create UpdateDomainDNSSECExtention object for verification
def createUpdateExtension(self, dnssecdata: extensions.DNSSECExtension):
return commands.UpdateDomainDNSSECExtension(
maxSigLife=dnssecdata.maxSigLife,
dsData=dnssecdata.dsData,
keyData=dnssecdata.keyData,
remDsData=None,
remKeyData=None,
remAllDsKeyData=True,
)
def setUp(self):
"""
Background:
Given the registrant is logged in
And the registrant is the admin on a domain
Given the analyst is logged in
And a domain exists in the registry
"""
pass
super().setUp()
# for the tests, need a domain in the unknown state
self.domain, _ = Domain.objects.get_or_create(name="fake.gov")
self.addDsData1 = {
"keyTag": 1234,
"alg": 3,
"digestType": 1,
"digest": "ec0bdd990b39feead889f0ba613db4adec0bdd99",
}
self.addDsData2 = {
"keyTag": 2345,
"alg": 3,
"digestType": 1,
"digest": "ec0bdd990b39feead889f0ba613db4adecb4adec",
}
self.keyDataDict = {
"flags": 257,
"protocol": 3,
"alg": 1,
"pubKey": "AQPJ////4Q==",
}
self.dnssecExtensionWithDsData: Mapping[str, Any] = {
"dsData": [common.DSData(**self.addDsData1)]
}
self.dnssecExtensionWithMultDsData: Mapping[str, Any] = {
"dsData": [
common.DSData(**self.addDsData1),
common.DSData(**self.addDsData2),
],
}
self.dnssecExtensionWithKeyData: Mapping[str, Any] = {
"maxSigLife": 3215,
"keyData": [common.DNSSECKeyData(**self.keyDataDict)],
}
@skip("not implemented yet")
def test_user_adds_dns_data(self):
def tearDown(self):
Domain.objects.all().delete()
super().tearDown()
def test_user_adds_dnssec_data(self):
"""
Scenario: Registrant adds DNS data
Scenario: Registrant adds DNSSEC data.
Verify that both the setter and getter are functioning properly
This test verifies:
1 - setter calls UpdateDomain command
2 - setter adds the UpdateDNSSECExtension extension to the command
3 - setter causes the getter to call info domain on next get from cache
4 - getter properly parses dnssecdata from InfoDomain response and sets to cache
"""
raise
@skip("not implemented yet")
# make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
return MagicMock(
res_data=[self.mockDataInfoDomain],
extensions=[
extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
self.domain.dnssecdata = self.dnssecExtensionWithDsData
# get the DNS SEC extension added to the UpdateDomain command and
# verify that it is properly sent
# args[0] is the _request sent to registry
args, _ = mocked_send.call_args
# assert that the extension matches
self.assertEquals(
args[0].extensions[0],
self.createUpdateExtension(
extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
),
)
# test that the dnssecdata getter is functioning properly
dnssecdata_get = self.domain.dnssecdata
mocked_send.assert_has_calls(
[
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.InfoDomain(
name="fake.gov",
),
cleaned=True,
),
]
)
self.assertEquals(
dnssecdata_get.dsData, self.dnssecExtensionWithDsData["dsData"]
)
patcher.stop()
def test_dnssec_is_idempotent(self):
"""
Scenario: Registrant adds DNS data twice, due to a UI glitch
"""
# implementation note: this requires seeing what happens when these are actually
# sent like this, and then implementing appropriate mocks for any errors the
# registry normally sends in this case
raise
@skip("not implemented yet")
This test verifies:
1 - UpdateDomain command called twice
2 - setter causes the getter to call info domain on next get from cache
3 - getter properly parses dnssecdata from InfoDomain response and sets to cache
"""
# make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
return MagicMock(
res_data=[self.mockDataInfoDomain],
extensions=[
extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
# set the dnssecdata once
self.domain.dnssecdata = self.dnssecExtensionWithDsData
# set the dnssecdata again
self.domain.dnssecdata = self.dnssecExtensionWithDsData
# test that the dnssecdata getter is functioning properly
dnssecdata_get = self.domain.dnssecdata
mocked_send.assert_has_calls(
[
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.InfoDomain(
name="fake.gov",
),
cleaned=True,
),
]
)
self.assertEquals(
dnssecdata_get.dsData, self.dnssecExtensionWithDsData["dsData"]
)
patcher.stop()
def test_user_adds_dnssec_data_multiple_dsdata(self):
"""
Scenario: Registrant adds DNSSEC data with multiple DSData.
Verify that both the setter and getter are functioning properly
This test verifies:
1 - setter calls UpdateDomain command
2 - setter adds the UpdateDNSSECExtension extension to the command
3 - setter causes the getter to call info domain on next get from cache
4 - getter properly parses dnssecdata from InfoDomain response and sets to cache
"""
# make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
return MagicMock(
res_data=[self.mockDataInfoDomain],
extensions=[
extensions.DNSSECExtension(**self.dnssecExtensionWithMultDsData)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
self.domain.dnssecdata = self.dnssecExtensionWithMultDsData
# get the DNS SEC extension added to the UpdateDomain command
# and verify that it is properly sent
# args[0] is the _request sent to registry
args, _ = mocked_send.call_args
# assert that the extension matches
self.assertEquals(
args[0].extensions[0],
self.createUpdateExtension(
extensions.DNSSECExtension(**self.dnssecExtensionWithMultDsData)
),
)
# test that the dnssecdata getter is functioning properly
dnssecdata_get = self.domain.dnssecdata
mocked_send.assert_has_calls(
[
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.InfoDomain(
name="fake.gov",
),
cleaned=True,
),
]
)
self.assertEquals(
dnssecdata_get.dsData, self.dnssecExtensionWithMultDsData["dsData"]
)
patcher.stop()
def test_user_adds_dnssec_keydata(self):
"""
Scenario: Registrant adds DNSSEC data.
Verify that both the setter and getter are functioning properly
This test verifies:
1 - setter calls UpdateDomain command
2 - setter adds the UpdateDNSSECExtension extension to the command
3 - setter causes the getter to call info domain on next get from cache
4 - getter properly parses dnssecdata from InfoDomain response and sets to cache
"""
# make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
return MagicMock(
res_data=[self.mockDataInfoDomain],
extensions=[
extensions.DNSSECExtension(**self.dnssecExtensionWithKeyData)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
self.domain.dnssecdata = self.dnssecExtensionWithKeyData
# get the DNS SEC extension added to the UpdateDomain command
# and verify that it is properly sent
# args[0] is the _request sent to registry
args, _ = mocked_send.call_args
# assert that the extension matches
self.assertEquals(
args[0].extensions[0],
self.createUpdateExtension(
extensions.DNSSECExtension(**self.dnssecExtensionWithKeyData)
),
)
# test that the dnssecdata getter is functioning properly
dnssecdata_get = self.domain.dnssecdata
mocked_send.assert_has_calls(
[
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.InfoDomain(
name="fake.gov",
),
cleaned=True,
),
]
)
self.assertEquals(
dnssecdata_get.keyData, self.dnssecExtensionWithKeyData["keyData"]
)
patcher.stop()
def test_update_is_unsuccessful(self):
"""
Scenario: An update to the dns data is unsuccessful
When an error is returned from epplibwrapper
Then a user-friendly error message is returned for displaying on the web
"""
raise
# make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
raise RegistryError(code=ErrorCode.PARAMETER_VALUE_RANGE_ERROR)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
# if RegistryError is raised, view formats user-friendly
# error message if error is_client_error, is_session_error, or
# is_server_error; so test for those conditions
with self.assertRaises(RegistryError) as err:
self.domain.dnssecdata = self.dnssecExtensionWithDsData
self.assertTrue(
err.is_client_error() or err.is_session_error() or err.is_server_error()
)
patcher.stop()
class TestAnalystClientHold(MockEppLib):

View file

@ -1,11 +1,11 @@
from unittest import skip
from unittest.mock import MagicMock, ANY
from unittest.mock import MagicMock, ANY, patch
from django.conf import settings
from django.test import Client, TestCase
from django.urls import reverse
from django.contrib.auth import get_user_model
from .common import completed_application
from .common import MockEppLib, completed_application # type: ignore
from django_webtest import WebTest # type: ignore
import boto3_mocking # type: ignore
@ -25,7 +25,6 @@ from registrar.models import (
from registrar.views.application import ApplicationWizard, Step
from .common import less_console_noise
from .common import MockEppLib
class TestViews(TestCase):
@ -1133,7 +1132,7 @@ class TestDomainPermissions(TestWithDomainPermissions):
self.assertEqual(response.status_code, 403)
class TestDomainDetail(TestWithDomainPermissions, WebTest):
class TestDomainDetail(TestWithDomainPermissions, WebTest, MockEppLib):
def setUp(self):
super().setUp()
self.app.set_user(self.user.username)
@ -1426,6 +1425,40 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest):
)
self.assertContains(page, "Testy")
def test_domain_security_email_existing_security_contact(self):
"""Can load domain's security email page."""
self.mockSendPatch = patch("registrar.models.domain.registry.send")
self.mockedSendFunction = self.mockSendPatch.start()
self.mockedSendFunction.side_effect = self.mockSend
domain_contact, _ = Domain.objects.get_or_create(name="freeman.gov")
# Add current user to this domain
_ = UserDomainRole(user=self.user, domain=domain_contact, role="admin").save()
page = self.client.get(
reverse("domain-security-email", kwargs={"pk": domain_contact.id})
)
# Loads correctly
self.assertContains(page, "Domain security email")
self.assertContains(page, "security@mail.gov")
self.mockSendPatch.stop()
def test_domain_security_email_no_security_contact(self):
"""Loads a domain with no defined security email.
We should not show the default."""
self.mockSendPatch = patch("registrar.models.domain.registry.send")
self.mockedSendFunction = self.mockSendPatch.start()
self.mockedSendFunction.side_effect = self.mockSend
page = self.client.get(
reverse("domain-security-email", kwargs={"pk": self.domain.id})
)
# Loads correctly
self.assertContains(page, "Domain security email")
self.assertNotContains(page, "dotgov@cisa.dhs.gov")
self.mockSendPatch.stop()
def test_domain_security_email(self):
"""Can load domain's security email page."""
page = self.client.get(
@ -1433,10 +1466,8 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest):
)
self.assertContains(page, "Domain security email")
@skip("Ticket 912 needs to fix this one")
def test_domain_security_email_form(self):
"""Adding a security email works.
Uses self.app WebTest because we need to interact with forms.
"""
security_email_page = self.app.get(
@ -1456,7 +1487,7 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest):
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = result.follow()
self.assertContains(
success_page, "The security email for this domain have been updated"
success_page, "The security email for this domain has been updated"
)
def test_domain_overview_blocked_for_ineligible_user(self):

View file

@ -259,7 +259,11 @@ class DomainSecurityEmailView(DomainPermissionView, FormMixin):
"""The initial value for the form."""
domain = self.get_object()
initial = super().get_initial()
initial["security_email"] = domain.security_contact.email
security_contact = domain.security_contact
if security_contact is None or security_contact.email == "dotgov@cisa.dhs.gov":
initial["security_email"] = None
return initial
initial["security_email"] = security_contact.email
return initial
def get_success_url(self):
@ -288,7 +292,7 @@ class DomainSecurityEmailView(DomainPermissionView, FormMixin):
contact.save()
messages.success(
self.request, "The security email for this domain have been updated."
self.request, "The security email for this domain has been updated."
)
# superclass has the redirect

View file

@ -22,7 +22,7 @@ django-phonenumber-field[phonenumberslite]==7.1.0
django-widget-tweaks==1.4.12
environs[django]==9.5.0
faker==18.10.0
git+https://github.com/cisagov/epplib.git@f818cbf0b069a12f03e1d72e4b9f4900924b832d#egg=fred-epplib
git+https://github.com/cisagov/epplib.git@d56d183f1664f34c40ca9716a3a9a345f0ef561c#egg=fred-epplib
furl==2.1.3
future==0.18.3 ; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'
gunicorn==20.1.0
@ -49,5 +49,5 @@ setuptools==67.8.0 ; python_version >= '3.7'
six==1.16.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
sqlparse==0.4.4 ; python_version >= '3.5'
typing-extensions==4.6.3
urllib3==1.26.16 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5'
urllib3==1.26.17 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5'
whitenoise==6.4.0