Merge branch 'main' into za/937-invitation-does-not-work

This commit is contained in:
zandercymatics 2023-10-04 13:43:55 -06:00
commit 1e61a60fc9
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
24 changed files with 1679 additions and 223 deletions

View file

@ -19,6 +19,7 @@ jobs:
|| startsWith(github.head_ref, 'rh/') || startsWith(github.head_ref, 'rh/')
|| startsWith(github.head_ref, 'nl/') || startsWith(github.head_ref, 'nl/')
|| startsWith(github.head_ref, 'dk/') || startsWith(github.head_ref, 'dk/')
|| startsWith(github.head_ref, 'es/')
outputs: outputs:
environment: ${{ steps.var.outputs.environment}} environment: ${{ steps.var.outputs.environment}}
runs-on: "ubuntu-latest" runs-on: "ubuntu-latest"

View file

@ -15,6 +15,7 @@ on:
options: options:
- stable - stable
- staging - staging
- es
- nl - nl
- rh - rh
- za - za

View file

@ -16,6 +16,7 @@ on:
options: options:
- stable - stable
- staging - staging
- es
- nl - nl
- rh - rh
- za - za

View file

@ -0,0 +1,30 @@
---
applications:
- name: getgov-es
buildpacks:
- python_buildpack
path: ../../src
instances: 1
memory: 512M
stack: cflinuxfs4
timeout: 180
command: ./run.sh
health-check-type: http
health-check-http-endpoint: /health
health-check-invocation-timeout: 40
env:
# Send stdout and stderr straight to the terminal without buffering
PYTHONUNBUFFERED: yup
# Tell Django where to find its configuration
DJANGO_SETTINGS_MODULE: registrar.config.settings
# Tell Django where it is being hosted
DJANGO_BASE_URL: https://getgov-es.app.cloud.gov
# Tell Django how much stuff to log
DJANGO_LOG_LEVEL: INFO
# default public site location
GETGOV_PUBLIC_SITE_URL: https://beta.get.gov
routes:
- route: getgov-es.app.cloud.gov
services:
- getgov-credentials
- getgov-es-database

2
src/Pipfile.lock generated
View file

@ -353,7 +353,7 @@
}, },
"fred-epplib": { "fred-epplib": {
"git": "https://github.com/cisagov/epplib.git", "git": "https://github.com/cisagov/epplib.git",
"ref": "f818cbf0b069a12f03e1d72e4b9f4900924b832d" "ref": "d56d183f1664f34c40ca9716a3a9a345f0ef561c"
}, },
"furl": { "furl": {
"hashes": [ "hashes": [

View file

@ -39,7 +39,7 @@ class AvailableViewTest(TestCase):
self.assertIn("gsa.gov", domains) self.assertIn("gsa.gov", domains)
# entries are all lowercase so GSA.GOV is not in the set # entries are all lowercase so GSA.GOV is not in the set
self.assertNotIn("GSA.GOV", domains) self.assertNotIn("GSA.GOV", domains)
self.assertNotIn("igorville.gov", domains) self.assertNotIn("igorvilleremixed.gov", domains)
# all the entries have dots # all the entries have dots
self.assertNotIn("gsa", domains) self.assertNotIn("gsa", domains)
@ -48,7 +48,7 @@ class AvailableViewTest(TestCase):
# input is lowercased so GSA.GOV should be found # input is lowercased so GSA.GOV should be found
self.assertTrue(in_domains("GSA.GOV")) self.assertTrue(in_domains("GSA.GOV"))
# This domain should not have been registered # This domain should not have been registered
self.assertFalse(in_domains("igorville.gov")) self.assertFalse(in_domains("igorvilleremixed.gov"))
def test_in_domains_dotgov(self): def test_in_domains_dotgov(self):
"""Domain searches work without trailing .gov""" """Domain searches work without trailing .gov"""
@ -56,7 +56,7 @@ class AvailableViewTest(TestCase):
# input is lowercased so GSA.GOV should be found # input is lowercased so GSA.GOV should be found
self.assertTrue(in_domains("GSA")) self.assertTrue(in_domains("GSA"))
# This domain should not have been registered # This domain should not have been registered
self.assertFalse(in_domains("igorville")) self.assertFalse(in_domains("igorvilleremixed"))
def test_not_available_domain(self): def test_not_available_domain(self):
"""gsa.gov is not available""" """gsa.gov is not available"""
@ -66,17 +66,17 @@ class AvailableViewTest(TestCase):
self.assertFalse(json.loads(response.content)["available"]) self.assertFalse(json.loads(response.content)["available"])
def test_available_domain(self): def test_available_domain(self):
"""igorville.gov is still available""" """igorvilleremixed.gov is still available"""
request = self.factory.get(API_BASE_PATH + "igorville.gov") request = self.factory.get(API_BASE_PATH + "igorvilleremixed.gov")
request.user = self.user request.user = self.user
response = available(request, domain="igorville.gov") response = available(request, domain="igorvilleremixed.gov")
self.assertTrue(json.loads(response.content)["available"]) self.assertTrue(json.loads(response.content)["available"])
def test_available_domain_dotgov(self): def test_available_domain_dotgov(self):
"""igorville.gov is still available even without the .gov suffix""" """igorvilleremixed.gov is still available even without the .gov suffix"""
request = self.factory.get(API_BASE_PATH + "igorville") request = self.factory.get(API_BASE_PATH + "igorvilleremixed")
request.user = self.user request.user = self.user
response = available(request, domain="igorville") response = available(request, domain="igorvilleremixed")
self.assertTrue(json.loads(response.content)["available"]) self.assertTrue(json.loads(response.content)["available"])
def test_error_handling(self): def test_error_handling(self):

View file

@ -11,6 +11,7 @@ logger = logging.getLogger(__name__)
NAMESPACE = SimpleNamespace( NAMESPACE = SimpleNamespace(
EPP="urn:ietf:params:xml:ns:epp-1.0", EPP="urn:ietf:params:xml:ns:epp-1.0",
SEC_DNS="urn:ietf:params:xml:ns:secDNS-1.1",
XSI="http://www.w3.org/2001/XMLSchema-instance", XSI="http://www.w3.org/2001/XMLSchema-instance",
FRED="noop", FRED="noop",
NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0", NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0",
@ -25,6 +26,7 @@ NAMESPACE = SimpleNamespace(
SCHEMA_LOCATION = SimpleNamespace( SCHEMA_LOCATION = SimpleNamespace(
XSI="urn:ietf:params:xml:ns:epp-1.0 epp-1.0.xsd", XSI="urn:ietf:params:xml:ns:epp-1.0 epp-1.0.xsd",
FRED="noop fred-1.5.0.xsd", FRED="noop fred-1.5.0.xsd",
SEC_DNS="urn:ietf:params:xml:ns:secDNS-1.1 secDNS-1.1.xsd",
NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0 contact-1.0.xsd", NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0 contact-1.0.xsd",
NIC_DOMAIN="urn:ietf:params:xml:ns:domain-1.0 domain-1.0.xsd", NIC_DOMAIN="urn:ietf:params:xml:ns:domain-1.0 domain-1.0.xsd",
NIC_ENUMVAL="noop enumval-1.2.0.xsd", NIC_ENUMVAL="noop enumval-1.2.0.xsd",
@ -44,7 +46,9 @@ except NameError:
try: try:
from .client import CLIENT, commands from .client import CLIENT, commands
from .errors import RegistryError, ErrorCode from .errors import RegistryError, ErrorCode
from epplib.models import common from epplib.models import common, info
from epplib.responses import extensions
from epplib import responses
except ImportError: except ImportError:
pass pass
@ -52,6 +56,9 @@ __all__ = [
"CLIENT", "CLIENT",
"commands", "commands",
"common", "common",
"extensions",
"responses",
"info",
"ErrorCode", "ErrorCode",
"RegistryError", "RegistryError",
] ]

View file

@ -6,10 +6,13 @@ from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.http.response import HttpResponseRedirect from django.http.response import HttpResponseRedirect
from django.urls import reverse from django.urls import reverse
from epplibwrapper.errors import ErrorCode, RegistryError
from registrar.models.domain import Domain
from registrar.models.utility.admin_sort_fields import AdminSortFields from registrar.models.utility.admin_sort_fields import AdminSortFields
from . import models from . import models
from auditlog.models import LogEntry # type: ignore from auditlog.models import LogEntry # type: ignore
from auditlog.admin import LogEntryAdmin # type: ignore from auditlog.admin import LogEntryAdmin # type: ignore
from django_fsm import TransitionNotAllowed # type: ignore
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -716,16 +719,61 @@ class DomainAdmin(ListHeaderAdmin):
return super().response_change(request, obj) return super().response_change(request, obj)
def do_delete_domain(self, request, obj): def do_delete_domain(self, request, obj):
if not isinstance(obj, Domain):
# Could be problematic if the type is similar,
# but not the same (same field/func names).
# We do not want to accidentally delete records.
self.message_user(request, "Object is not of type Domain", messages.ERROR)
return
try: try:
obj.deleted() obj.deletedInEpp()
obj.save() obj.save()
except Exception as err: except RegistryError as err:
self.message_user(request, err, messages.ERROR) # Using variables to get past the linter
message1 = f"Cannot delete Domain when in state {obj.state}"
message2 = "This subdomain is being used as a hostname on another domain"
# Human-readable mappings of ErrorCodes. Can be expanded.
error_messages = {
# noqa on these items as black wants to reformat to an invalid length
ErrorCode.OBJECT_STATUS_PROHIBITS_OPERATION: message1,
ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION: message2,
}
message = "Cannot connect to the registry"
if not err.is_connection_error():
# If nothing is found, will default to returned err
message = error_messages.get(err.code, err)
self.message_user(
request, f"Error deleting this Domain: {message}", messages.ERROR
)
except TransitionNotAllowed:
if obj.state == Domain.State.DELETED:
self.message_user(
request,
"This domain is already deleted",
messages.INFO,
)
else:
self.message_user(
request,
"Error deleting this Domain: "
f"Can't switch from state '{obj.state}' to 'deleted'"
", must be either 'dns_needed' or 'on_hold'",
messages.ERROR,
)
except Exception:
self.message_user(
request,
"Could not delete: An unspecified error occured",
messages.ERROR,
)
else: else:
self.message_user( self.message_user(
request, request,
("Domain %s Should now be deleted " ". Thanks!") % obj.name, ("Domain %s has been deleted. Thanks!") % obj.name,
) )
return HttpResponseRedirect(".") return HttpResponseRedirect(".")
def do_get_status(self, request, obj): def do_get_status(self, request, obj):

View file

@ -570,6 +570,7 @@ SECURE_SSL_REDIRECT = True
ALLOWED_HOSTS = [ ALLOWED_HOSTS = [
"getgov-stable.app.cloud.gov", "getgov-stable.app.cloud.gov",
"getgov-staging.app.cloud.gov", "getgov-staging.app.cloud.gov",
"getgov-es.app.cloud.gov",
"getgov-nl.app.cloud.gov", "getgov-nl.app.cloud.gov",
"getgov-rh.app.cloud.gov", "getgov-rh.app.cloud.gov",
"getgov-za.app.cloud.gov", "getgov-za.app.cloud.gov",

View file

@ -83,6 +83,16 @@ class UserFixture:
"first_name": "Nicolle", "first_name": "Nicolle",
"last_name": "LeClair", "last_name": "LeClair",
}, },
{
"username": "24840450-bf47-4d89-8aa9-c612fe68f9da",
"first_name": "Erin",
"last_name": "Song",
},
{
"username": "e0ea8b94-6e53-4430-814a-849a7ca45f21",
"first_name": "Kristina",
"last_name": "Yin",
},
] ]
STAFF = [ STAFF = [
@ -134,6 +144,18 @@ class UserFixture:
"last_name": "LeClair-Analyst", "last_name": "LeClair-Analyst",
"email": "nicolle.leclair@ecstech.com", "email": "nicolle.leclair@ecstech.com",
}, },
{
"username": "378d0bc4-d5a7-461b-bd84-3ae6f6864af9",
"first_name": "Erin-Analyst",
"last_name": "Song-Analyst",
"email": "erin.song+1@gsa.gov",
},
{
"username": "9a98e4c9-9409-479d-964e-4aec7799107f",
"first_name": "Kristina-Analyst",
"last_name": "Yin-Analyst",
"email": "kristina.yin+1@gsa.gov",
},
] ]
STAFF_PERMISSIONS = [ STAFF_PERMISSIONS = [

View file

@ -0,0 +1,17 @@
# Generated by Django 4.2.1 on 2023-10-02 22:07
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("registrar", "0032_alter_transitiondomain_status"),
]
operations = [
migrations.AlterField(
model_name="userdomainrole",
name="role",
field=models.TextField(choices=[("manager", "Admin")]),
),
]

View file

@ -1,8 +1,8 @@
from itertools import zip_longest
import logging import logging
from datetime import date from datetime import date
from string import digits from string import digits
from django_fsm import FSMField, transition # type: ignore from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore
from django.db import models from django.db import models
@ -10,9 +10,12 @@ from epplibwrapper import (
CLIENT as registry, CLIENT as registry,
commands, commands,
common as epp, common as epp,
extensions,
info as eppInfo,
RegistryError, RegistryError,
ErrorCode, ErrorCode,
) )
from registrar.models.utility.contact_error import ContactError
from .utility.domain_field import DomainField from .utility.domain_field import DomainField
from .utility.domain_helper import DomainHelper from .utility.domain_helper import DomainHelper
@ -279,6 +282,27 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error("Error _create_host, code was %s error was %s" % (e.code, e)) logger.error("Error _create_host, code was %s error was %s" % (e.code, e))
return e.code return e.code
@Cache
def dnssecdata(self) -> extensions.DNSSECExtension:
return self._get_property("dnssecdata")
@dnssecdata.setter # type: ignore
def dnssecdata(self, _dnssecdata: extensions.DNSSECExtension):
updateParams = {
"maxSigLife": _dnssecdata.get("maxSigLife", None),
"dsData": _dnssecdata.get("dsData", None),
"keyData": _dnssecdata.get("keyData", None),
"remAllDsKeyData": True,
}
request = commands.UpdateDomain(name=self.name)
extension = commands.UpdateDomainDNSSECExtension(**updateParams)
request.add_extension(extension)
try:
registry.send(request, cleaned=True)
except RegistryError as e:
logger.error("Error adding DNSSEC, code was %s error was %s" % (e.code, e))
raise e
@nameservers.setter # type: ignore @nameservers.setter # type: ignore
def nameservers(self, hosts: list[tuple[str]]): def nameservers(self, hosts: list[tuple[str]]):
"""host should be a tuple of type str, str,... where the elements are """host should be a tuple of type str, str,... where the elements are
@ -352,9 +376,9 @@ class Domain(TimeStampedModel, DomainHelper):
raise NotImplementedError() raise NotImplementedError()
@Cache @Cache
def registrant_contact(self) -> PublicContact: def registrant_contact(self) -> PublicContact | None:
"""Get or set the registrant for this domain.""" registrant = PublicContact.ContactTypeChoices.REGISTRANT
raise NotImplementedError() return self.generic_contact_getter(registrant)
@registrant_contact.setter # type: ignore @registrant_contact.setter # type: ignore
def registrant_contact(self, contact: PublicContact): def registrant_contact(self, contact: PublicContact):
@ -367,9 +391,10 @@ class Domain(TimeStampedModel, DomainHelper):
) )
@Cache @Cache
def administrative_contact(self) -> PublicContact: def administrative_contact(self) -> PublicContact | None:
"""Get or set the admin contact for this domain.""" """Get the admin contact for this domain."""
raise NotImplementedError() admin = PublicContact.ContactTypeChoices.ADMINISTRATIVE
return self.generic_contact_getter(admin)
@administrative_contact.setter # type: ignore @administrative_contact.setter # type: ignore
def administrative_contact(self, contact: PublicContact): def administrative_contact(self, contact: PublicContact):
@ -381,12 +406,6 @@ class Domain(TimeStampedModel, DomainHelper):
self._make_contact_in_registry(contact=contact) self._make_contact_in_registry(contact=contact)
self._update_domain_with_contact(contact, rem=False) self._update_domain_with_contact(contact, rem=False)
def get_default_security_contact(self):
logger.info("getting default sec contact")
contact = PublicContact.get_default_security()
contact.domain = self
return contact
def _update_epp_contact(self, contact: PublicContact): def _update_epp_contact(self, contact: PublicContact):
"""Sends UpdateContact to update the actual contact object, """Sends UpdateContact to update the actual contact object,
domain object remains unaffected domain object remains unaffected
@ -440,26 +459,10 @@ class Domain(TimeStampedModel, DomainHelper):
) )
@Cache @Cache
def security_contact(self) -> PublicContact: def security_contact(self) -> PublicContact | None:
"""Get or set the security contact for this domain.""" """Get or set the security contact for this domain."""
try: security = PublicContact.ContactTypeChoices.SECURITY
contacts = self._get_property("contacts") return self.generic_contact_getter(security)
for contact in contacts:
if (
"type" in contact.keys()
and contact["type"] == PublicContact.ContactTypeChoices.SECURITY
):
tempContact = self.get_default_security_contact()
tempContact.email = contact["email"]
return tempContact
except Exception as err: # use better error handling
logger.info("Couldn't get contact %s" % err)
# TODO - remove this ideally it should return None,
# but error handling needs to be
# added on the security email page so that it can handle it being none
return self.get_default_security_contact()
def _add_registrant_to_existing_domain(self, contact: PublicContact): def _add_registrant_to_existing_domain(self, contact: PublicContact):
"""Used to change the registrant contact on an existing domain""" """Used to change the registrant contact on an existing domain"""
@ -533,6 +536,7 @@ class Domain(TimeStampedModel, DomainHelper):
.filter(domain=self, contact_type=contact.contact_type) .filter(domain=self, contact_type=contact.contact_type)
.get() .get()
) )
if isRegistrant: if isRegistrant:
# send update domain only for registant contacts # send update domain only for registant contacts
existing_contact.delete() existing_contact.delete()
@ -589,9 +593,10 @@ class Domain(TimeStampedModel, DomainHelper):
) )
@Cache @Cache
def technical_contact(self) -> PublicContact: def technical_contact(self) -> PublicContact | None:
"""Get or set the tech contact for this domain.""" """Get or set the tech contact for this domain."""
raise NotImplementedError() tech = PublicContact.ContactTypeChoices.TECHNICAL
return self.generic_contact_getter(tech)
@technical_contact.setter # type: ignore @technical_contact.setter # type: ignore
def technical_contact(self, contact: PublicContact): def technical_contact(self, contact: PublicContact):
@ -609,11 +614,6 @@ class Domain(TimeStampedModel, DomainHelper):
""" """
return self.state == self.State.READY return self.state == self.State.READY
def delete_request(self):
"""Delete from host. Possibly a duplicate of _delete_host?"""
# TODO fix in ticket #901
pass
def transfer(self): def transfer(self):
"""Going somewhere. Not implemented.""" """Going somewhere. Not implemented."""
raise NotImplementedError() raise NotImplementedError()
@ -658,7 +658,7 @@ class Domain(TimeStampedModel, DomainHelper):
"""This domain should be deleted from the registry """This domain should be deleted from the registry
may raises RegistryError, should be caught or handled correctly by caller""" may raises RegistryError, should be caught or handled correctly by caller"""
request = commands.DeleteDomain(name=self.name) request = commands.DeleteDomain(name=self.name)
registry.send(request) registry.send(request, cleaned=True)
def __str__(self) -> str: def __str__(self) -> str:
return self.name return self.name
@ -679,6 +679,231 @@ class Domain(TimeStampedModel, DomainHelper):
help_text="Very basic info about the lifecycle of this domain object", help_text="Very basic info about the lifecycle of this domain object",
) )
def isActive(self):
return self.state == Domain.State.CREATED
def map_epp_contact_to_public_contact(
self, contact: eppInfo.InfoContactResultData, contact_id, contact_type
):
"""Maps the Epp contact representation to a PublicContact object.
contact -> eppInfo.InfoContactResultData: The converted contact object
contact_id -> str: The given registry_id of the object (i.e "cheese@cia.gov")
contact_type -> str: The given contact type, (i.e. "tech" or "registrant")
"""
if contact is None:
return None
if contact_type is None:
raise ContactError("contact_type is None")
if contact_id is None:
raise ContactError("contact_id is None")
# Since contact_id is registry_id,
# check that its the right length
contact_id_length = len(contact_id)
if (
contact_id_length > PublicContact.get_max_id_length()
or contact_id_length < 1
):
raise ContactError(
"contact_id is of invalid length. "
"Cannot exceed 16 characters, "
f"got {contact_id} with a length of {contact_id_length}"
)
if not isinstance(contact, eppInfo.InfoContactResultData):
raise ContactError("Contact must be of type InfoContactResultData")
auth_info = contact.auth_info
postal_info = contact.postal_info
addr = postal_info.addr
streets = None
if addr is not None:
streets = addr.street
streets_kwargs = self._convert_streets_to_dict(streets)
desired_contact = PublicContact(
domain=self,
contact_type=contact_type,
registry_id=contact_id,
email=contact.email or "",
voice=contact.voice or "",
fax=contact.fax,
name=postal_info.name or "",
org=postal_info.org,
# For linter - default to "" instead of None
pw=getattr(auth_info, "pw", ""),
city=getattr(addr, "city", ""),
pc=getattr(addr, "pc", ""),
cc=getattr(addr, "cc", ""),
sp=getattr(addr, "sp", ""),
**streets_kwargs,
) # type: ignore
return desired_contact
def _convert_streets_to_dict(self, streets):
"""
Converts EPPLibs street representation
to PublicContacts.
Args:
streets (Sequence[str]): Streets from EPPLib.
Returns:
dict: {
"street1": str or "",
"street2": str or None,
"street3": str or None,
}
EPPLib returns 'street' as an sequence of strings.
Meanwhile, PublicContact has this split into three
seperate properties: street1, street2, street3.
Handles this disparity.
"""
# 'zips' two lists together.
# For instance, (('street1', 'some_value_here'),
# ('street2', 'some_value_here'))
# Dict then converts this to a useable kwarg which we can pass in
street_dict = dict(
zip_longest(
["street1", "street2", "street3"],
streets if streets is not None else [""],
fillvalue=None,
)
)
return street_dict
def _request_contact_info(self, contact: PublicContact):
try:
req = commands.InfoContact(id=contact.registry_id)
return registry.send(req, cleaned=True).res_data[0]
except RegistryError as error:
logger.error(
"Registry threw error for contact id %s contact type is %s, error code is\n %s full error is %s", # noqa
contact.registry_id,
contact.contact_type,
error.code,
error,
)
raise error
def generic_contact_getter(
self, contact_type_choice: PublicContact.ContactTypeChoices
) -> PublicContact | None:
"""Retrieves the desired PublicContact from the registry.
This abstracts the caching and EPP retrieval for
all contact items and thus may result in EPP calls being sent.
contact_type_choice is a literal in PublicContact.ContactTypeChoices,
for instance: PublicContact.ContactTypeChoices.SECURITY.
If you wanted to setup getter logic for Security, you would call:
cache_contact_helper(PublicContact.ContactTypeChoices.SECURITY),
or cache_contact_helper("security").
"""
# registrant_contact(s) are an edge case. They exist on
# the "registrant" property as opposed to contacts.
desired_property = "contacts"
if contact_type_choice == PublicContact.ContactTypeChoices.REGISTRANT:
desired_property = "registrant"
try:
# Grab from cache
contacts = self._get_property(desired_property)
except KeyError as error:
logger.error(f"Could not find {contact_type_choice}: {error}")
return None
else:
cached_contact = self.get_contact_in_keys(contacts, contact_type_choice)
if cached_contact is None:
# TODO - #1103
raise ContactError("No contact was found in cache or the registry")
return cached_contact
def get_default_security_contact(self):
"""Gets the default security contact."""
contact = PublicContact.get_default_security()
contact.domain = self
return contact
def get_default_administrative_contact(self):
"""Gets the default administrative contact."""
contact = PublicContact.get_default_administrative()
contact.domain = self
return contact
def get_default_technical_contact(self):
"""Gets the default technical contact."""
contact = PublicContact.get_default_technical()
contact.domain = self
return contact
def get_default_registrant_contact(self):
"""Gets the default registrant contact."""
contact = PublicContact.get_default_registrant()
contact.domain = self
return contact
def get_contact_in_keys(self, contacts, contact_type):
"""Gets a contact object.
Args:
contacts ([PublicContact]): List of PublicContacts
contact_type (literal): Which PublicContact to get
Returns:
PublicContact | None
"""
# Registrant doesn't exist as an array, and is of
# a special data type, so we need to handle that.
if contact_type == PublicContact.ContactTypeChoices.REGISTRANT:
desired_contact = None
if isinstance(contacts, str):
desired_contact = self._registrant_to_public_contact(
self._cache["registrant"]
)
# Set the cache with the updated object
# for performance reasons.
if "registrant" in self._cache:
self._cache["registrant"] = desired_contact
elif isinstance(contacts, PublicContact):
desired_contact = contacts
return self._handle_registrant_contact(desired_contact)
_registry_id: str
if contact_type in contacts:
_registry_id = contacts.get(contact_type)
desired = PublicContact.objects.filter(
registry_id=_registry_id, domain=self, contact_type=contact_type
)
if desired.count() == 1:
return desired.get()
logger.info(f"Requested contact {_registry_id} does not exist in cache.")
return None
def _handle_registrant_contact(self, contact):
if (
contact.contact_type is not None
and contact.contact_type == PublicContact.ContactTypeChoices.REGISTRANT
):
return contact
else:
raise ValueError("Invalid contact object for registrant_contact")
# ForeignKey on UserDomainRole creates a "permissions" member for # ForeignKey on UserDomainRole creates a "permissions" member for
# all of the user-roles that are in place for this domain # all of the user-roles that are in place for this domain
@ -725,9 +950,9 @@ class Domain(TimeStampedModel, DomainHelper):
try: try:
logger.info("Getting domain info from epp") logger.info("Getting domain info from epp")
req = commands.InfoDomain(name=self.name) req = commands.InfoDomain(name=self.name)
domainInfo = registry.send(req, cleaned=True).res_data[0] domainInfoResponse = registry.send(req, cleaned=True)
exitEarly = True exitEarly = True
return domainInfo return domainInfoResponse
except RegistryError as e: except RegistryError as e:
count += 1 count += 1
@ -777,12 +1002,10 @@ class Domain(TimeStampedModel, DomainHelper):
security_contact = self.get_default_security_contact() security_contact = self.get_default_security_contact()
security_contact.save() security_contact.save()
technical_contact = PublicContact.get_default_technical() technical_contact = self.get_default_technical_contact()
technical_contact.domain = self
technical_contact.save() technical_contact.save()
administrative_contact = PublicContact.get_default_administrative() administrative_contact = self.get_default_administrative_contact()
administrative_contact.domain = self
administrative_contact.save() administrative_contact.save()
@transition( @transition(
@ -804,16 +1027,32 @@ class Domain(TimeStampedModel, DomainHelper):
self._remove_client_hold() self._remove_client_hold()
# TODO -on the client hold ticket any additional error handling here # TODO -on the client hold ticket any additional error handling here
@transition(field="state", source=State.ON_HOLD, target=State.DELETED) @transition(
def deleted(self): field="state", source=[State.ON_HOLD, State.DNS_NEEDED], target=State.DELETED
"""domain is deleted in epp but is saved in our database""" )
# TODO Domains may not be deleted if: def deletedInEpp(self):
# a child host is being used by """Domain is deleted in epp but is saved in our database.
# another .gov domains. The host must be first removed Error handling should be provided by the caller."""
# and/or renamed before the parent domain may be deleted. # While we want to log errors, we want to preserve
logger.info("pendingCreate()-> inside pending create") # that information when this function is called.
self._delete_domain() # Human-readable errors are introduced at the admin.py level,
# TODO - delete ticket any additional error handling here # as doing everything here would reduce reliablity.
try:
logger.info("deletedInEpp()-> inside _delete_domain")
self._delete_domain()
except RegistryError as err:
logger.error(f"Could not delete domain. Registry returned error: {err}")
raise err
except TransitionNotAllowed as err:
logger.error("Could not delete domain. FSM failure: {err}")
raise err
except Exception as err:
logger.error(
f"Could not delete domain. An unspecified error occured: {err}"
)
raise err
else:
self._invalidate_cache()
@transition( @transition(
field="state", field="state",
@ -910,16 +1149,34 @@ class Domain(TimeStampedModel, DomainHelper):
) )
return err.code return err.code
def _request_contact_info(self, contact: PublicContact): def _fetch_contacts(self, contact_data):
req = commands.InfoContact(id=contact.registry_id) """Fetch contact info."""
return registry.send(req, cleaned=True).res_data[0] choices = PublicContact.ContactTypeChoices
# We expect that all these fields get populated,
# so we can create these early, rather than waiting.
contacts_dict = {
choices.ADMINISTRATIVE: None,
choices.SECURITY: None,
choices.TECHNICAL: None,
}
for domainContact in contact_data:
req = commands.InfoContact(id=domainContact.contact)
data = registry.send(req, cleaned=True).res_data[0]
# Map the object we recieved from EPP to a PublicContact
mapped_object = self.map_epp_contact_to_public_contact(
data, domainContact.contact, domainContact.type
)
# Find/create it in the DB
in_db = self._get_or_create_public_contact(mapped_object)
contacts_dict[in_db.contact_type] = in_db.registry_id
return contacts_dict
def _get_or_create_contact(self, contact: PublicContact): def _get_or_create_contact(self, contact: PublicContact):
"""Try to fetch info about a contact. Create it if it does not exist.""" """Try to fetch info about a contact. Create it if it does not exist."""
try: try:
return self._request_contact_info(contact) return self._request_contact_info(contact)
except RegistryError as e: except RegistryError as e:
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST: if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
logger.info( logger.info(
@ -942,6 +1199,23 @@ class Domain(TimeStampedModel, DomainHelper):
raise e raise e
def _fetch_hosts(self, host_data):
"""Fetch host info."""
hosts = []
for name in host_data:
req = commands.InfoHost(name=name)
data = registry.send(req, cleaned=True).res_data[0]
host = {
"name": name,
"addrs": getattr(data, "addrs", ...),
"cr_date": getattr(data, "cr_date", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
}
hosts.append({k: v for k, v in host.items() if v is not ...})
return hosts
def _update_or_create_host(self, host): def _update_or_create_host(self, host):
raise NotImplementedError() raise NotImplementedError()
@ -952,7 +1226,8 @@ class Domain(TimeStampedModel, DomainHelper):
"""Contact registry for info about a domain.""" """Contact registry for info about a domain."""
try: try:
# get info from registry # get info from registry
data = self._get_or_create_domain() dataResponse = self._get_or_create_domain()
data = dataResponse.res_data[0]
# extract properties from response # extract properties from response
# (Ellipsis is used to mean "null") # (Ellipsis is used to mean "null")
cache = { cache = {
@ -967,89 +1242,116 @@ class Domain(TimeStampedModel, DomainHelper):
"tr_date": getattr(data, "tr_date", ...), "tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...), "up_date": getattr(data, "up_date", ...),
} }
# remove null properties (to distinguish between "a value of None" and null) # remove null properties (to distinguish between "a value of None" and null)
cleaned = {k: v for k, v in cache.items() if v is not ...} cleaned = {k: v for k, v in cache.items() if v is not ...}
# statuses can just be a list no need to keep the epp object # statuses can just be a list no need to keep the epp object
if "statuses" in cleaned.keys(): if "statuses" in cleaned:
cleaned["statuses"] = [status.state for status in cleaned["statuses"]] cleaned["statuses"] = [status.state for status in cleaned["statuses"]]
# get extensions info, if there is any
# DNSSECExtension is one possible extension, make sure to handle
# only DNSSECExtension and not other type extensions
returned_extensions = dataResponse.extensions
cleaned["dnssecdata"] = None
for extension in returned_extensions:
if isinstance(extension, extensions.DNSSECExtension):
cleaned["dnssecdata"] = extension
# Capture and store old hosts and contacts from cache if they exist
old_cache_hosts = self._cache.get("hosts")
old_cache_contacts = self._cache.get("contacts")
# get contact info, if there are any # get contact info, if there are any
if ( if (
# fetch_contacts and fetch_contacts
"_contacts" in cleaned and "_contacts" in cleaned
and isinstance(cleaned["_contacts"], list) and isinstance(cleaned["_contacts"], list)
and len(cleaned["_contacts"]) and len(cleaned["_contacts"]) > 0
): ):
cleaned["contacts"] = [] cleaned["contacts"] = self._fetch_contacts(cleaned["_contacts"])
for domainContact in cleaned["_contacts"]: # We're only getting contacts, so retain the old
# we do not use _get_or_create_* because we expect the object we # hosts that existed in cache (if they existed)
# just asked the registry for still exists -- # and pass them along.
# if not, that's a problem if old_cache_hosts is not None:
cleaned["hosts"] = old_cache_hosts
# TODO- discuss-should we check if contact is in public contacts
# and add it if not- this is really to keep in mine the transisiton
req = commands.InfoContact(id=domainContact.contact)
data = registry.send(req, cleaned=True).res_data[0]
# extract properties from response
# (Ellipsis is used to mean "null")
# convert this to use PublicContactInstead
contact = {
"id": domainContact.contact,
"type": domainContact.type,
"auth_info": getattr(data, "auth_info", ...),
"cr_date": getattr(data, "cr_date", ...),
"disclose": getattr(data, "disclose", ...),
"email": getattr(data, "email", ...),
"fax": getattr(data, "fax", ...),
"postal_info": getattr(data, "postal_info", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
"voice": getattr(data, "voice", ...),
}
cleaned["contacts"].append(
{k: v for k, v in contact.items() if v is not ...}
)
# get nameserver info, if there are any # get nameserver info, if there are any
if ( if (
# fetch_hosts and fetch_hosts
"_hosts" in cleaned and "_hosts" in cleaned
and isinstance(cleaned["_hosts"], list) and isinstance(cleaned["_hosts"], list)
and len(cleaned["_hosts"]) and len(cleaned["_hosts"])
): ):
# TODO- add elif in cache set it to be the old cache value cleaned["hosts"] = self._fetch_hosts(cleaned["_hosts"])
# no point in removing # We're only getting hosts, so retain the old
cleaned["hosts"] = [] # contacts that existed in cache (if they existed)
for name in cleaned["_hosts"]: # and pass them along.
# we do not use _get_or_create_* because we expect the object we if old_cache_contacts is not None:
# just asked the registry for still exists -- cleaned["contacts"] = old_cache_contacts
# if not, that's a problem
req = commands.InfoHost(name=name)
data = registry.send(req, cleaned=True).res_data[0]
# extract properties from response
# (Ellipsis is used to mean "null")
host = {
"name": name,
"addrs": getattr(data, "addrs", ...),
"cr_date": getattr(data, "cr_date", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
}
cleaned["hosts"].append(
{k: v for k, v in host.items() if v is not ...}
)
# replace the prior cache with new data # replace the prior cache with new data
self._cache = cleaned self._cache = cleaned
except RegistryError as e: except RegistryError as e:
logger.error(e) logger.error(e)
def _get_or_create_public_contact(self, public_contact: PublicContact):
"""Tries to find a PublicContact object in our DB.
If it can't, it'll create it. Returns PublicContact"""
db_contact = PublicContact.objects.filter(
registry_id=public_contact.registry_id,
contact_type=public_contact.contact_type,
domain=self,
)
# Raise an error if we find duplicates.
# This should not occur
if db_contact.count() > 1:
raise Exception(
f"Multiple contacts found for {public_contact.contact_type}"
)
# Save to DB if it doesn't exist already.
if db_contact.count() == 0:
# Doesn't run custom save logic, just saves to DB
public_contact.save(skip_epp_save=True)
logger.info(f"Created a new PublicContact: {public_contact}")
# Append the item we just created
return public_contact
existing_contact = db_contact.get()
# Does the item we're grabbing match
# what we have in our DB?
if (
existing_contact.email != public_contact.email
or existing_contact.registry_id != public_contact.registry_id
):
existing_contact.delete()
public_contact.save()
logger.warning("Requested PublicContact is out of sync " "with DB.")
return public_contact
# If it already exists, we can
# assume that the DB instance was updated
# during set, so we should just use that.
return existing_contact
def _registrant_to_public_contact(self, registry_id: str):
"""EPPLib returns the registrant as a string,
which is the registrants associated registry_id. This function is used to
convert that id to a useable object by calling commands.InfoContact
on that ID, then mapping that object to type PublicContact."""
contact = PublicContact(
registry_id=registry_id,
contact_type=PublicContact.ContactTypeChoices.REGISTRANT,
)
# Grabs the expanded contact
full_object = self._request_contact_info(contact)
# Maps it to type PublicContact
mapped_object = self.map_epp_contact_to_public_contact(
full_object, contact.registry_id, contact.contact_type
)
return self._get_or_create_public_contact(mapped_object)
def _invalidate_cache(self): def _invalidate_cache(self):
"""Remove cache data when updates are made.""" """Remove cache data when updates are made."""
self._cache = {} self._cache = {}

View file

@ -6,6 +6,7 @@ import logging
from django.apps import apps from django.apps import apps
from django.db import models from django.db import models
from django_fsm import FSMField, transition # type: ignore from django_fsm import FSMField, transition # type: ignore
from registrar.models.domain import Domain
from .utility.time_stamped_model import TimeStampedModel from .utility.time_stamped_model import TimeStampedModel
from ..utility.email import send_templated_email, EmailSendingError from ..utility.email import send_templated_email, EmailSendingError
@ -610,9 +611,11 @@ class DomainApplication(TimeStampedModel):
As side effects this will delete the domain and domain_information As side effects this will delete the domain and domain_information
(will cascade), and send an email notification.""" (will cascade), and send an email notification."""
if self.status == self.APPROVED: if self.status == self.APPROVED:
self.approved_domain.delete_request() domain_state = self.approved_domain.state
# Only reject if it exists on EPP
if domain_state != Domain.State.UNKNOWN:
self.approved_domain.deletedInEpp()
self.approved_domain.delete() self.approved_domain.delete()
self.approved_domain = None self.approved_domain = None
@ -638,7 +641,10 @@ class DomainApplication(TimeStampedModel):
and domain_information (will cascade) when they exist.""" and domain_information (will cascade) when they exist."""
if self.status == self.APPROVED: if self.status == self.APPROVED:
self.approved_domain.delete_request() domain_state = self.approved_domain.state
# Only reject if it exists on EPP
if domain_state != Domain.State.UNKNOWN:
self.approved_domain.deletedInEpp()
self.approved_domain.delete() self.approved_domain.delete()
self.approved_domain = None self.approved_domain = None

View file

@ -29,7 +29,8 @@ class PublicContact(TimeStampedModel):
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
"""Save to the registry and also locally in the registrar database.""" """Save to the registry and also locally in the registrar database."""
if hasattr(self, "domain"): skip_epp_save = kwargs.pop("skip_epp_save", False)
if hasattr(self, "domain") and not skip_epp_save:
match self.contact_type: match self.contact_type:
case PublicContact.ContactTypeChoices.REGISTRANT: case PublicContact.ContactTypeChoices.REGISTRANT:
self.domain.registrant_contact = self self.domain.registrant_contact = self
@ -148,6 +149,10 @@ class PublicContact(TimeStampedModel):
pw="thisisnotapassword", pw="thisisnotapassword",
) )
@classmethod
def get_max_id_length(cls):
return cls._meta.get_field("registry_id").max_length
def __str__(self): def __str__(self):
return ( return (
f"{self.name} <{self.email}>" f"{self.name} <{self.email}>"

View file

@ -15,7 +15,7 @@ class UserDomainRole(TimeStampedModel):
elsewhere. elsewhere.
""" """
ADMIN = "admin" ADMIN = "manager"
user = models.ForeignKey( user = models.ForeignKey(
"registrar.User", "registrar.User",

View file

@ -0,0 +1,2 @@
class ContactError(Exception):
...

View file

@ -15,7 +15,9 @@
{% endif %} {% endif %}
<input id="manageDomainSubmitButton" type="submit" value="Manage Domain" name="_edit_domain"> <input id="manageDomainSubmitButton" type="submit" value="Manage Domain" name="_edit_domain">
<input type="submit" value="get status" name="_get_status"> <input type="submit" value="get status" name="_get_status">
<input type="submit" value="EPP Delete Domain" name="_delete_domain"> {% if original.state != original.State.DELETED %}
<input type="submit" value="Delete Domain in Registry" name="_delete_domain">
{% endif %}
</div> </div>
{{ block.super }} {{ block.super }}
{% endblock %} {% endblock %}

View file

@ -21,7 +21,7 @@
<button <button
type="submit" type="submit"
class="usa-button" class="usa-button"
>Add security email</button> >{% if form.security_email.value is None or form.security_email.value == "dotgov@cisa.dhs.gov"%}Add security email{% else %}Save{% endif %}</button>
</form> </form>
{% endblock %} {# domain_content #} {% endblock %} {# domain_content #}

View file

@ -26,6 +26,7 @@ from registrar.models import (
from epplibwrapper import ( from epplibwrapper import (
commands, commands,
common, common,
info,
RegistryError, RegistryError,
ErrorCode, ErrorCode,
) )
@ -555,32 +556,117 @@ class MockEppLib(TestCase):
contacts=..., contacts=...,
hosts=..., hosts=...,
statuses=..., statuses=...,
registrant=...,
): ):
self.auth_info = auth_info self.auth_info = auth_info
self.cr_date = cr_date self.cr_date = cr_date
self.contacts = contacts self.contacts = contacts
self.hosts = hosts self.hosts = hosts
self.statuses = statuses self.statuses = statuses
self.registrant = registrant
def dummyInfoContactResultData(
self,
id,
email,
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
pw="thisisnotapassword",
):
fake = info.InfoContactResultData(
id=id,
postal_info=common.PostalInfo(
name="Registry Customer Service",
addr=common.ContactAddr(
street=["4200 Wilson Blvd."],
city="Arlington",
pc="22201",
cc="US",
sp="VA",
),
org="Cybersecurity and Infrastructure Security Agency",
type="type",
),
voice="+1.8882820870",
fax="+1-212-9876543",
email=email,
auth_info=common.ContactAuthInfo(pw=pw),
roid=...,
statuses=[],
cl_id=...,
cr_id=...,
cr_date=cr_date,
up_id=...,
up_date=...,
tr_date=...,
disclose=...,
vat=...,
ident=...,
notify_email=...,
)
return fake
mockDataInfoDomain = fakedEppObject( mockDataInfoDomain = fakedEppObject(
"fakepw", "fakePw",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35), cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[common.DomainContact(contact="123", type="security")], contacts=[
common.DomainContact(
contact="123", type=PublicContact.ContactTypeChoices.SECURITY
)
],
hosts=["fake.host.com"], hosts=["fake.host.com"],
statuses=[ statuses=[
common.Status(state="serverTransferProhibited", description="", lang="en"), common.Status(state="serverTransferProhibited", description="", lang="en"),
common.Status(state="inactive", description="", lang="en"), common.Status(state="inactive", description="", lang="en"),
], ],
) )
mockDataInfoContact = mockDataInfoDomain.dummyInfoContactResultData(
"123", "123@mail.gov", datetime.datetime(2023, 5, 25, 19, 45, 35), "lastPw"
)
InfoDomainWithContacts = fakedEppObject(
"fakepw",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[
common.DomainContact(
contact="securityContact",
type=PublicContact.ContactTypeChoices.SECURITY,
),
common.DomainContact(
contact="technicalContact",
type=PublicContact.ContactTypeChoices.TECHNICAL,
),
common.DomainContact(
contact="adminContact",
type=PublicContact.ContactTypeChoices.ADMINISTRATIVE,
),
],
hosts=["fake.host.com"],
statuses=[
common.Status(state="serverTransferProhibited", description="", lang="en"),
common.Status(state="inactive", description="", lang="en"),
],
registrant="regContact",
)
mockSecurityContact = InfoDomainWithContacts.dummyInfoContactResultData(
"securityContact", "security@mail.gov"
)
mockTechnicalContact = InfoDomainWithContacts.dummyInfoContactResultData(
"technicalContact", "tech@mail.gov"
)
mockAdministrativeContact = InfoDomainWithContacts.dummyInfoContactResultData(
"adminContact", "admin@mail.gov"
)
mockRegistrantContact = InfoDomainWithContacts.dummyInfoContactResultData(
"regContact", "registrant@mail.gov"
)
infoDomainNoContact = fakedEppObject( infoDomainNoContact = fakedEppObject(
"security", "security",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35), cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[], contacts=[],
hosts=["fake.host.com"], hosts=["fake.host.com"],
) )
mockDataInfoContact = fakedEppObject(
"anotherPw", cr_date=datetime.datetime(2023, 7, 25, 19, 45, 35)
)
mockDataInfoHosts = fakedEppObject( mockDataInfoHosts = fakedEppObject(
"lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35) "lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35)
) )
@ -593,9 +679,28 @@ class MockEppLib(TestCase):
if isinstance(_request, commands.InfoDomain): if isinstance(_request, commands.InfoDomain):
if getattr(_request, "name", None) == "security.gov": if getattr(_request, "name", None) == "security.gov":
return MagicMock(res_data=[self.infoDomainNoContact]) return MagicMock(res_data=[self.infoDomainNoContact])
return MagicMock(res_data=[self.mockDataInfoDomain]) elif getattr(_request, "name", None) == "freeman.gov":
return MagicMock(res_data=[self.InfoDomainWithContacts])
else:
return MagicMock(res_data=[self.mockDataInfoDomain])
elif isinstance(_request, commands.InfoContact): elif isinstance(_request, commands.InfoContact):
return MagicMock(res_data=[self.mockDataInfoContact]) mocked_result: info.InfoContactResultData
# For testing contact types
match getattr(_request, "id", None):
case "securityContact":
mocked_result = self.mockSecurityContact
case "technicalContact":
mocked_result = self.mockTechnicalContact
case "adminContact":
mocked_result = self.mockAdministrativeContact
case "regContact":
mocked_result = self.mockRegistrantContact
case _:
# Default contact return
mocked_result = self.mockDataInfoContact
return MagicMock(res_data=[mocked_result])
elif ( elif (
isinstance(_request, commands.CreateContact) isinstance(_request, commands.CreateContact)
and getattr(_request, "id", None) == "fail" and getattr(_request, "id", None) == "fail"
@ -604,6 +709,16 @@ class MockEppLib(TestCase):
# use this for when a contact is being updated # use this for when a contact is being updated
# sets the second send() to fail # sets the second send() to fail
raise RegistryError(code=ErrorCode.OBJECT_EXISTS) raise RegistryError(code=ErrorCode.OBJECT_EXISTS)
elif (
isinstance(_request, commands.DeleteDomain)
and getattr(_request, "name", None) == "failDelete.gov"
):
name = getattr(_request, "name", None)
fake_nameserver = "ns1.failDelete.gov"
if name in fake_nameserver:
raise RegistryError(
code=ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION
)
return MagicMock(res_data=[self.mockDataInfoHosts]) return MagicMock(res_data=[self.mockDataInfoHosts])
def setUp(self): def setUp(self):

View file

@ -49,6 +49,7 @@ class TestDomainAdmin(MockEppLib):
self.client = Client(HTTP_HOST="localhost:8080") self.client = Client(HTTP_HOST="localhost:8080")
self.superuser = create_superuser() self.superuser = create_superuser()
self.staffuser = create_user() self.staffuser = create_user()
self.factory = RequestFactory()
super().setUp() super().setUp()
def test_place_and_remove_hold(self): def test_place_and_remove_hold(self):
@ -87,6 +88,155 @@ class TestDomainAdmin(MockEppLib):
self.assertContains(response, "Place hold") self.assertContains(response, "Place hold")
self.assertNotContains(response, "Remove hold") self.assertNotContains(response, "Remove hold")
def test_deletion_is_successful(self):
"""
Scenario: Domain deletion is unsuccessful
When the domain is deleted
Then a user-friendly success message is returned for displaying on the web
And `state` is et to `DELETED`
"""
domain = create_ready_domain()
# Put in client hold
domain.place_client_hold()
p = "userpass"
self.client.login(username="staffuser", password=p)
# Ensure everything is displaying correctly
response = self.client.get(
"/admin/registrar/domain/{}/change/".format(domain.pk),
follow=True,
)
self.assertEqual(response.status_code, 200)
self.assertContains(response, domain.name)
self.assertContains(response, "Delete Domain in Registry")
# Test the info dialog
request = self.factory.post(
"/admin/registrar/domain/{}/change/".format(domain.pk),
{"_delete_domain": "Delete Domain in Registry", "name": domain.name},
follow=True,
)
request.user = self.client
with patch("django.contrib.messages.add_message") as mock_add_message:
self.admin.do_delete_domain(request, domain)
mock_add_message.assert_called_once_with(
request,
messages.INFO,
"Domain city.gov has been deleted. Thanks!",
extra_tags="",
fail_silently=False,
)
self.assertEqual(domain.state, Domain.State.DELETED)
def test_deletion_ready_fsm_failure(self):
"""
Scenario: Domain deletion is unsuccessful
When an error is returned from epplibwrapper
Then a user-friendly error message is returned for displaying on the web
And `state` is not set to `DELETED`
"""
domain = create_ready_domain()
p = "userpass"
self.client.login(username="staffuser", password=p)
# Ensure everything is displaying correctly
response = self.client.get(
"/admin/registrar/domain/{}/change/".format(domain.pk),
follow=True,
)
self.assertEqual(response.status_code, 200)
self.assertContains(response, domain.name)
self.assertContains(response, "Delete Domain in Registry")
# Test the error
request = self.factory.post(
"/admin/registrar/domain/{}/change/".format(domain.pk),
{"_delete_domain": "Delete Domain in Registry", "name": domain.name},
follow=True,
)
request.user = self.client
with patch("django.contrib.messages.add_message") as mock_add_message:
self.admin.do_delete_domain(request, domain)
mock_add_message.assert_called_once_with(
request,
messages.ERROR,
"Error deleting this Domain: "
"Can't switch from state 'ready' to 'deleted'"
", must be either 'dns_needed' or 'on_hold'",
extra_tags="",
fail_silently=False,
)
self.assertEqual(domain.state, Domain.State.READY)
def test_analyst_deletes_domain_idempotent(self):
"""
Scenario: Analyst tries to delete an already deleted domain
Given `state` is already `DELETED`
When `domain.deletedInEpp()` is called
Then `commands.DeleteDomain` is sent to the registry
And Domain returns normally without an error dialog
"""
domain = create_ready_domain()
# Put in client hold
domain.place_client_hold()
p = "userpass"
self.client.login(username="staffuser", password=p)
# Ensure everything is displaying correctly
response = self.client.get(
"/admin/registrar/domain/{}/change/".format(domain.pk),
follow=True,
)
self.assertEqual(response.status_code, 200)
self.assertContains(response, domain.name)
self.assertContains(response, "Delete Domain in Registry")
# Test the info dialog
request = self.factory.post(
"/admin/registrar/domain/{}/change/".format(domain.pk),
{"_delete_domain": "Delete Domain in Registry", "name": domain.name},
follow=True,
)
request.user = self.client
# Delete it once
with patch("django.contrib.messages.add_message") as mock_add_message:
self.admin.do_delete_domain(request, domain)
mock_add_message.assert_called_once_with(
request,
messages.INFO,
"Domain city.gov has been deleted. Thanks!",
extra_tags="",
fail_silently=False,
)
self.assertEqual(domain.state, Domain.State.DELETED)
# Try to delete it again
# Test the info dialog
request = self.factory.post(
"/admin/registrar/domain/{}/change/".format(domain.pk),
{"_delete_domain": "Delete Domain in Registry", "name": domain.name},
follow=True,
)
request.user = self.client
with patch("django.contrib.messages.add_message") as mock_add_message:
self.admin.do_delete_domain(request, domain)
mock_add_message.assert_called_once_with(
request,
messages.INFO,
"This domain is already deleted",
extra_tags="",
fail_silently=False,
)
self.assertEqual(domain.state, Domain.State.DELETED)
@skip("Waiting on epp lib to implement") @skip("Waiting on epp lib to implement")
def test_place_and_remove_hold_epp(self): def test_place_and_remove_hold_epp(self):
raise raise
@ -138,8 +288,9 @@ class TestDomainApplicationAdminForm(TestCase):
) )
class TestDomainApplicationAdmin(TestCase): class TestDomainApplicationAdmin(MockEppLib):
def setUp(self): def setUp(self):
super().setUp()
self.site = AdminSite() self.site = AdminSite()
self.factory = RequestFactory() self.factory = RequestFactory()
self.admin = DomainApplicationAdmin( self.admin = DomainApplicationAdmin(
@ -690,6 +841,7 @@ class TestDomainApplicationAdmin(TestCase):
domain_information.refresh_from_db() domain_information.refresh_from_db()
def tearDown(self): def tearDown(self):
super().tearDown()
Domain.objects.all().delete() Domain.objects.all().delete()
DomainInformation.objects.all().delete() DomainInformation.objects.all().delete()
DomainApplication.objects.all().delete() DomainApplication.objects.all().delete()

View file

@ -3,9 +3,10 @@ Feature being tested: Registry Integration
This file tests the various ways in which the registrar interacts with the registry. This file tests the various ways in which the registrar interacts with the registry.
""" """
from typing import Mapping, Any
from django.test import TestCase from django.test import TestCase
from django.db.utils import IntegrityError from django.db.utils import IntegrityError
from unittest.mock import patch, call from unittest.mock import MagicMock, patch, call
import datetime import datetime
from registrar.models import Domain from registrar.models import Domain
@ -16,22 +17,32 @@ from registrar.models.draft_domain import DraftDomain
from registrar.models.public_contact import PublicContact from registrar.models.public_contact import PublicContact
from registrar.models.user import User from registrar.models.user import User
from .common import MockEppLib from .common import MockEppLib
from django_fsm import TransitionNotAllowed # type: ignore
from epplibwrapper import ( from epplibwrapper import (
commands, commands,
common, common,
extensions,
responses,
RegistryError, RegistryError,
ErrorCode, ErrorCode,
) )
import logging
logger = logging.getLogger(__name__)
class TestDomainCache(MockEppLib): class TestDomainCache(MockEppLib):
def tearDown(self):
PublicContact.objects.all().delete()
Domain.objects.all().delete()
super().tearDown()
def test_cache_sets_resets(self): def test_cache_sets_resets(self):
"""Cache should be set on getter and reset on setter calls""" """Cache should be set on getter and reset on setter calls"""
domain, _ = Domain.objects.get_or_create(name="igorville.gov") domain, _ = Domain.objects.get_or_create(name="igorville.gov")
# trigger getter # trigger getter
_ = domain.creation_date _ = domain.creation_date
domain._get_property("contacts")
# getter should set the domain cache with a InfoDomain object # getter should set the domain cache with a InfoDomain object
# (see InfoDomainResult) # (see InfoDomainResult)
self.assertEquals(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info) self.assertEquals(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info)
@ -51,8 +62,6 @@ class TestDomainCache(MockEppLib):
commands.InfoDomain(name="igorville.gov", auth_info=None), commands.InfoDomain(name="igorville.gov", auth_info=None),
cleaned=True, cleaned=True,
), ),
call(commands.InfoContact(id="123", auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
], ],
any_order=False, # Ensure calls are in the specified order any_order=False, # Ensure calls are in the specified order
) )
@ -74,8 +83,6 @@ class TestDomainCache(MockEppLib):
call( call(
commands.InfoDomain(name="igorville.gov", auth_info=None), cleaned=True commands.InfoDomain(name="igorville.gov", auth_info=None), cleaned=True
), ),
call(commands.InfoContact(id="123", auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
] ]
self.mockedSendFunction.assert_has_calls(expectedCalls) self.mockedSendFunction.assert_has_calls(expectedCalls)
@ -83,13 +90,16 @@ class TestDomainCache(MockEppLib):
def test_cache_nested_elements(self): def test_cache_nested_elements(self):
"""Cache works correctly with the nested objects cache and hosts""" """Cache works correctly with the nested objects cache and hosts"""
domain, _ = Domain.objects.get_or_create(name="igorville.gov") domain, _ = Domain.objects.get_or_create(name="igorville.gov")
# The contact list will initially contain objects of type 'DomainContact'
# the cached contacts and hosts should be dictionaries of what is passed to them # this is then transformed into PublicContact, and cache should NOT
# hold onto the DomainContact object
expectedUnfurledContactsList = [
common.DomainContact(contact="123", type="security"),
]
expectedContactsDict = { expectedContactsDict = {
"id": self.mockDataInfoDomain.contacts[0].contact, PublicContact.ContactTypeChoices.ADMINISTRATIVE: None,
"type": self.mockDataInfoDomain.contacts[0].type, PublicContact.ContactTypeChoices.SECURITY: "123",
"auth_info": self.mockDataInfoContact.auth_info, PublicContact.ContactTypeChoices.TECHNICAL: None,
"cr_date": self.mockDataInfoContact.cr_date,
} }
expectedHostsDict = { expectedHostsDict = {
"name": self.mockDataInfoDomain.hosts[0], "name": self.mockDataInfoDomain.hosts[0],
@ -105,15 +115,83 @@ class TestDomainCache(MockEppLib):
# check contacts # check contacts
self.assertEqual(domain._cache["_contacts"], self.mockDataInfoDomain.contacts) self.assertEqual(domain._cache["_contacts"], self.mockDataInfoDomain.contacts)
self.assertEqual(domain._cache["contacts"], [expectedContactsDict]) # The contact list should not contain what is sent by the registry by default,
# as _fetch_cache will transform the type to PublicContact
self.assertNotEqual(domain._cache["contacts"], expectedUnfurledContactsList)
self.assertEqual(domain._cache["contacts"], expectedContactsDict)
# get and check hosts is set correctly # get and check hosts is set correctly
domain._get_property("hosts") domain._get_property("hosts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict]) self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
self.assertEqual(domain._cache["contacts"], expectedContactsDict)
# invalidate cache
domain._cache = {}
def tearDown(self) -> None: # get host
Domain.objects.all().delete() domain._get_property("hosts")
super().tearDown() self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
# get contacts
domain._get_property("contacts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
self.assertEqual(domain._cache["contacts"], expectedContactsDict)
def test_map_epp_contact_to_public_contact(self):
# Tests that the mapper is working how we expect
domain, _ = Domain.objects.get_or_create(name="registry.gov")
security = PublicContact.ContactTypeChoices.SECURITY
mapped = domain.map_epp_contact_to_public_contact(
self.mockDataInfoContact,
self.mockDataInfoContact.id,
security,
)
expected_contact = PublicContact(
domain=domain,
contact_type=security,
registry_id="123",
email="123@mail.gov",
voice="+1.8882820870",
fax="+1-212-9876543",
pw="lastPw",
name="Registry Customer Service",
org="Cybersecurity and Infrastructure Security Agency",
city="Arlington",
pc="22201",
cc="US",
sp="VA",
street1="4200 Wilson Blvd.",
)
# Test purposes only, since we're comparing
# two duplicate objects. We would expect
# these not to have the same state.
expected_contact._state = mapped._state
# Mapped object is what we expect
self.assertEqual(mapped.__dict__, expected_contact.__dict__)
# The mapped object should correctly translate to a DB
# object. If not, something else went wrong.
db_object = domain._get_or_create_public_contact(mapped)
in_db = PublicContact.objects.filter(
registry_id=domain.security_contact.registry_id,
contact_type=security,
).get()
# DB Object is the same as the mapped object
self.assertEqual(db_object, in_db)
domain.security_contact = in_db
# Trigger the getter
_ = domain.security_contact
# Check to see that changes made
# to DB objects persist in cache correctly
in_db.email = "123test@mail.gov"
in_db.save()
cached_contact = domain._cache["contacts"].get(security)
self.assertEqual(cached_contact, in_db.registry_id)
self.assertEqual(domain.security_contact.email, "123test@mail.gov")
class TestDomainCreation(MockEppLib): class TestDomainCreation(MockEppLib):
@ -170,8 +248,6 @@ class TestDomainCreation(MockEppLib):
commands.InfoDomain(name="beef-tongue.gov", auth_info=None), commands.InfoDomain(name="beef-tongue.gov", auth_info=None),
cleaned=True, cleaned=True,
), ),
call(commands.InfoContact(id="123", auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
], ],
any_order=False, # Ensure calls are in the specified order any_order=False, # Ensure calls are in the specified order
) )
@ -199,7 +275,10 @@ class TestDomainCreation(MockEppLib):
def tearDown(self) -> None: def tearDown(self) -> None:
DomainInformation.objects.all().delete() DomainInformation.objects.all().delete()
DomainApplication.objects.all().delete() DomainApplication.objects.all().delete()
PublicContact.objects.all().delete()
Domain.objects.all().delete() Domain.objects.all().delete()
User.objects.all().delete()
DraftDomain.objects.all().delete()
super().tearDown() super().tearDown()
@ -213,7 +292,6 @@ class TestDomainStatuses(MockEppLib):
_ = domain.statuses _ = domain.statuses
status_list = [status.state for status in self.mockDataInfoDomain.statuses] status_list = [status.state for status in self.mockDataInfoDomain.statuses]
self.assertEquals(domain._cache["statuses"], status_list) self.assertEquals(domain._cache["statuses"], status_list)
# Called in _fetch_cache # Called in _fetch_cache
self.mockedSendFunction.assert_has_calls( self.mockedSendFunction.assert_has_calls(
[ [
@ -221,8 +299,6 @@ class TestDomainStatuses(MockEppLib):
commands.InfoDomain(name="chicken-liver.gov", auth_info=None), commands.InfoDomain(name="chicken-liver.gov", auth_info=None),
cleaned=True, cleaned=True,
), ),
call(commands.InfoContact(id="123", auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
], ],
any_order=False, # Ensure calls are in the specified order any_order=False, # Ensure calls are in the specified order
) )
@ -259,10 +335,119 @@ class TestDomainStatuses(MockEppLib):
raise raise
def tearDown(self) -> None: def tearDown(self) -> None:
PublicContact.objects.all().delete()
Domain.objects.all().delete() Domain.objects.all().delete()
super().tearDown() super().tearDown()
class TestDomainAvailable(MockEppLib):
"""Test Domain.available"""
# No SetUp or tearDown necessary for these tests
def test_domain_available(self):
"""
Scenario: Testing whether an available domain is available
Should return True
Mock response to mimic EPP Response
Validate CheckDomain command is called
Validate response given mock
"""
def side_effect(_request, cleaned):
return MagicMock(
res_data=[
responses.check.CheckDomainResultData(
name="available.gov", avail=True, reason=None
)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
available = Domain.available("available.gov")
mocked_send.assert_has_calls(
[
call(
commands.CheckDomain(
["available.gov"],
),
cleaned=True,
)
]
)
self.assertTrue(available)
patcher.stop()
def test_domain_unavailable(self):
"""
Scenario: Testing whether an unavailable domain is available
Should return False
Mock response to mimic EPP Response
Validate CheckDomain command is called
Validate response given mock
"""
def side_effect(_request, cleaned):
return MagicMock(
res_data=[
responses.check.CheckDomainResultData(
name="unavailable.gov", avail=False, reason="In Use"
)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
available = Domain.available("unavailable.gov")
mocked_send.assert_has_calls(
[
call(
commands.CheckDomain(
["unavailable.gov"],
),
cleaned=True,
)
]
)
self.assertFalse(available)
patcher.stop()
def test_domain_available_with_value_error(self):
"""
Scenario: Testing whether an invalid domain is available
Should throw ValueError
Validate ValueError is raised
"""
with self.assertRaises(ValueError):
Domain.available("invalid-string")
def test_domain_available_unsuccessful(self):
"""
Scenario: Testing behavior when registry raises a RegistryError
Validate RegistryError is raised
"""
def side_effect(_request, cleaned):
raise RegistryError(code=ErrorCode.COMMAND_SYNTAX_ERROR)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
with self.assertRaises(RegistryError):
Domain.available("raises-error.gov")
patcher.stop()
class TestRegistrantContacts(MockEppLib): class TestRegistrantContacts(MockEppLib):
"""Rule: Registrants may modify their WHOIS data""" """Rule: Registrants may modify their WHOIS data"""
@ -273,12 +458,17 @@ class TestRegistrantContacts(MockEppLib):
And the registrant is the admin on a domain And the registrant is the admin on a domain
""" """
super().setUp() super().setUp()
# Creates a domain with no contact associated to it
self.domain, _ = Domain.objects.get_or_create(name="security.gov") self.domain, _ = Domain.objects.get_or_create(name="security.gov")
# Creates a domain with an associated contact
self.domain_contact, _ = Domain.objects.get_or_create(name="freeman.gov")
def tearDown(self): def tearDown(self):
super().tearDown() super().tearDown()
# self.contactMailingAddressPatch.stop() self.domain._invalidate_cache()
# self.createContactPatch.stop() self.domain_contact._invalidate_cache()
PublicContact.objects.all().delete()
Domain.objects.all().delete()
def test_no_security_email(self): def test_no_security_email(self):
""" """
@ -524,6 +714,133 @@ class TestRegistrantContacts(MockEppLib):
""" """
raise raise
def test_contact_getter_security(self):
security = PublicContact.ContactTypeChoices.SECURITY
# Create prexisting object
expected_contact = self.domain.map_epp_contact_to_public_contact(
self.mockSecurityContact,
contact_id="securityContact",
contact_type=security,
)
# Checks if we grabbed the correct PublicContact
self.assertEqual(
self.domain_contact.security_contact.email, expected_contact.email
)
expected_contact_db = PublicContact.objects.filter(
registry_id=self.domain_contact.security_contact.registry_id,
contact_type=security,
).get()
self.assertEqual(self.domain_contact.security_contact, expected_contact_db)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoContact(id="securityContact", auth_info=None),
cleaned=True,
),
]
)
# Checks if we are receiving the cache we expect
cache = self.domain_contact._cache["contacts"]
self.assertEqual(cache.get(security), "securityContact")
def test_contact_getter_technical(self):
technical = PublicContact.ContactTypeChoices.TECHNICAL
expected_contact = self.domain.map_epp_contact_to_public_contact(
self.mockTechnicalContact,
contact_id="technicalContact",
contact_type=technical,
)
self.assertEqual(
self.domain_contact.technical_contact.email, expected_contact.email
)
# Checks if we grab the correct PublicContact
expected_contact_db = PublicContact.objects.filter(
registry_id=self.domain_contact.technical_contact.registry_id,
contact_type=technical,
).get()
# Checks if we grab the correct PublicContact
self.assertEqual(self.domain_contact.technical_contact, expected_contact_db)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoContact(id="technicalContact", auth_info=None),
cleaned=True,
),
]
)
# Checks if we are receiving the cache we expect
cache = self.domain_contact._cache["contacts"]
self.assertEqual(cache.get(technical), "technicalContact")
def test_contact_getter_administrative(self):
administrative = PublicContact.ContactTypeChoices.ADMINISTRATIVE
expected_contact = self.domain.map_epp_contact_to_public_contact(
self.mockAdministrativeContact,
contact_id="adminContact",
contact_type=administrative,
)
self.assertEqual(
self.domain_contact.administrative_contact.email, expected_contact.email
)
expected_contact_db = PublicContact.objects.filter(
registry_id=self.domain_contact.administrative_contact.registry_id,
contact_type=administrative,
).get()
# Checks if we grab the correct PublicContact
self.assertEqual(
self.domain_contact.administrative_contact, expected_contact_db
)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoContact(id="adminContact", auth_info=None),
cleaned=True,
),
]
)
# Checks if we are receiving the cache we expect
cache = self.domain_contact._cache["contacts"]
self.assertEqual(cache.get(administrative), "adminContact")
def test_contact_getter_registrant(self):
expected_contact = self.domain.map_epp_contact_to_public_contact(
self.mockRegistrantContact,
contact_id="regContact",
contact_type=PublicContact.ContactTypeChoices.REGISTRANT,
)
self.assertEqual(
self.domain_contact.registrant_contact.email, expected_contact.email
)
expected_contact_db = PublicContact.objects.filter(
registry_id=self.domain_contact.registrant_contact.registry_id,
contact_type=PublicContact.ContactTypeChoices.REGISTRANT,
).get()
# Checks if we grab the correct PublicContact
self.assertEqual(self.domain_contact.registrant_contact, expected_contact_db)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoContact(id="regContact", auth_info=None),
cleaned=True,
),
]
)
# Checks if we are receiving the cache we expect.
self.assertEqual(self.domain_contact._cache["registrant"], expected_contact_db)
class TestRegistrantNameservers(TestCase): class TestRegistrantNameservers(TestCase):
"""Rule: Registrants may modify their nameservers""" """Rule: Registrants may modify their nameservers"""
@ -664,44 +981,372 @@ class TestRegistrantNameservers(TestCase):
raise raise
class TestRegistrantDNSSEC(TestCase): class TestRegistrantDNSSEC(MockEppLib):
"""Rule: Registrants may modify their secure DNS data""" """Rule: Registrants may modify their secure DNS data"""
# helper function to create UpdateDomainDNSSECExtention object for verification
def createUpdateExtension(self, dnssecdata: extensions.DNSSECExtension):
return commands.UpdateDomainDNSSECExtension(
maxSigLife=dnssecdata.maxSigLife,
dsData=dnssecdata.dsData,
keyData=dnssecdata.keyData,
remDsData=None,
remKeyData=None,
remAllDsKeyData=True,
)
def setUp(self): def setUp(self):
""" """
Background: Background:
Given the registrant is logged in Given the analyst is logged in
And the registrant is the admin on a domain And a domain exists in the registry
""" """
pass super().setUp()
# for the tests, need a domain in the unknown state
self.domain, _ = Domain.objects.get_or_create(name="fake.gov")
self.addDsData1 = {
"keyTag": 1234,
"alg": 3,
"digestType": 1,
"digest": "ec0bdd990b39feead889f0ba613db4adec0bdd99",
}
self.addDsData2 = {
"keyTag": 2345,
"alg": 3,
"digestType": 1,
"digest": "ec0bdd990b39feead889f0ba613db4adecb4adec",
}
self.keyDataDict = {
"flags": 257,
"protocol": 3,
"alg": 1,
"pubKey": "AQPJ////4Q==",
}
self.dnssecExtensionWithDsData: Mapping[str, Any] = {
"dsData": [common.DSData(**self.addDsData1)]
}
self.dnssecExtensionWithMultDsData: Mapping[str, Any] = {
"dsData": [
common.DSData(**self.addDsData1),
common.DSData(**self.addDsData2),
],
}
self.dnssecExtensionWithKeyData: Mapping[str, Any] = {
"maxSigLife": 3215,
"keyData": [common.DNSSECKeyData(**self.keyDataDict)],
}
@skip("not implemented yet") def tearDown(self):
def test_user_adds_dns_data(self): Domain.objects.all().delete()
super().tearDown()
def test_user_adds_dnssec_data(self):
""" """
Scenario: Registrant adds DNS data Scenario: Registrant adds DNSSEC data.
Verify that both the setter and getter are functioning properly
This test verifies:
1 - setter calls UpdateDomain command
2 - setter adds the UpdateDNSSECExtension extension to the command
3 - setter causes the getter to call info domain on next get from cache
4 - getter properly parses dnssecdata from InfoDomain response and sets to cache
""" """
raise
@skip("not implemented yet") # make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
return MagicMock(
res_data=[self.mockDataInfoDomain],
extensions=[
extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
self.domain.dnssecdata = self.dnssecExtensionWithDsData
# get the DNS SEC extension added to the UpdateDomain command and
# verify that it is properly sent
# args[0] is the _request sent to registry
args, _ = mocked_send.call_args
# assert that the extension matches
self.assertEquals(
args[0].extensions[0],
self.createUpdateExtension(
extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
),
)
# test that the dnssecdata getter is functioning properly
dnssecdata_get = self.domain.dnssecdata
mocked_send.assert_has_calls(
[
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.InfoDomain(
name="fake.gov",
),
cleaned=True,
),
]
)
self.assertEquals(
dnssecdata_get.dsData, self.dnssecExtensionWithDsData["dsData"]
)
patcher.stop()
def test_dnssec_is_idempotent(self): def test_dnssec_is_idempotent(self):
""" """
Scenario: Registrant adds DNS data twice, due to a UI glitch Scenario: Registrant adds DNS data twice, due to a UI glitch
"""
# implementation note: this requires seeing what happens when these are actually # implementation note: this requires seeing what happens when these are actually
# sent like this, and then implementing appropriate mocks for any errors the # sent like this, and then implementing appropriate mocks for any errors the
# registry normally sends in this case # registry normally sends in this case
raise
@skip("not implemented yet") This test verifies:
1 - UpdateDomain command called twice
2 - setter causes the getter to call info domain on next get from cache
3 - getter properly parses dnssecdata from InfoDomain response and sets to cache
"""
# make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
return MagicMock(
res_data=[self.mockDataInfoDomain],
extensions=[
extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
# set the dnssecdata once
self.domain.dnssecdata = self.dnssecExtensionWithDsData
# set the dnssecdata again
self.domain.dnssecdata = self.dnssecExtensionWithDsData
# test that the dnssecdata getter is functioning properly
dnssecdata_get = self.domain.dnssecdata
mocked_send.assert_has_calls(
[
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.InfoDomain(
name="fake.gov",
),
cleaned=True,
),
]
)
self.assertEquals(
dnssecdata_get.dsData, self.dnssecExtensionWithDsData["dsData"]
)
patcher.stop()
def test_user_adds_dnssec_data_multiple_dsdata(self):
"""
Scenario: Registrant adds DNSSEC data with multiple DSData.
Verify that both the setter and getter are functioning properly
This test verifies:
1 - setter calls UpdateDomain command
2 - setter adds the UpdateDNSSECExtension extension to the command
3 - setter causes the getter to call info domain on next get from cache
4 - getter properly parses dnssecdata from InfoDomain response and sets to cache
"""
# make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
return MagicMock(
res_data=[self.mockDataInfoDomain],
extensions=[
extensions.DNSSECExtension(**self.dnssecExtensionWithMultDsData)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
self.domain.dnssecdata = self.dnssecExtensionWithMultDsData
# get the DNS SEC extension added to the UpdateDomain command
# and verify that it is properly sent
# args[0] is the _request sent to registry
args, _ = mocked_send.call_args
# assert that the extension matches
self.assertEquals(
args[0].extensions[0],
self.createUpdateExtension(
extensions.DNSSECExtension(**self.dnssecExtensionWithMultDsData)
),
)
# test that the dnssecdata getter is functioning properly
dnssecdata_get = self.domain.dnssecdata
mocked_send.assert_has_calls(
[
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.InfoDomain(
name="fake.gov",
),
cleaned=True,
),
]
)
self.assertEquals(
dnssecdata_get.dsData, self.dnssecExtensionWithMultDsData["dsData"]
)
patcher.stop()
def test_user_adds_dnssec_keydata(self):
"""
Scenario: Registrant adds DNSSEC data.
Verify that both the setter and getter are functioning properly
This test verifies:
1 - setter calls UpdateDomain command
2 - setter adds the UpdateDNSSECExtension extension to the command
3 - setter causes the getter to call info domain on next get from cache
4 - getter properly parses dnssecdata from InfoDomain response and sets to cache
"""
# make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
return MagicMock(
res_data=[self.mockDataInfoDomain],
extensions=[
extensions.DNSSECExtension(**self.dnssecExtensionWithKeyData)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
self.domain.dnssecdata = self.dnssecExtensionWithKeyData
# get the DNS SEC extension added to the UpdateDomain command
# and verify that it is properly sent
# args[0] is the _request sent to registry
args, _ = mocked_send.call_args
# assert that the extension matches
self.assertEquals(
args[0].extensions[0],
self.createUpdateExtension(
extensions.DNSSECExtension(**self.dnssecExtensionWithKeyData)
),
)
# test that the dnssecdata getter is functioning properly
dnssecdata_get = self.domain.dnssecdata
mocked_send.assert_has_calls(
[
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.InfoDomain(
name="fake.gov",
),
cleaned=True,
),
]
)
self.assertEquals(
dnssecdata_get.keyData, self.dnssecExtensionWithKeyData["keyData"]
)
patcher.stop()
def test_update_is_unsuccessful(self): def test_update_is_unsuccessful(self):
""" """
Scenario: An update to the dns data is unsuccessful Scenario: An update to the dns data is unsuccessful
When an error is returned from epplibwrapper When an error is returned from epplibwrapper
Then a user-friendly error message is returned for displaying on the web Then a user-friendly error message is returned for displaying on the web
""" """
raise
# make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
raise RegistryError(code=ErrorCode.PARAMETER_VALUE_RANGE_ERROR)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
# if RegistryError is raised, view formats user-friendly
# error message if error is_client_error, is_session_error, or
# is_server_error; so test for those conditions
with self.assertRaises(RegistryError) as err:
self.domain.dnssecdata = self.dnssecExtensionWithDsData
self.assertTrue(
err.is_client_error() or err.is_session_error() or err.is_server_error()
)
patcher.stop()
class TestAnalystClientHold(MockEppLib): class TestAnalystClientHold(MockEppLib):
@ -940,7 +1585,7 @@ class TestAnalystLock(TestCase):
raise raise
class TestAnalystDelete(TestCase): class TestAnalystDelete(MockEppLib):
"""Rule: Analysts may delete a domain""" """Rule: Analysts may delete a domain"""
def setUp(self): def setUp(self):
@ -949,35 +1594,99 @@ class TestAnalystDelete(TestCase):
Given the analyst is logged in Given the analyst is logged in
And a domain exists in the registry And a domain exists in the registry
""" """
pass super().setUp()
self.domain, _ = Domain.objects.get_or_create(
name="fake.gov", state=Domain.State.READY
)
self.domain_on_hold, _ = Domain.objects.get_or_create(
name="fake-on-hold.gov", state=Domain.State.ON_HOLD
)
def tearDown(self):
Domain.objects.all().delete()
super().tearDown()
@skip("not implemented yet")
def test_analyst_deletes_domain(self): def test_analyst_deletes_domain(self):
""" """
Scenario: Analyst permanently deletes a domain Scenario: Analyst permanently deletes a domain
When `domain.delete()` is called When `domain.deletedInEpp()` is called
Then `commands.DeleteDomain` is sent to the registry Then `commands.DeleteDomain` is sent to the registry
And `state` is set to `DELETED` And `state` is set to `DELETED`
""" """
raise # Put the domain in client hold
self.domain.place_client_hold()
# Delete it...
self.domain.deletedInEpp()
self.mockedSendFunction.assert_has_calls(
[
call(
commands.DeleteDomain(name="fake.gov"),
cleaned=True,
)
]
)
@skip("not implemented yet") # Domain itself should not be deleted
def test_analyst_deletes_domain_idempotent(self): self.assertNotEqual(self.domain, None)
"""
Scenario: Analyst tries to delete an already deleted domain # Domain should have the right state
Given `state` is already `DELETED` self.assertEqual(self.domain.state, Domain.State.DELETED)
When `domain.delete()` is called
Then `commands.DeleteDomain` is sent to the registry # Cache should be invalidated
And Domain returns normally (without error) self.assertEqual(self.domain._cache, {})
"""
raise
@skip("not implemented yet")
def test_deletion_is_unsuccessful(self): def test_deletion_is_unsuccessful(self):
""" """
Scenario: Domain deletion is unsuccessful Scenario: Domain deletion is unsuccessful
When an error is returned from epplibwrapper When a subdomain exists
Then a user-friendly error message is returned for displaying on the web Then a client error is returned of code 2305
And `state` is not set to `DELETED` And `state` is not set to `DELETED`
""" """
raise # Desired domain
domain, _ = Domain.objects.get_or_create(
name="failDelete.gov", state=Domain.State.ON_HOLD
)
# Put the domain in client hold
domain.place_client_hold()
# Delete it
with self.assertRaises(RegistryError) as err:
domain.deletedInEpp()
self.assertTrue(
err.is_client_error()
and err.code == ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION
)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.DeleteDomain(name="failDelete.gov"),
cleaned=True,
)
]
)
# Domain itself should not be deleted
self.assertNotEqual(domain, None)
# State should not have changed
self.assertEqual(domain.state, Domain.State.ON_HOLD)
def test_deletion_ready_fsm_failure(self):
"""
Scenario: Domain deletion is unsuccessful due to FSM rules
Given state is 'ready'
When `domain.deletedInEpp()` is called
and domain is of `state` is `READY`
Then an FSM error is returned
And `state` is not set to `DELETED`
"""
self.assertEqual(self.domain.state, Domain.State.READY)
with self.assertRaises(TransitionNotAllowed) as err:
self.domain.deletedInEpp()
self.assertTrue(
err.is_client_error()
and err.code == ErrorCode.OBJECT_STATUS_PROHIBITS_OPERATION
)
# Domain should not be deleted
self.assertNotEqual(self.domain, None)
# Domain should have the right state
self.assertEqual(self.domain.state, Domain.State.READY)

View file

@ -1,11 +1,11 @@
from unittest import skip from unittest import skip
from unittest.mock import MagicMock, ANY from unittest.mock import MagicMock, ANY, patch
from django.conf import settings from django.conf import settings
from django.test import Client, TestCase from django.test import Client, TestCase
from django.urls import reverse from django.urls import reverse
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
from .common import completed_application from .common import MockEppLib, completed_application # type: ignore
from django_webtest import WebTest # type: ignore from django_webtest import WebTest # type: ignore
import boto3_mocking # type: ignore import boto3_mocking # type: ignore
@ -25,7 +25,6 @@ from registrar.models import (
from registrar.views.application import ApplicationWizard, Step from registrar.views.application import ApplicationWizard, Step
from .common import less_console_noise from .common import less_console_noise
from .common import MockEppLib
class TestViews(TestCase): class TestViews(TestCase):
@ -1133,7 +1132,7 @@ class TestDomainPermissions(TestWithDomainPermissions):
self.assertEqual(response.status_code, 403) self.assertEqual(response.status_code, 403)
class TestDomainDetail(TestWithDomainPermissions, WebTest): class TestDomainDetail(TestWithDomainPermissions, WebTest, MockEppLib):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
self.app.set_user(self.user.username) self.app.set_user(self.user.username)
@ -1426,6 +1425,40 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest):
) )
self.assertContains(page, "Testy") self.assertContains(page, "Testy")
def test_domain_security_email_existing_security_contact(self):
"""Can load domain's security email page."""
self.mockSendPatch = patch("registrar.models.domain.registry.send")
self.mockedSendFunction = self.mockSendPatch.start()
self.mockedSendFunction.side_effect = self.mockSend
domain_contact, _ = Domain.objects.get_or_create(name="freeman.gov")
# Add current user to this domain
_ = UserDomainRole(user=self.user, domain=domain_contact, role="admin").save()
page = self.client.get(
reverse("domain-security-email", kwargs={"pk": domain_contact.id})
)
# Loads correctly
self.assertContains(page, "Domain security email")
self.assertContains(page, "security@mail.gov")
self.mockSendPatch.stop()
def test_domain_security_email_no_security_contact(self):
"""Loads a domain with no defined security email.
We should not show the default."""
self.mockSendPatch = patch("registrar.models.domain.registry.send")
self.mockedSendFunction = self.mockSendPatch.start()
self.mockedSendFunction.side_effect = self.mockSend
page = self.client.get(
reverse("domain-security-email", kwargs={"pk": self.domain.id})
)
# Loads correctly
self.assertContains(page, "Domain security email")
self.assertNotContains(page, "dotgov@cisa.dhs.gov")
self.mockSendPatch.stop()
def test_domain_security_email(self): def test_domain_security_email(self):
"""Can load domain's security email page.""" """Can load domain's security email page."""
page = self.client.get( page = self.client.get(
@ -1433,10 +1466,8 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest):
) )
self.assertContains(page, "Domain security email") self.assertContains(page, "Domain security email")
@skip("Ticket 912 needs to fix this one")
def test_domain_security_email_form(self): def test_domain_security_email_form(self):
"""Adding a security email works. """Adding a security email works.
Uses self.app WebTest because we need to interact with forms. Uses self.app WebTest because we need to interact with forms.
""" """
security_email_page = self.app.get( security_email_page = self.app.get(
@ -1456,7 +1487,7 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest):
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = result.follow() success_page = result.follow()
self.assertContains( self.assertContains(
success_page, "The security email for this domain have been updated" success_page, "The security email for this domain has been updated"
) )
def test_domain_overview_blocked_for_ineligible_user(self): def test_domain_overview_blocked_for_ineligible_user(self):

View file

@ -259,7 +259,11 @@ class DomainSecurityEmailView(DomainPermissionView, FormMixin):
"""The initial value for the form.""" """The initial value for the form."""
domain = self.get_object() domain = self.get_object()
initial = super().get_initial() initial = super().get_initial()
initial["security_email"] = domain.security_contact.email security_contact = domain.security_contact
if security_contact is None or security_contact.email == "dotgov@cisa.dhs.gov":
initial["security_email"] = None
return initial
initial["security_email"] = security_contact.email
return initial return initial
def get_success_url(self): def get_success_url(self):
@ -288,7 +292,7 @@ class DomainSecurityEmailView(DomainPermissionView, FormMixin):
contact.save() contact.save()
messages.success( messages.success(
self.request, "The security email for this domain have been updated." self.request, "The security email for this domain has been updated."
) )
# superclass has the redirect # superclass has the redirect

View file

@ -7,7 +7,7 @@ certifi==2023.7.22 ; python_version >= '3.6'
cfenv==0.5.3 cfenv==0.5.3
cffi==1.15.1 cffi==1.15.1
charset-normalizer==3.1.0 ; python_full_version >= '3.7.0' charset-normalizer==3.1.0 ; python_full_version >= '3.7.0'
cryptography==41.0.3 ; python_version >= '3.7' cryptography==41.0.4 ; python_version >= '3.7'
defusedxml==0.7.1 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' defusedxml==0.7.1 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'
dj-database-url==2.0.0 dj-database-url==2.0.0
dj-email-url==1.0.6 dj-email-url==1.0.6
@ -22,7 +22,7 @@ django-phonenumber-field[phonenumberslite]==7.1.0
django-widget-tweaks==1.4.12 django-widget-tweaks==1.4.12
environs[django]==9.5.0 environs[django]==9.5.0
faker==18.10.0 faker==18.10.0
git+https://github.com/cisagov/epplib.git@f818cbf0b069a12f03e1d72e4b9f4900924b832d#egg=fred-epplib git+https://github.com/cisagov/epplib.git@d56d183f1664f34c40ca9716a3a9a345f0ef561c#egg=fred-epplib
furl==2.1.3 furl==2.1.3
future==0.18.3 ; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3' future==0.18.3 ; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'
gunicorn==20.1.0 gunicorn==20.1.0
@ -49,5 +49,5 @@ setuptools==67.8.0 ; python_version >= '3.7'
six==1.16.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' six==1.16.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
sqlparse==0.4.4 ; python_version >= '3.5' sqlparse==0.4.4 ; python_version >= '3.5'
typing-extensions==4.6.3 typing-extensions==4.6.3
urllib3==1.26.16 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5' urllib3==1.26.17 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5'
whitenoise==6.4.0 whitenoise==6.4.0