diff --git a/.github/workflows/deploy-sandbox.yaml b/.github/workflows/deploy-sandbox.yaml
index 5d9000401..01079a670 100644
--- a/.github/workflows/deploy-sandbox.yaml
+++ b/.github/workflows/deploy-sandbox.yaml
@@ -19,6 +19,7 @@ jobs:
|| startsWith(github.head_ref, 'rh/')
|| startsWith(github.head_ref, 'nl/')
|| startsWith(github.head_ref, 'dk/')
+ || startsWith(github.head_ref, 'es/')
outputs:
environment: ${{ steps.var.outputs.environment}}
runs-on: "ubuntu-latest"
diff --git a/.github/workflows/migrate.yaml b/.github/workflows/migrate.yaml
index 705014af1..3b1035657 100644
--- a/.github/workflows/migrate.yaml
+++ b/.github/workflows/migrate.yaml
@@ -15,6 +15,7 @@ on:
options:
- stable
- staging
+ - es
- nl
- rh
- za
diff --git a/.github/workflows/reset-db.yaml b/.github/workflows/reset-db.yaml
index 0bf1af2d9..654fa27b5 100644
--- a/.github/workflows/reset-db.yaml
+++ b/.github/workflows/reset-db.yaml
@@ -16,6 +16,7 @@ on:
options:
- stable
- staging
+ - es
- nl
- rh
- za
diff --git a/ops/manifests/manifest-es.yaml b/ops/manifests/manifest-es.yaml
new file mode 100644
index 000000000..c4847553f
--- /dev/null
+++ b/ops/manifests/manifest-es.yaml
@@ -0,0 +1,30 @@
+---
+applications:
+- name: getgov-es
+ buildpacks:
+ - python_buildpack
+ path: ../../src
+ instances: 1
+ memory: 512M
+ stack: cflinuxfs4
+ timeout: 180
+ command: ./run.sh
+ health-check-type: http
+ health-check-http-endpoint: /health
+ health-check-invocation-timeout: 40
+ env:
+ # Send stdout and stderr straight to the terminal without buffering
+ PYTHONUNBUFFERED: yup
+ # Tell Django where to find its configuration
+ DJANGO_SETTINGS_MODULE: registrar.config.settings
+ # Tell Django where it is being hosted
+ DJANGO_BASE_URL: https://getgov-es.app.cloud.gov
+ # Tell Django how much stuff to log
+ DJANGO_LOG_LEVEL: INFO
+ # default public site location
+ GETGOV_PUBLIC_SITE_URL: https://beta.get.gov
+ routes:
+ - route: getgov-es.app.cloud.gov
+ services:
+ - getgov-credentials
+ - getgov-es-database
diff --git a/src/Pipfile.lock b/src/Pipfile.lock
index d13ed6382..3e7ae367d 100644
--- a/src/Pipfile.lock
+++ b/src/Pipfile.lock
@@ -353,7 +353,7 @@
},
"fred-epplib": {
"git": "https://github.com/cisagov/epplib.git",
- "ref": "f818cbf0b069a12f03e1d72e4b9f4900924b832d"
+ "ref": "d56d183f1664f34c40ca9716a3a9a345f0ef561c"
},
"furl": {
"hashes": [
diff --git a/src/api/tests/test_available.py b/src/api/tests/test_available.py
index 39ddba071..0bbe01f03 100644
--- a/src/api/tests/test_available.py
+++ b/src/api/tests/test_available.py
@@ -39,7 +39,7 @@ class AvailableViewTest(TestCase):
self.assertIn("gsa.gov", domains)
# entries are all lowercase so GSA.GOV is not in the set
self.assertNotIn("GSA.GOV", domains)
- self.assertNotIn("igorville.gov", domains)
+ self.assertNotIn("igorvilleremixed.gov", domains)
# all the entries have dots
self.assertNotIn("gsa", domains)
@@ -48,7 +48,7 @@ class AvailableViewTest(TestCase):
# input is lowercased so GSA.GOV should be found
self.assertTrue(in_domains("GSA.GOV"))
# This domain should not have been registered
- self.assertFalse(in_domains("igorville.gov"))
+ self.assertFalse(in_domains("igorvilleremixed.gov"))
def test_in_domains_dotgov(self):
"""Domain searches work without trailing .gov"""
@@ -56,7 +56,7 @@ class AvailableViewTest(TestCase):
# input is lowercased so GSA.GOV should be found
self.assertTrue(in_domains("GSA"))
# This domain should not have been registered
- self.assertFalse(in_domains("igorville"))
+ self.assertFalse(in_domains("igorvilleremixed"))
def test_not_available_domain(self):
"""gsa.gov is not available"""
@@ -66,17 +66,17 @@ class AvailableViewTest(TestCase):
self.assertFalse(json.loads(response.content)["available"])
def test_available_domain(self):
- """igorville.gov is still available"""
- request = self.factory.get(API_BASE_PATH + "igorville.gov")
+ """igorvilleremixed.gov is still available"""
+ request = self.factory.get(API_BASE_PATH + "igorvilleremixed.gov")
request.user = self.user
- response = available(request, domain="igorville.gov")
+ response = available(request, domain="igorvilleremixed.gov")
self.assertTrue(json.loads(response.content)["available"])
def test_available_domain_dotgov(self):
- """igorville.gov is still available even without the .gov suffix"""
- request = self.factory.get(API_BASE_PATH + "igorville")
+ """igorvilleremixed.gov is still available even without the .gov suffix"""
+ request = self.factory.get(API_BASE_PATH + "igorvilleremixed")
request.user = self.user
- response = available(request, domain="igorville")
+ response = available(request, domain="igorvilleremixed")
self.assertTrue(json.loads(response.content)["available"])
def test_error_handling(self):
diff --git a/src/epplibwrapper/__init__.py b/src/epplibwrapper/__init__.py
index b306dbd0e..dd6664a3a 100644
--- a/src/epplibwrapper/__init__.py
+++ b/src/epplibwrapper/__init__.py
@@ -11,6 +11,7 @@ logger = logging.getLogger(__name__)
NAMESPACE = SimpleNamespace(
EPP="urn:ietf:params:xml:ns:epp-1.0",
+ SEC_DNS="urn:ietf:params:xml:ns:secDNS-1.1",
XSI="http://www.w3.org/2001/XMLSchema-instance",
FRED="noop",
NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0",
@@ -25,6 +26,7 @@ NAMESPACE = SimpleNamespace(
SCHEMA_LOCATION = SimpleNamespace(
XSI="urn:ietf:params:xml:ns:epp-1.0 epp-1.0.xsd",
FRED="noop fred-1.5.0.xsd",
+ SEC_DNS="urn:ietf:params:xml:ns:secDNS-1.1 secDNS-1.1.xsd",
NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0 contact-1.0.xsd",
NIC_DOMAIN="urn:ietf:params:xml:ns:domain-1.0 domain-1.0.xsd",
NIC_ENUMVAL="noop enumval-1.2.0.xsd",
@@ -44,7 +46,9 @@ except NameError:
try:
from .client import CLIENT, commands
from .errors import RegistryError, ErrorCode
- from epplib.models import common
+ from epplib.models import common, info
+ from epplib.responses import extensions
+ from epplib import responses
except ImportError:
pass
@@ -52,6 +56,9 @@ __all__ = [
"CLIENT",
"commands",
"common",
+ "extensions",
+ "responses",
+ "info",
"ErrorCode",
"RegistryError",
]
diff --git a/src/registrar/admin.py b/src/registrar/admin.py
index e99e767bd..275f67bb3 100644
--- a/src/registrar/admin.py
+++ b/src/registrar/admin.py
@@ -6,10 +6,13 @@ from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from django.contrib.contenttypes.models import ContentType
from django.http.response import HttpResponseRedirect
from django.urls import reverse
+from epplibwrapper.errors import ErrorCode, RegistryError
+from registrar.models.domain import Domain
from registrar.models.utility.admin_sort_fields import AdminSortFields
from . import models
from auditlog.models import LogEntry # type: ignore
from auditlog.admin import LogEntryAdmin # type: ignore
+from django_fsm import TransitionNotAllowed # type: ignore
logger = logging.getLogger(__name__)
@@ -716,16 +719,61 @@ class DomainAdmin(ListHeaderAdmin):
return super().response_change(request, obj)
def do_delete_domain(self, request, obj):
+ if not isinstance(obj, Domain):
+ # Could be problematic if the type is similar,
+ # but not the same (same field/func names).
+ # We do not want to accidentally delete records.
+ self.message_user(request, "Object is not of type Domain", messages.ERROR)
+ return
+
try:
- obj.deleted()
+ obj.deletedInEpp()
obj.save()
- except Exception as err:
- self.message_user(request, err, messages.ERROR)
+ except RegistryError as err:
+ # Using variables to get past the linter
+ message1 = f"Cannot delete Domain when in state {obj.state}"
+ message2 = "This subdomain is being used as a hostname on another domain"
+ # Human-readable mappings of ErrorCodes. Can be expanded.
+ error_messages = {
+ # noqa on these items as black wants to reformat to an invalid length
+ ErrorCode.OBJECT_STATUS_PROHIBITS_OPERATION: message1,
+ ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION: message2,
+ }
+
+ message = "Cannot connect to the registry"
+ if not err.is_connection_error():
+ # If nothing is found, will default to returned err
+ message = error_messages.get(err.code, err)
+ self.message_user(
+ request, f"Error deleting this Domain: {message}", messages.ERROR
+ )
+ except TransitionNotAllowed:
+ if obj.state == Domain.State.DELETED:
+ self.message_user(
+ request,
+ "This domain is already deleted",
+ messages.INFO,
+ )
+ else:
+ self.message_user(
+ request,
+ "Error deleting this Domain: "
+ f"Can't switch from state '{obj.state}' to 'deleted'"
+ ", must be either 'dns_needed' or 'on_hold'",
+ messages.ERROR,
+ )
+ except Exception:
+ self.message_user(
+ request,
+ "Could not delete: An unspecified error occured",
+ messages.ERROR,
+ )
else:
self.message_user(
request,
- ("Domain %s Should now be deleted " ". Thanks!") % obj.name,
+ ("Domain %s has been deleted. Thanks!") % obj.name,
)
+
return HttpResponseRedirect(".")
def do_get_status(self, request, obj):
diff --git a/src/registrar/config/settings.py b/src/registrar/config/settings.py
index e272e6622..ceb215a4d 100644
--- a/src/registrar/config/settings.py
+++ b/src/registrar/config/settings.py
@@ -570,6 +570,7 @@ SECURE_SSL_REDIRECT = True
ALLOWED_HOSTS = [
"getgov-stable.app.cloud.gov",
"getgov-staging.app.cloud.gov",
+ "getgov-es.app.cloud.gov",
"getgov-nl.app.cloud.gov",
"getgov-rh.app.cloud.gov",
"getgov-za.app.cloud.gov",
diff --git a/src/registrar/fixtures.py b/src/registrar/fixtures.py
index 6b18f37e1..51ee469df 100644
--- a/src/registrar/fixtures.py
+++ b/src/registrar/fixtures.py
@@ -83,6 +83,16 @@ class UserFixture:
"first_name": "Nicolle",
"last_name": "LeClair",
},
+ {
+ "username": "24840450-bf47-4d89-8aa9-c612fe68f9da",
+ "first_name": "Erin",
+ "last_name": "Song",
+ },
+ {
+ "username": "e0ea8b94-6e53-4430-814a-849a7ca45f21",
+ "first_name": "Kristina",
+ "last_name": "Yin",
+ },
]
STAFF = [
@@ -134,6 +144,18 @@ class UserFixture:
"last_name": "LeClair-Analyst",
"email": "nicolle.leclair@ecstech.com",
},
+ {
+ "username": "378d0bc4-d5a7-461b-bd84-3ae6f6864af9",
+ "first_name": "Erin-Analyst",
+ "last_name": "Song-Analyst",
+ "email": "erin.song+1@gsa.gov",
+ },
+ {
+ "username": "9a98e4c9-9409-479d-964e-4aec7799107f",
+ "first_name": "Kristina-Analyst",
+ "last_name": "Yin-Analyst",
+ "email": "kristina.yin+1@gsa.gov",
+ },
]
STAFF_PERMISSIONS = [
diff --git a/src/registrar/migrations/0033_alter_userdomainrole_role.py b/src/registrar/migrations/0033_alter_userdomainrole_role.py
new file mode 100644
index 000000000..bdfcb6257
--- /dev/null
+++ b/src/registrar/migrations/0033_alter_userdomainrole_role.py
@@ -0,0 +1,17 @@
+# Generated by Django 4.2.1 on 2023-10-02 22:07
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+ dependencies = [
+ ("registrar", "0032_alter_transitiondomain_status"),
+ ]
+
+ operations = [
+ migrations.AlterField(
+ model_name="userdomainrole",
+ name="role",
+ field=models.TextField(choices=[("manager", "Admin")]),
+ ),
+ ]
diff --git a/src/registrar/models/domain.py b/src/registrar/models/domain.py
index 2c7f8703c..59edb707a 100644
--- a/src/registrar/models/domain.py
+++ b/src/registrar/models/domain.py
@@ -1,8 +1,8 @@
+from itertools import zip_longest
import logging
-
from datetime import date
from string import digits
-from django_fsm import FSMField, transition # type: ignore
+from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore
from django.db import models
@@ -10,9 +10,12 @@ from epplibwrapper import (
CLIENT as registry,
commands,
common as epp,
+ extensions,
+ info as eppInfo,
RegistryError,
ErrorCode,
)
+from registrar.models.utility.contact_error import ContactError
from .utility.domain_field import DomainField
from .utility.domain_helper import DomainHelper
@@ -279,6 +282,27 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error("Error _create_host, code was %s error was %s" % (e.code, e))
return e.code
+ @Cache
+ def dnssecdata(self) -> extensions.DNSSECExtension:
+ return self._get_property("dnssecdata")
+
+ @dnssecdata.setter # type: ignore
+ def dnssecdata(self, _dnssecdata: extensions.DNSSECExtension):
+ updateParams = {
+ "maxSigLife": _dnssecdata.get("maxSigLife", None),
+ "dsData": _dnssecdata.get("dsData", None),
+ "keyData": _dnssecdata.get("keyData", None),
+ "remAllDsKeyData": True,
+ }
+ request = commands.UpdateDomain(name=self.name)
+ extension = commands.UpdateDomainDNSSECExtension(**updateParams)
+ request.add_extension(extension)
+ try:
+ registry.send(request, cleaned=True)
+ except RegistryError as e:
+ logger.error("Error adding DNSSEC, code was %s error was %s" % (e.code, e))
+ raise e
+
@nameservers.setter # type: ignore
def nameservers(self, hosts: list[tuple[str]]):
"""host should be a tuple of type str, str,... where the elements are
@@ -352,9 +376,9 @@ class Domain(TimeStampedModel, DomainHelper):
raise NotImplementedError()
@Cache
- def registrant_contact(self) -> PublicContact:
- """Get or set the registrant for this domain."""
- raise NotImplementedError()
+ def registrant_contact(self) -> PublicContact | None:
+ registrant = PublicContact.ContactTypeChoices.REGISTRANT
+ return self.generic_contact_getter(registrant)
@registrant_contact.setter # type: ignore
def registrant_contact(self, contact: PublicContact):
@@ -367,9 +391,10 @@ class Domain(TimeStampedModel, DomainHelper):
)
@Cache
- def administrative_contact(self) -> PublicContact:
- """Get or set the admin contact for this domain."""
- raise NotImplementedError()
+ def administrative_contact(self) -> PublicContact | None:
+ """Get the admin contact for this domain."""
+ admin = PublicContact.ContactTypeChoices.ADMINISTRATIVE
+ return self.generic_contact_getter(admin)
@administrative_contact.setter # type: ignore
def administrative_contact(self, contact: PublicContact):
@@ -381,12 +406,6 @@ class Domain(TimeStampedModel, DomainHelper):
self._make_contact_in_registry(contact=contact)
self._update_domain_with_contact(contact, rem=False)
- def get_default_security_contact(self):
- logger.info("getting default sec contact")
- contact = PublicContact.get_default_security()
- contact.domain = self
- return contact
-
def _update_epp_contact(self, contact: PublicContact):
"""Sends UpdateContact to update the actual contact object,
domain object remains unaffected
@@ -440,26 +459,10 @@ class Domain(TimeStampedModel, DomainHelper):
)
@Cache
- def security_contact(self) -> PublicContact:
+ def security_contact(self) -> PublicContact | None:
"""Get or set the security contact for this domain."""
- try:
- contacts = self._get_property("contacts")
- for contact in contacts:
- if (
- "type" in contact.keys()
- and contact["type"] == PublicContact.ContactTypeChoices.SECURITY
- ):
- tempContact = self.get_default_security_contact()
- tempContact.email = contact["email"]
- return tempContact
-
- except Exception as err: # use better error handling
- logger.info("Couldn't get contact %s" % err)
-
- # TODO - remove this ideally it should return None,
- # but error handling needs to be
- # added on the security email page so that it can handle it being none
- return self.get_default_security_contact()
+ security = PublicContact.ContactTypeChoices.SECURITY
+ return self.generic_contact_getter(security)
def _add_registrant_to_existing_domain(self, contact: PublicContact):
"""Used to change the registrant contact on an existing domain"""
@@ -533,6 +536,7 @@ class Domain(TimeStampedModel, DomainHelper):
.filter(domain=self, contact_type=contact.contact_type)
.get()
)
+
if isRegistrant:
# send update domain only for registant contacts
existing_contact.delete()
@@ -589,9 +593,10 @@ class Domain(TimeStampedModel, DomainHelper):
)
@Cache
- def technical_contact(self) -> PublicContact:
+ def technical_contact(self) -> PublicContact | None:
"""Get or set the tech contact for this domain."""
- raise NotImplementedError()
+ tech = PublicContact.ContactTypeChoices.TECHNICAL
+ return self.generic_contact_getter(tech)
@technical_contact.setter # type: ignore
def technical_contact(self, contact: PublicContact):
@@ -609,11 +614,6 @@ class Domain(TimeStampedModel, DomainHelper):
"""
return self.state == self.State.READY
- def delete_request(self):
- """Delete from host. Possibly a duplicate of _delete_host?"""
- # TODO fix in ticket #901
- pass
-
def transfer(self):
"""Going somewhere. Not implemented."""
raise NotImplementedError()
@@ -658,7 +658,7 @@ class Domain(TimeStampedModel, DomainHelper):
"""This domain should be deleted from the registry
may raises RegistryError, should be caught or handled correctly by caller"""
request = commands.DeleteDomain(name=self.name)
- registry.send(request)
+ registry.send(request, cleaned=True)
def __str__(self) -> str:
return self.name
@@ -679,6 +679,231 @@ class Domain(TimeStampedModel, DomainHelper):
help_text="Very basic info about the lifecycle of this domain object",
)
+ def isActive(self):
+ return self.state == Domain.State.CREATED
+
+ def map_epp_contact_to_public_contact(
+ self, contact: eppInfo.InfoContactResultData, contact_id, contact_type
+ ):
+ """Maps the Epp contact representation to a PublicContact object.
+
+ contact -> eppInfo.InfoContactResultData: The converted contact object
+
+ contact_id -> str: The given registry_id of the object (i.e "cheese@cia.gov")
+
+ contact_type -> str: The given contact type, (i.e. "tech" or "registrant")
+ """
+
+ if contact is None:
+ return None
+
+ if contact_type is None:
+ raise ContactError("contact_type is None")
+
+ if contact_id is None:
+ raise ContactError("contact_id is None")
+
+ # Since contact_id is registry_id,
+ # check that its the right length
+ contact_id_length = len(contact_id)
+ if (
+ contact_id_length > PublicContact.get_max_id_length()
+ or contact_id_length < 1
+ ):
+ raise ContactError(
+ "contact_id is of invalid length. "
+ "Cannot exceed 16 characters, "
+ f"got {contact_id} with a length of {contact_id_length}"
+ )
+
+ if not isinstance(contact, eppInfo.InfoContactResultData):
+ raise ContactError("Contact must be of type InfoContactResultData")
+
+ auth_info = contact.auth_info
+ postal_info = contact.postal_info
+ addr = postal_info.addr
+ streets = None
+ if addr is not None:
+ streets = addr.street
+ streets_kwargs = self._convert_streets_to_dict(streets)
+ desired_contact = PublicContact(
+ domain=self,
+ contact_type=contact_type,
+ registry_id=contact_id,
+ email=contact.email or "",
+ voice=contact.voice or "",
+ fax=contact.fax,
+ name=postal_info.name or "",
+ org=postal_info.org,
+ # For linter - default to "" instead of None
+ pw=getattr(auth_info, "pw", ""),
+ city=getattr(addr, "city", ""),
+ pc=getattr(addr, "pc", ""),
+ cc=getattr(addr, "cc", ""),
+ sp=getattr(addr, "sp", ""),
+ **streets_kwargs,
+ ) # type: ignore
+
+ return desired_contact
+
+ def _convert_streets_to_dict(self, streets):
+ """
+ Converts EPPLibs street representation
+ to PublicContacts.
+
+ Args:
+ streets (Sequence[str]): Streets from EPPLib.
+
+ Returns:
+ dict: {
+ "street1": str or "",
+
+ "street2": str or None,
+
+ "street3": str or None,
+ }
+
+ EPPLib returns 'street' as an sequence of strings.
+ Meanwhile, PublicContact has this split into three
+ seperate properties: street1, street2, street3.
+
+ Handles this disparity.
+ """
+ # 'zips' two lists together.
+ # For instance, (('street1', 'some_value_here'),
+ # ('street2', 'some_value_here'))
+ # Dict then converts this to a useable kwarg which we can pass in
+ street_dict = dict(
+ zip_longest(
+ ["street1", "street2", "street3"],
+ streets if streets is not None else [""],
+ fillvalue=None,
+ )
+ )
+ return street_dict
+
+ def _request_contact_info(self, contact: PublicContact):
+ try:
+ req = commands.InfoContact(id=contact.registry_id)
+ return registry.send(req, cleaned=True).res_data[0]
+ except RegistryError as error:
+ logger.error(
+ "Registry threw error for contact id %s contact type is %s, error code is\n %s full error is %s", # noqa
+ contact.registry_id,
+ contact.contact_type,
+ error.code,
+ error,
+ )
+ raise error
+
+ def generic_contact_getter(
+ self, contact_type_choice: PublicContact.ContactTypeChoices
+ ) -> PublicContact | None:
+ """Retrieves the desired PublicContact from the registry.
+ This abstracts the caching and EPP retrieval for
+ all contact items and thus may result in EPP calls being sent.
+
+ contact_type_choice is a literal in PublicContact.ContactTypeChoices,
+ for instance: PublicContact.ContactTypeChoices.SECURITY.
+
+ If you wanted to setup getter logic for Security, you would call:
+ cache_contact_helper(PublicContact.ContactTypeChoices.SECURITY),
+ or cache_contact_helper("security").
+
+ """
+ # registrant_contact(s) are an edge case. They exist on
+ # the "registrant" property as opposed to contacts.
+ desired_property = "contacts"
+ if contact_type_choice == PublicContact.ContactTypeChoices.REGISTRANT:
+ desired_property = "registrant"
+
+ try:
+ # Grab from cache
+ contacts = self._get_property(desired_property)
+ except KeyError as error:
+ logger.error(f"Could not find {contact_type_choice}: {error}")
+ return None
+ else:
+ cached_contact = self.get_contact_in_keys(contacts, contact_type_choice)
+ if cached_contact is None:
+ # TODO - #1103
+ raise ContactError("No contact was found in cache or the registry")
+
+ return cached_contact
+
+ def get_default_security_contact(self):
+ """Gets the default security contact."""
+ contact = PublicContact.get_default_security()
+ contact.domain = self
+ return contact
+
+ def get_default_administrative_contact(self):
+ """Gets the default administrative contact."""
+ contact = PublicContact.get_default_administrative()
+ contact.domain = self
+ return contact
+
+ def get_default_technical_contact(self):
+ """Gets the default technical contact."""
+ contact = PublicContact.get_default_technical()
+ contact.domain = self
+ return contact
+
+ def get_default_registrant_contact(self):
+ """Gets the default registrant contact."""
+ contact = PublicContact.get_default_registrant()
+ contact.domain = self
+ return contact
+
+ def get_contact_in_keys(self, contacts, contact_type):
+ """Gets a contact object.
+
+ Args:
+ contacts ([PublicContact]): List of PublicContacts
+ contact_type (literal): Which PublicContact to get
+ Returns:
+ PublicContact | None
+ """
+ # Registrant doesn't exist as an array, and is of
+ # a special data type, so we need to handle that.
+ if contact_type == PublicContact.ContactTypeChoices.REGISTRANT:
+ desired_contact = None
+ if isinstance(contacts, str):
+ desired_contact = self._registrant_to_public_contact(
+ self._cache["registrant"]
+ )
+ # Set the cache with the updated object
+ # for performance reasons.
+ if "registrant" in self._cache:
+ self._cache["registrant"] = desired_contact
+ elif isinstance(contacts, PublicContact):
+ desired_contact = contacts
+
+ return self._handle_registrant_contact(desired_contact)
+
+ _registry_id: str
+ if contact_type in contacts:
+ _registry_id = contacts.get(contact_type)
+
+ desired = PublicContact.objects.filter(
+ registry_id=_registry_id, domain=self, contact_type=contact_type
+ )
+
+ if desired.count() == 1:
+ return desired.get()
+
+ logger.info(f"Requested contact {_registry_id} does not exist in cache.")
+ return None
+
+ def _handle_registrant_contact(self, contact):
+ if (
+ contact.contact_type is not None
+ and contact.contact_type == PublicContact.ContactTypeChoices.REGISTRANT
+ ):
+ return contact
+ else:
+ raise ValueError("Invalid contact object for registrant_contact")
+
# ForeignKey on UserDomainRole creates a "permissions" member for
# all of the user-roles that are in place for this domain
@@ -725,9 +950,9 @@ class Domain(TimeStampedModel, DomainHelper):
try:
logger.info("Getting domain info from epp")
req = commands.InfoDomain(name=self.name)
- domainInfo = registry.send(req, cleaned=True).res_data[0]
+ domainInfoResponse = registry.send(req, cleaned=True)
exitEarly = True
- return domainInfo
+ return domainInfoResponse
except RegistryError as e:
count += 1
@@ -777,12 +1002,10 @@ class Domain(TimeStampedModel, DomainHelper):
security_contact = self.get_default_security_contact()
security_contact.save()
- technical_contact = PublicContact.get_default_technical()
- technical_contact.domain = self
+ technical_contact = self.get_default_technical_contact()
technical_contact.save()
- administrative_contact = PublicContact.get_default_administrative()
- administrative_contact.domain = self
+ administrative_contact = self.get_default_administrative_contact()
administrative_contact.save()
@transition(
@@ -804,16 +1027,32 @@ class Domain(TimeStampedModel, DomainHelper):
self._remove_client_hold()
# TODO -on the client hold ticket any additional error handling here
- @transition(field="state", source=State.ON_HOLD, target=State.DELETED)
- def deleted(self):
- """domain is deleted in epp but is saved in our database"""
- # TODO Domains may not be deleted if:
- # a child host is being used by
- # another .gov domains. The host must be first removed
- # and/or renamed before the parent domain may be deleted.
- logger.info("pendingCreate()-> inside pending create")
- self._delete_domain()
- # TODO - delete ticket any additional error handling here
+ @transition(
+ field="state", source=[State.ON_HOLD, State.DNS_NEEDED], target=State.DELETED
+ )
+ def deletedInEpp(self):
+ """Domain is deleted in epp but is saved in our database.
+ Error handling should be provided by the caller."""
+ # While we want to log errors, we want to preserve
+ # that information when this function is called.
+ # Human-readable errors are introduced at the admin.py level,
+ # as doing everything here would reduce reliablity.
+ try:
+ logger.info("deletedInEpp()-> inside _delete_domain")
+ self._delete_domain()
+ except RegistryError as err:
+ logger.error(f"Could not delete domain. Registry returned error: {err}")
+ raise err
+ except TransitionNotAllowed as err:
+ logger.error("Could not delete domain. FSM failure: {err}")
+ raise err
+ except Exception as err:
+ logger.error(
+ f"Could not delete domain. An unspecified error occured: {err}"
+ )
+ raise err
+ else:
+ self._invalidate_cache()
@transition(
field="state",
@@ -910,16 +1149,34 @@ class Domain(TimeStampedModel, DomainHelper):
)
return err.code
- def _request_contact_info(self, contact: PublicContact):
- req = commands.InfoContact(id=contact.registry_id)
- return registry.send(req, cleaned=True).res_data[0]
+ def _fetch_contacts(self, contact_data):
+ """Fetch contact info."""
+ choices = PublicContact.ContactTypeChoices
+ # We expect that all these fields get populated,
+ # so we can create these early, rather than waiting.
+ contacts_dict = {
+ choices.ADMINISTRATIVE: None,
+ choices.SECURITY: None,
+ choices.TECHNICAL: None,
+ }
+ for domainContact in contact_data:
+ req = commands.InfoContact(id=domainContact.contact)
+ data = registry.send(req, cleaned=True).res_data[0]
+
+ # Map the object we recieved from EPP to a PublicContact
+ mapped_object = self.map_epp_contact_to_public_contact(
+ data, domainContact.contact, domainContact.type
+ )
+
+ # Find/create it in the DB
+ in_db = self._get_or_create_public_contact(mapped_object)
+ contacts_dict[in_db.contact_type] = in_db.registry_id
+ return contacts_dict
def _get_or_create_contact(self, contact: PublicContact):
"""Try to fetch info about a contact. Create it if it does not exist."""
-
try:
return self._request_contact_info(contact)
-
except RegistryError as e:
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
logger.info(
@@ -942,6 +1199,23 @@ class Domain(TimeStampedModel, DomainHelper):
raise e
+ def _fetch_hosts(self, host_data):
+ """Fetch host info."""
+ hosts = []
+ for name in host_data:
+ req = commands.InfoHost(name=name)
+ data = registry.send(req, cleaned=True).res_data[0]
+ host = {
+ "name": name,
+ "addrs": getattr(data, "addrs", ...),
+ "cr_date": getattr(data, "cr_date", ...),
+ "statuses": getattr(data, "statuses", ...),
+ "tr_date": getattr(data, "tr_date", ...),
+ "up_date": getattr(data, "up_date", ...),
+ }
+ hosts.append({k: v for k, v in host.items() if v is not ...})
+ return hosts
+
def _update_or_create_host(self, host):
raise NotImplementedError()
@@ -952,7 +1226,8 @@ class Domain(TimeStampedModel, DomainHelper):
"""Contact registry for info about a domain."""
try:
# get info from registry
- data = self._get_or_create_domain()
+ dataResponse = self._get_or_create_domain()
+ data = dataResponse.res_data[0]
# extract properties from response
# (Ellipsis is used to mean "null")
cache = {
@@ -967,89 +1242,116 @@ class Domain(TimeStampedModel, DomainHelper):
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
}
-
# remove null properties (to distinguish between "a value of None" and null)
cleaned = {k: v for k, v in cache.items() if v is not ...}
# statuses can just be a list no need to keep the epp object
- if "statuses" in cleaned.keys():
+ if "statuses" in cleaned:
cleaned["statuses"] = [status.state for status in cleaned["statuses"]]
+
+ # get extensions info, if there is any
+ # DNSSECExtension is one possible extension, make sure to handle
+ # only DNSSECExtension and not other type extensions
+ returned_extensions = dataResponse.extensions
+ cleaned["dnssecdata"] = None
+ for extension in returned_extensions:
+ if isinstance(extension, extensions.DNSSECExtension):
+ cleaned["dnssecdata"] = extension
+ # Capture and store old hosts and contacts from cache if they exist
+ old_cache_hosts = self._cache.get("hosts")
+ old_cache_contacts = self._cache.get("contacts")
+
# get contact info, if there are any
if (
- # fetch_contacts and
- "_contacts" in cleaned
+ fetch_contacts
+ and "_contacts" in cleaned
and isinstance(cleaned["_contacts"], list)
- and len(cleaned["_contacts"])
+ and len(cleaned["_contacts"]) > 0
):
- cleaned["contacts"] = []
- for domainContact in cleaned["_contacts"]:
- # we do not use _get_or_create_* because we expect the object we
- # just asked the registry for still exists --
- # if not, that's a problem
-
- # TODO- discuss-should we check if contact is in public contacts
- # and add it if not- this is really to keep in mine the transisiton
- req = commands.InfoContact(id=domainContact.contact)
- data = registry.send(req, cleaned=True).res_data[0]
-
- # extract properties from response
- # (Ellipsis is used to mean "null")
- # convert this to use PublicContactInstead
- contact = {
- "id": domainContact.contact,
- "type": domainContact.type,
- "auth_info": getattr(data, "auth_info", ...),
- "cr_date": getattr(data, "cr_date", ...),
- "disclose": getattr(data, "disclose", ...),
- "email": getattr(data, "email", ...),
- "fax": getattr(data, "fax", ...),
- "postal_info": getattr(data, "postal_info", ...),
- "statuses": getattr(data, "statuses", ...),
- "tr_date": getattr(data, "tr_date", ...),
- "up_date": getattr(data, "up_date", ...),
- "voice": getattr(data, "voice", ...),
- }
-
- cleaned["contacts"].append(
- {k: v for k, v in contact.items() if v is not ...}
- )
+ cleaned["contacts"] = self._fetch_contacts(cleaned["_contacts"])
+ # We're only getting contacts, so retain the old
+ # hosts that existed in cache (if they existed)
+ # and pass them along.
+ if old_cache_hosts is not None:
+ cleaned["hosts"] = old_cache_hosts
# get nameserver info, if there are any
if (
- # fetch_hosts and
- "_hosts" in cleaned
+ fetch_hosts
+ and "_hosts" in cleaned
and isinstance(cleaned["_hosts"], list)
and len(cleaned["_hosts"])
):
- # TODO- add elif in cache set it to be the old cache value
- # no point in removing
- cleaned["hosts"] = []
- for name in cleaned["_hosts"]:
- # we do not use _get_or_create_* because we expect the object we
- # just asked the registry for still exists --
- # if not, that's a problem
- req = commands.InfoHost(name=name)
- data = registry.send(req, cleaned=True).res_data[0]
- # extract properties from response
- # (Ellipsis is used to mean "null")
- host = {
- "name": name,
- "addrs": getattr(data, "addrs", ...),
- "cr_date": getattr(data, "cr_date", ...),
- "statuses": getattr(data, "statuses", ...),
- "tr_date": getattr(data, "tr_date", ...),
- "up_date": getattr(data, "up_date", ...),
- }
- cleaned["hosts"].append(
- {k: v for k, v in host.items() if v is not ...}
- )
-
+ cleaned["hosts"] = self._fetch_hosts(cleaned["_hosts"])
+ # We're only getting hosts, so retain the old
+ # contacts that existed in cache (if they existed)
+ # and pass them along.
+ if old_cache_contacts is not None:
+ cleaned["contacts"] = old_cache_contacts
# replace the prior cache with new data
self._cache = cleaned
except RegistryError as e:
logger.error(e)
+ def _get_or_create_public_contact(self, public_contact: PublicContact):
+ """Tries to find a PublicContact object in our DB.
+ If it can't, it'll create it. Returns PublicContact"""
+ db_contact = PublicContact.objects.filter(
+ registry_id=public_contact.registry_id,
+ contact_type=public_contact.contact_type,
+ domain=self,
+ )
+
+ # Raise an error if we find duplicates.
+ # This should not occur
+ if db_contact.count() > 1:
+ raise Exception(
+ f"Multiple contacts found for {public_contact.contact_type}"
+ )
+
+ # Save to DB if it doesn't exist already.
+ if db_contact.count() == 0:
+ # Doesn't run custom save logic, just saves to DB
+ public_contact.save(skip_epp_save=True)
+ logger.info(f"Created a new PublicContact: {public_contact}")
+ # Append the item we just created
+ return public_contact
+
+ existing_contact = db_contact.get()
+
+ # Does the item we're grabbing match
+ # what we have in our DB?
+ if (
+ existing_contact.email != public_contact.email
+ or existing_contact.registry_id != public_contact.registry_id
+ ):
+ existing_contact.delete()
+ public_contact.save()
+ logger.warning("Requested PublicContact is out of sync " "with DB.")
+ return public_contact
+ # If it already exists, we can
+ # assume that the DB instance was updated
+ # during set, so we should just use that.
+ return existing_contact
+
+ def _registrant_to_public_contact(self, registry_id: str):
+ """EPPLib returns the registrant as a string,
+ which is the registrants associated registry_id. This function is used to
+ convert that id to a useable object by calling commands.InfoContact
+ on that ID, then mapping that object to type PublicContact."""
+ contact = PublicContact(
+ registry_id=registry_id,
+ contact_type=PublicContact.ContactTypeChoices.REGISTRANT,
+ )
+ # Grabs the expanded contact
+ full_object = self._request_contact_info(contact)
+ # Maps it to type PublicContact
+ mapped_object = self.map_epp_contact_to_public_contact(
+ full_object, contact.registry_id, contact.contact_type
+ )
+ return self._get_or_create_public_contact(mapped_object)
+
def _invalidate_cache(self):
"""Remove cache data when updates are made."""
self._cache = {}
diff --git a/src/registrar/models/domain_application.py b/src/registrar/models/domain_application.py
index 7df51baf4..68429d381 100644
--- a/src/registrar/models/domain_application.py
+++ b/src/registrar/models/domain_application.py
@@ -6,6 +6,7 @@ import logging
from django.apps import apps
from django.db import models
from django_fsm import FSMField, transition # type: ignore
+from registrar.models.domain import Domain
from .utility.time_stamped_model import TimeStampedModel
from ..utility.email import send_templated_email, EmailSendingError
@@ -610,9 +611,11 @@ class DomainApplication(TimeStampedModel):
As side effects this will delete the domain and domain_information
(will cascade), and send an email notification."""
-
if self.status == self.APPROVED:
- self.approved_domain.delete_request()
+ domain_state = self.approved_domain.state
+ # Only reject if it exists on EPP
+ if domain_state != Domain.State.UNKNOWN:
+ self.approved_domain.deletedInEpp()
self.approved_domain.delete()
self.approved_domain = None
@@ -638,7 +641,10 @@ class DomainApplication(TimeStampedModel):
and domain_information (will cascade) when they exist."""
if self.status == self.APPROVED:
- self.approved_domain.delete_request()
+ domain_state = self.approved_domain.state
+ # Only reject if it exists on EPP
+ if domain_state != Domain.State.UNKNOWN:
+ self.approved_domain.deletedInEpp()
self.approved_domain.delete()
self.approved_domain = None
diff --git a/src/registrar/models/public_contact.py b/src/registrar/models/public_contact.py
index d9ddecad4..4afe3c467 100644
--- a/src/registrar/models/public_contact.py
+++ b/src/registrar/models/public_contact.py
@@ -29,7 +29,8 @@ class PublicContact(TimeStampedModel):
def save(self, *args, **kwargs):
"""Save to the registry and also locally in the registrar database."""
- if hasattr(self, "domain"):
+ skip_epp_save = kwargs.pop("skip_epp_save", False)
+ if hasattr(self, "domain") and not skip_epp_save:
match self.contact_type:
case PublicContact.ContactTypeChoices.REGISTRANT:
self.domain.registrant_contact = self
@@ -148,6 +149,10 @@ class PublicContact(TimeStampedModel):
pw="thisisnotapassword",
)
+ @classmethod
+ def get_max_id_length(cls):
+ return cls._meta.get_field("registry_id").max_length
+
def __str__(self):
return (
f"{self.name} <{self.email}>"
diff --git a/src/registrar/models/user_domain_role.py b/src/registrar/models/user_domain_role.py
index 5a5219543..e5cb01cc1 100644
--- a/src/registrar/models/user_domain_role.py
+++ b/src/registrar/models/user_domain_role.py
@@ -15,7 +15,7 @@ class UserDomainRole(TimeStampedModel):
elsewhere.
"""
- ADMIN = "admin"
+ ADMIN = "manager"
user = models.ForeignKey(
"registrar.User",
diff --git a/src/registrar/models/utility/contact_error.py b/src/registrar/models/utility/contact_error.py
new file mode 100644
index 000000000..93084eca2
--- /dev/null
+++ b/src/registrar/models/utility/contact_error.py
@@ -0,0 +1,2 @@
+class ContactError(Exception):
+ ...
diff --git a/src/registrar/templates/django/admin/domain_change_form.html b/src/registrar/templates/django/admin/domain_change_form.html
index 1b8b90930..ac26fc922 100644
--- a/src/registrar/templates/django/admin/domain_change_form.html
+++ b/src/registrar/templates/django/admin/domain_change_form.html
@@ -15,7 +15,9 @@
{% endif %}
-
+ {% if original.state != original.State.DELETED %}
+
+ {% endif %}
{{ block.super }}
{% endblock %}
diff --git a/src/registrar/templates/domain_security_email.html b/src/registrar/templates/domain_security_email.html
index e20d67355..8175fa394 100644
--- a/src/registrar/templates/domain_security_email.html
+++ b/src/registrar/templates/domain_security_email.html
@@ -21,7 +21,7 @@
+ >{% if form.security_email.value is None or form.security_email.value == "dotgov@cisa.dhs.gov"%}Add security email{% else %}Save{% endif %}
{% endblock %} {# domain_content #}
diff --git a/src/registrar/tests/common.py b/src/registrar/tests/common.py
index fe41647f9..0dd1ee231 100644
--- a/src/registrar/tests/common.py
+++ b/src/registrar/tests/common.py
@@ -26,6 +26,7 @@ from registrar.models import (
from epplibwrapper import (
commands,
common,
+ info,
RegistryError,
ErrorCode,
)
@@ -555,32 +556,117 @@ class MockEppLib(TestCase):
contacts=...,
hosts=...,
statuses=...,
+ registrant=...,
):
self.auth_info = auth_info
self.cr_date = cr_date
self.contacts = contacts
self.hosts = hosts
self.statuses = statuses
+ self.registrant = registrant
+
+ def dummyInfoContactResultData(
+ self,
+ id,
+ email,
+ cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
+ pw="thisisnotapassword",
+ ):
+ fake = info.InfoContactResultData(
+ id=id,
+ postal_info=common.PostalInfo(
+ name="Registry Customer Service",
+ addr=common.ContactAddr(
+ street=["4200 Wilson Blvd."],
+ city="Arlington",
+ pc="22201",
+ cc="US",
+ sp="VA",
+ ),
+ org="Cybersecurity and Infrastructure Security Agency",
+ type="type",
+ ),
+ voice="+1.8882820870",
+ fax="+1-212-9876543",
+ email=email,
+ auth_info=common.ContactAuthInfo(pw=pw),
+ roid=...,
+ statuses=[],
+ cl_id=...,
+ cr_id=...,
+ cr_date=cr_date,
+ up_id=...,
+ up_date=...,
+ tr_date=...,
+ disclose=...,
+ vat=...,
+ ident=...,
+ notify_email=...,
+ )
+ return fake
mockDataInfoDomain = fakedEppObject(
- "fakepw",
+ "fakePw",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
- contacts=[common.DomainContact(contact="123", type="security")],
+ contacts=[
+ common.DomainContact(
+ contact="123", type=PublicContact.ContactTypeChoices.SECURITY
+ )
+ ],
hosts=["fake.host.com"],
statuses=[
common.Status(state="serverTransferProhibited", description="", lang="en"),
common.Status(state="inactive", description="", lang="en"),
],
)
+ mockDataInfoContact = mockDataInfoDomain.dummyInfoContactResultData(
+ "123", "123@mail.gov", datetime.datetime(2023, 5, 25, 19, 45, 35), "lastPw"
+ )
+ InfoDomainWithContacts = fakedEppObject(
+ "fakepw",
+ cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
+ contacts=[
+ common.DomainContact(
+ contact="securityContact",
+ type=PublicContact.ContactTypeChoices.SECURITY,
+ ),
+ common.DomainContact(
+ contact="technicalContact",
+ type=PublicContact.ContactTypeChoices.TECHNICAL,
+ ),
+ common.DomainContact(
+ contact="adminContact",
+ type=PublicContact.ContactTypeChoices.ADMINISTRATIVE,
+ ),
+ ],
+ hosts=["fake.host.com"],
+ statuses=[
+ common.Status(state="serverTransferProhibited", description="", lang="en"),
+ common.Status(state="inactive", description="", lang="en"),
+ ],
+ registrant="regContact",
+ )
+
+ mockSecurityContact = InfoDomainWithContacts.dummyInfoContactResultData(
+ "securityContact", "security@mail.gov"
+ )
+ mockTechnicalContact = InfoDomainWithContacts.dummyInfoContactResultData(
+ "technicalContact", "tech@mail.gov"
+ )
+ mockAdministrativeContact = InfoDomainWithContacts.dummyInfoContactResultData(
+ "adminContact", "admin@mail.gov"
+ )
+ mockRegistrantContact = InfoDomainWithContacts.dummyInfoContactResultData(
+ "regContact", "registrant@mail.gov"
+ )
+
infoDomainNoContact = fakedEppObject(
"security",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[],
hosts=["fake.host.com"],
)
- mockDataInfoContact = fakedEppObject(
- "anotherPw", cr_date=datetime.datetime(2023, 7, 25, 19, 45, 35)
- )
+
mockDataInfoHosts = fakedEppObject(
"lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35)
)
@@ -593,9 +679,28 @@ class MockEppLib(TestCase):
if isinstance(_request, commands.InfoDomain):
if getattr(_request, "name", None) == "security.gov":
return MagicMock(res_data=[self.infoDomainNoContact])
- return MagicMock(res_data=[self.mockDataInfoDomain])
+ elif getattr(_request, "name", None) == "freeman.gov":
+ return MagicMock(res_data=[self.InfoDomainWithContacts])
+ else:
+ return MagicMock(res_data=[self.mockDataInfoDomain])
elif isinstance(_request, commands.InfoContact):
- return MagicMock(res_data=[self.mockDataInfoContact])
+ mocked_result: info.InfoContactResultData
+
+ # For testing contact types
+ match getattr(_request, "id", None):
+ case "securityContact":
+ mocked_result = self.mockSecurityContact
+ case "technicalContact":
+ mocked_result = self.mockTechnicalContact
+ case "adminContact":
+ mocked_result = self.mockAdministrativeContact
+ case "regContact":
+ mocked_result = self.mockRegistrantContact
+ case _:
+ # Default contact return
+ mocked_result = self.mockDataInfoContact
+
+ return MagicMock(res_data=[mocked_result])
elif (
isinstance(_request, commands.CreateContact)
and getattr(_request, "id", None) == "fail"
@@ -604,6 +709,16 @@ class MockEppLib(TestCase):
# use this for when a contact is being updated
# sets the second send() to fail
raise RegistryError(code=ErrorCode.OBJECT_EXISTS)
+ elif (
+ isinstance(_request, commands.DeleteDomain)
+ and getattr(_request, "name", None) == "failDelete.gov"
+ ):
+ name = getattr(_request, "name", None)
+ fake_nameserver = "ns1.failDelete.gov"
+ if name in fake_nameserver:
+ raise RegistryError(
+ code=ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION
+ )
return MagicMock(res_data=[self.mockDataInfoHosts])
def setUp(self):
diff --git a/src/registrar/tests/test_admin.py b/src/registrar/tests/test_admin.py
index 9ff9ce451..def475536 100644
--- a/src/registrar/tests/test_admin.py
+++ b/src/registrar/tests/test_admin.py
@@ -49,6 +49,7 @@ class TestDomainAdmin(MockEppLib):
self.client = Client(HTTP_HOST="localhost:8080")
self.superuser = create_superuser()
self.staffuser = create_user()
+ self.factory = RequestFactory()
super().setUp()
def test_place_and_remove_hold(self):
@@ -87,6 +88,155 @@ class TestDomainAdmin(MockEppLib):
self.assertContains(response, "Place hold")
self.assertNotContains(response, "Remove hold")
+ def test_deletion_is_successful(self):
+ """
+ Scenario: Domain deletion is unsuccessful
+ When the domain is deleted
+ Then a user-friendly success message is returned for displaying on the web
+ And `state` is et to `DELETED`
+ """
+ domain = create_ready_domain()
+ # Put in client hold
+ domain.place_client_hold()
+ p = "userpass"
+ self.client.login(username="staffuser", password=p)
+
+ # Ensure everything is displaying correctly
+ response = self.client.get(
+ "/admin/registrar/domain/{}/change/".format(domain.pk),
+ follow=True,
+ )
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, domain.name)
+ self.assertContains(response, "Delete Domain in Registry")
+
+ # Test the info dialog
+ request = self.factory.post(
+ "/admin/registrar/domain/{}/change/".format(domain.pk),
+ {"_delete_domain": "Delete Domain in Registry", "name": domain.name},
+ follow=True,
+ )
+ request.user = self.client
+
+ with patch("django.contrib.messages.add_message") as mock_add_message:
+ self.admin.do_delete_domain(request, domain)
+ mock_add_message.assert_called_once_with(
+ request,
+ messages.INFO,
+ "Domain city.gov has been deleted. Thanks!",
+ extra_tags="",
+ fail_silently=False,
+ )
+
+ self.assertEqual(domain.state, Domain.State.DELETED)
+
+ def test_deletion_ready_fsm_failure(self):
+ """
+ Scenario: Domain deletion is unsuccessful
+ When an error is returned from epplibwrapper
+ Then a user-friendly error message is returned for displaying on the web
+ And `state` is not set to `DELETED`
+ """
+ domain = create_ready_domain()
+ p = "userpass"
+ self.client.login(username="staffuser", password=p)
+
+ # Ensure everything is displaying correctly
+ response = self.client.get(
+ "/admin/registrar/domain/{}/change/".format(domain.pk),
+ follow=True,
+ )
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, domain.name)
+ self.assertContains(response, "Delete Domain in Registry")
+
+ # Test the error
+ request = self.factory.post(
+ "/admin/registrar/domain/{}/change/".format(domain.pk),
+ {"_delete_domain": "Delete Domain in Registry", "name": domain.name},
+ follow=True,
+ )
+ request.user = self.client
+
+ with patch("django.contrib.messages.add_message") as mock_add_message:
+ self.admin.do_delete_domain(request, domain)
+ mock_add_message.assert_called_once_with(
+ request,
+ messages.ERROR,
+ "Error deleting this Domain: "
+ "Can't switch from state 'ready' to 'deleted'"
+ ", must be either 'dns_needed' or 'on_hold'",
+ extra_tags="",
+ fail_silently=False,
+ )
+
+ self.assertEqual(domain.state, Domain.State.READY)
+
+ def test_analyst_deletes_domain_idempotent(self):
+ """
+ Scenario: Analyst tries to delete an already deleted domain
+ Given `state` is already `DELETED`
+ When `domain.deletedInEpp()` is called
+ Then `commands.DeleteDomain` is sent to the registry
+ And Domain returns normally without an error dialog
+ """
+ domain = create_ready_domain()
+ # Put in client hold
+ domain.place_client_hold()
+ p = "userpass"
+ self.client.login(username="staffuser", password=p)
+
+ # Ensure everything is displaying correctly
+ response = self.client.get(
+ "/admin/registrar/domain/{}/change/".format(domain.pk),
+ follow=True,
+ )
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, domain.name)
+ self.assertContains(response, "Delete Domain in Registry")
+
+ # Test the info dialog
+ request = self.factory.post(
+ "/admin/registrar/domain/{}/change/".format(domain.pk),
+ {"_delete_domain": "Delete Domain in Registry", "name": domain.name},
+ follow=True,
+ )
+ request.user = self.client
+
+ # Delete it once
+ with patch("django.contrib.messages.add_message") as mock_add_message:
+ self.admin.do_delete_domain(request, domain)
+ mock_add_message.assert_called_once_with(
+ request,
+ messages.INFO,
+ "Domain city.gov has been deleted. Thanks!",
+ extra_tags="",
+ fail_silently=False,
+ )
+
+ self.assertEqual(domain.state, Domain.State.DELETED)
+
+ # Try to delete it again
+ # Test the info dialog
+ request = self.factory.post(
+ "/admin/registrar/domain/{}/change/".format(domain.pk),
+ {"_delete_domain": "Delete Domain in Registry", "name": domain.name},
+ follow=True,
+ )
+ request.user = self.client
+
+ with patch("django.contrib.messages.add_message") as mock_add_message:
+ self.admin.do_delete_domain(request, domain)
+ mock_add_message.assert_called_once_with(
+ request,
+ messages.INFO,
+ "This domain is already deleted",
+ extra_tags="",
+ fail_silently=False,
+ )
+
+ self.assertEqual(domain.state, Domain.State.DELETED)
+
@skip("Waiting on epp lib to implement")
def test_place_and_remove_hold_epp(self):
raise
@@ -138,8 +288,9 @@ class TestDomainApplicationAdminForm(TestCase):
)
-class TestDomainApplicationAdmin(TestCase):
+class TestDomainApplicationAdmin(MockEppLib):
def setUp(self):
+ super().setUp()
self.site = AdminSite()
self.factory = RequestFactory()
self.admin = DomainApplicationAdmin(
@@ -690,6 +841,7 @@ class TestDomainApplicationAdmin(TestCase):
domain_information.refresh_from_db()
def tearDown(self):
+ super().tearDown()
Domain.objects.all().delete()
DomainInformation.objects.all().delete()
DomainApplication.objects.all().delete()
diff --git a/src/registrar/tests/test_models_domain.py b/src/registrar/tests/test_models_domain.py
index 54045bb32..50456c2d5 100644
--- a/src/registrar/tests/test_models_domain.py
+++ b/src/registrar/tests/test_models_domain.py
@@ -3,9 +3,10 @@ Feature being tested: Registry Integration
This file tests the various ways in which the registrar interacts with the registry.
"""
+from typing import Mapping, Any
from django.test import TestCase
from django.db.utils import IntegrityError
-from unittest.mock import patch, call
+from unittest.mock import MagicMock, patch, call
import datetime
from registrar.models import Domain
@@ -16,22 +17,32 @@ from registrar.models.draft_domain import DraftDomain
from registrar.models.public_contact import PublicContact
from registrar.models.user import User
from .common import MockEppLib
-
+from django_fsm import TransitionNotAllowed # type: ignore
from epplibwrapper import (
commands,
common,
+ extensions,
+ responses,
RegistryError,
ErrorCode,
)
+import logging
+
+logger = logging.getLogger(__name__)
class TestDomainCache(MockEppLib):
+ def tearDown(self):
+ PublicContact.objects.all().delete()
+ Domain.objects.all().delete()
+ super().tearDown()
+
def test_cache_sets_resets(self):
"""Cache should be set on getter and reset on setter calls"""
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
# trigger getter
_ = domain.creation_date
-
+ domain._get_property("contacts")
# getter should set the domain cache with a InfoDomain object
# (see InfoDomainResult)
self.assertEquals(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info)
@@ -51,8 +62,6 @@ class TestDomainCache(MockEppLib):
commands.InfoDomain(name="igorville.gov", auth_info=None),
cleaned=True,
),
- call(commands.InfoContact(id="123", auth_info=None), cleaned=True),
- call(commands.InfoHost(name="fake.host.com"), cleaned=True),
],
any_order=False, # Ensure calls are in the specified order
)
@@ -74,8 +83,6 @@ class TestDomainCache(MockEppLib):
call(
commands.InfoDomain(name="igorville.gov", auth_info=None), cleaned=True
),
- call(commands.InfoContact(id="123", auth_info=None), cleaned=True),
- call(commands.InfoHost(name="fake.host.com"), cleaned=True),
]
self.mockedSendFunction.assert_has_calls(expectedCalls)
@@ -83,13 +90,16 @@ class TestDomainCache(MockEppLib):
def test_cache_nested_elements(self):
"""Cache works correctly with the nested objects cache and hosts"""
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
-
- # the cached contacts and hosts should be dictionaries of what is passed to them
+ # The contact list will initially contain objects of type 'DomainContact'
+ # this is then transformed into PublicContact, and cache should NOT
+ # hold onto the DomainContact object
+ expectedUnfurledContactsList = [
+ common.DomainContact(contact="123", type="security"),
+ ]
expectedContactsDict = {
- "id": self.mockDataInfoDomain.contacts[0].contact,
- "type": self.mockDataInfoDomain.contacts[0].type,
- "auth_info": self.mockDataInfoContact.auth_info,
- "cr_date": self.mockDataInfoContact.cr_date,
+ PublicContact.ContactTypeChoices.ADMINISTRATIVE: None,
+ PublicContact.ContactTypeChoices.SECURITY: "123",
+ PublicContact.ContactTypeChoices.TECHNICAL: None,
}
expectedHostsDict = {
"name": self.mockDataInfoDomain.hosts[0],
@@ -105,15 +115,83 @@ class TestDomainCache(MockEppLib):
# check contacts
self.assertEqual(domain._cache["_contacts"], self.mockDataInfoDomain.contacts)
- self.assertEqual(domain._cache["contacts"], [expectedContactsDict])
+ # The contact list should not contain what is sent by the registry by default,
+ # as _fetch_cache will transform the type to PublicContact
+ self.assertNotEqual(domain._cache["contacts"], expectedUnfurledContactsList)
+ self.assertEqual(domain._cache["contacts"], expectedContactsDict)
# get and check hosts is set correctly
domain._get_property("hosts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
+ self.assertEqual(domain._cache["contacts"], expectedContactsDict)
+ # invalidate cache
+ domain._cache = {}
- def tearDown(self) -> None:
- Domain.objects.all().delete()
- super().tearDown()
+ # get host
+ domain._get_property("hosts")
+ self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
+
+ # get contacts
+ domain._get_property("contacts")
+ self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
+ self.assertEqual(domain._cache["contacts"], expectedContactsDict)
+
+ def test_map_epp_contact_to_public_contact(self):
+ # Tests that the mapper is working how we expect
+ domain, _ = Domain.objects.get_or_create(name="registry.gov")
+ security = PublicContact.ContactTypeChoices.SECURITY
+ mapped = domain.map_epp_contact_to_public_contact(
+ self.mockDataInfoContact,
+ self.mockDataInfoContact.id,
+ security,
+ )
+
+ expected_contact = PublicContact(
+ domain=domain,
+ contact_type=security,
+ registry_id="123",
+ email="123@mail.gov",
+ voice="+1.8882820870",
+ fax="+1-212-9876543",
+ pw="lastPw",
+ name="Registry Customer Service",
+ org="Cybersecurity and Infrastructure Security Agency",
+ city="Arlington",
+ pc="22201",
+ cc="US",
+ sp="VA",
+ street1="4200 Wilson Blvd.",
+ )
+
+ # Test purposes only, since we're comparing
+ # two duplicate objects. We would expect
+ # these not to have the same state.
+ expected_contact._state = mapped._state
+
+ # Mapped object is what we expect
+ self.assertEqual(mapped.__dict__, expected_contact.__dict__)
+
+ # The mapped object should correctly translate to a DB
+ # object. If not, something else went wrong.
+ db_object = domain._get_or_create_public_contact(mapped)
+ in_db = PublicContact.objects.filter(
+ registry_id=domain.security_contact.registry_id,
+ contact_type=security,
+ ).get()
+ # DB Object is the same as the mapped object
+ self.assertEqual(db_object, in_db)
+
+ domain.security_contact = in_db
+ # Trigger the getter
+ _ = domain.security_contact
+ # Check to see that changes made
+ # to DB objects persist in cache correctly
+ in_db.email = "123test@mail.gov"
+ in_db.save()
+
+ cached_contact = domain._cache["contacts"].get(security)
+ self.assertEqual(cached_contact, in_db.registry_id)
+ self.assertEqual(domain.security_contact.email, "123test@mail.gov")
class TestDomainCreation(MockEppLib):
@@ -170,8 +248,6 @@ class TestDomainCreation(MockEppLib):
commands.InfoDomain(name="beef-tongue.gov", auth_info=None),
cleaned=True,
),
- call(commands.InfoContact(id="123", auth_info=None), cleaned=True),
- call(commands.InfoHost(name="fake.host.com"), cleaned=True),
],
any_order=False, # Ensure calls are in the specified order
)
@@ -199,7 +275,10 @@ class TestDomainCreation(MockEppLib):
def tearDown(self) -> None:
DomainInformation.objects.all().delete()
DomainApplication.objects.all().delete()
+ PublicContact.objects.all().delete()
Domain.objects.all().delete()
+ User.objects.all().delete()
+ DraftDomain.objects.all().delete()
super().tearDown()
@@ -213,7 +292,6 @@ class TestDomainStatuses(MockEppLib):
_ = domain.statuses
status_list = [status.state for status in self.mockDataInfoDomain.statuses]
self.assertEquals(domain._cache["statuses"], status_list)
-
# Called in _fetch_cache
self.mockedSendFunction.assert_has_calls(
[
@@ -221,8 +299,6 @@ class TestDomainStatuses(MockEppLib):
commands.InfoDomain(name="chicken-liver.gov", auth_info=None),
cleaned=True,
),
- call(commands.InfoContact(id="123", auth_info=None), cleaned=True),
- call(commands.InfoHost(name="fake.host.com"), cleaned=True),
],
any_order=False, # Ensure calls are in the specified order
)
@@ -259,10 +335,119 @@ class TestDomainStatuses(MockEppLib):
raise
def tearDown(self) -> None:
+ PublicContact.objects.all().delete()
Domain.objects.all().delete()
super().tearDown()
+class TestDomainAvailable(MockEppLib):
+ """Test Domain.available"""
+
+ # No SetUp or tearDown necessary for these tests
+
+ def test_domain_available(self):
+ """
+ Scenario: Testing whether an available domain is available
+ Should return True
+
+ Mock response to mimic EPP Response
+ Validate CheckDomain command is called
+ Validate response given mock
+ """
+
+ def side_effect(_request, cleaned):
+ return MagicMock(
+ res_data=[
+ responses.check.CheckDomainResultData(
+ name="available.gov", avail=True, reason=None
+ )
+ ],
+ )
+
+ patcher = patch("registrar.models.domain.registry.send")
+ mocked_send = patcher.start()
+ mocked_send.side_effect = side_effect
+
+ available = Domain.available("available.gov")
+ mocked_send.assert_has_calls(
+ [
+ call(
+ commands.CheckDomain(
+ ["available.gov"],
+ ),
+ cleaned=True,
+ )
+ ]
+ )
+ self.assertTrue(available)
+ patcher.stop()
+
+ def test_domain_unavailable(self):
+ """
+ Scenario: Testing whether an unavailable domain is available
+ Should return False
+
+ Mock response to mimic EPP Response
+ Validate CheckDomain command is called
+ Validate response given mock
+ """
+
+ def side_effect(_request, cleaned):
+ return MagicMock(
+ res_data=[
+ responses.check.CheckDomainResultData(
+ name="unavailable.gov", avail=False, reason="In Use"
+ )
+ ],
+ )
+
+ patcher = patch("registrar.models.domain.registry.send")
+ mocked_send = patcher.start()
+ mocked_send.side_effect = side_effect
+
+ available = Domain.available("unavailable.gov")
+ mocked_send.assert_has_calls(
+ [
+ call(
+ commands.CheckDomain(
+ ["unavailable.gov"],
+ ),
+ cleaned=True,
+ )
+ ]
+ )
+ self.assertFalse(available)
+ patcher.stop()
+
+ def test_domain_available_with_value_error(self):
+ """
+ Scenario: Testing whether an invalid domain is available
+ Should throw ValueError
+
+ Validate ValueError is raised
+ """
+ with self.assertRaises(ValueError):
+ Domain.available("invalid-string")
+
+ def test_domain_available_unsuccessful(self):
+ """
+ Scenario: Testing behavior when registry raises a RegistryError
+
+ Validate RegistryError is raised
+ """
+
+ def side_effect(_request, cleaned):
+ raise RegistryError(code=ErrorCode.COMMAND_SYNTAX_ERROR)
+
+ patcher = patch("registrar.models.domain.registry.send")
+ mocked_send = patcher.start()
+ mocked_send.side_effect = side_effect
+
+ with self.assertRaises(RegistryError):
+ Domain.available("raises-error.gov")
+ patcher.stop()
+
+
class TestRegistrantContacts(MockEppLib):
"""Rule: Registrants may modify their WHOIS data"""
@@ -273,12 +458,17 @@ class TestRegistrantContacts(MockEppLib):
And the registrant is the admin on a domain
"""
super().setUp()
+ # Creates a domain with no contact associated to it
self.domain, _ = Domain.objects.get_or_create(name="security.gov")
+ # Creates a domain with an associated contact
+ self.domain_contact, _ = Domain.objects.get_or_create(name="freeman.gov")
def tearDown(self):
super().tearDown()
- # self.contactMailingAddressPatch.stop()
- # self.createContactPatch.stop()
+ self.domain._invalidate_cache()
+ self.domain_contact._invalidate_cache()
+ PublicContact.objects.all().delete()
+ Domain.objects.all().delete()
def test_no_security_email(self):
"""
@@ -524,6 +714,133 @@ class TestRegistrantContacts(MockEppLib):
"""
raise
+ def test_contact_getter_security(self):
+ security = PublicContact.ContactTypeChoices.SECURITY
+ # Create prexisting object
+ expected_contact = self.domain.map_epp_contact_to_public_contact(
+ self.mockSecurityContact,
+ contact_id="securityContact",
+ contact_type=security,
+ )
+
+ # Checks if we grabbed the correct PublicContact
+ self.assertEqual(
+ self.domain_contact.security_contact.email, expected_contact.email
+ )
+
+ expected_contact_db = PublicContact.objects.filter(
+ registry_id=self.domain_contact.security_contact.registry_id,
+ contact_type=security,
+ ).get()
+
+ self.assertEqual(self.domain_contact.security_contact, expected_contact_db)
+
+ self.mockedSendFunction.assert_has_calls(
+ [
+ call(
+ commands.InfoContact(id="securityContact", auth_info=None),
+ cleaned=True,
+ ),
+ ]
+ )
+ # Checks if we are receiving the cache we expect
+ cache = self.domain_contact._cache["contacts"]
+ self.assertEqual(cache.get(security), "securityContact")
+
+ def test_contact_getter_technical(self):
+ technical = PublicContact.ContactTypeChoices.TECHNICAL
+ expected_contact = self.domain.map_epp_contact_to_public_contact(
+ self.mockTechnicalContact,
+ contact_id="technicalContact",
+ contact_type=technical,
+ )
+
+ self.assertEqual(
+ self.domain_contact.technical_contact.email, expected_contact.email
+ )
+
+ # Checks if we grab the correct PublicContact
+ expected_contact_db = PublicContact.objects.filter(
+ registry_id=self.domain_contact.technical_contact.registry_id,
+ contact_type=technical,
+ ).get()
+
+ # Checks if we grab the correct PublicContact
+ self.assertEqual(self.domain_contact.technical_contact, expected_contact_db)
+ self.mockedSendFunction.assert_has_calls(
+ [
+ call(
+ commands.InfoContact(id="technicalContact", auth_info=None),
+ cleaned=True,
+ ),
+ ]
+ )
+ # Checks if we are receiving the cache we expect
+ cache = self.domain_contact._cache["contacts"]
+ self.assertEqual(cache.get(technical), "technicalContact")
+
+ def test_contact_getter_administrative(self):
+ administrative = PublicContact.ContactTypeChoices.ADMINISTRATIVE
+ expected_contact = self.domain.map_epp_contact_to_public_contact(
+ self.mockAdministrativeContact,
+ contact_id="adminContact",
+ contact_type=administrative,
+ )
+
+ self.assertEqual(
+ self.domain_contact.administrative_contact.email, expected_contact.email
+ )
+
+ expected_contact_db = PublicContact.objects.filter(
+ registry_id=self.domain_contact.administrative_contact.registry_id,
+ contact_type=administrative,
+ ).get()
+
+ # Checks if we grab the correct PublicContact
+ self.assertEqual(
+ self.domain_contact.administrative_contact, expected_contact_db
+ )
+ self.mockedSendFunction.assert_has_calls(
+ [
+ call(
+ commands.InfoContact(id="adminContact", auth_info=None),
+ cleaned=True,
+ ),
+ ]
+ )
+ # Checks if we are receiving the cache we expect
+ cache = self.domain_contact._cache["contacts"]
+ self.assertEqual(cache.get(administrative), "adminContact")
+
+ def test_contact_getter_registrant(self):
+ expected_contact = self.domain.map_epp_contact_to_public_contact(
+ self.mockRegistrantContact,
+ contact_id="regContact",
+ contact_type=PublicContact.ContactTypeChoices.REGISTRANT,
+ )
+
+ self.assertEqual(
+ self.domain_contact.registrant_contact.email, expected_contact.email
+ )
+
+ expected_contact_db = PublicContact.objects.filter(
+ registry_id=self.domain_contact.registrant_contact.registry_id,
+ contact_type=PublicContact.ContactTypeChoices.REGISTRANT,
+ ).get()
+
+ # Checks if we grab the correct PublicContact
+ self.assertEqual(self.domain_contact.registrant_contact, expected_contact_db)
+ self.mockedSendFunction.assert_has_calls(
+ [
+ call(
+ commands.InfoContact(id="regContact", auth_info=None),
+ cleaned=True,
+ ),
+ ]
+ )
+ # Checks if we are receiving the cache we expect.
+ self.assertEqual(self.domain_contact._cache["registrant"], expected_contact_db)
+
class TestRegistrantNameservers(TestCase):
"""Rule: Registrants may modify their nameservers"""
@@ -664,44 +981,372 @@ class TestRegistrantNameservers(TestCase):
raise
-class TestRegistrantDNSSEC(TestCase):
+class TestRegistrantDNSSEC(MockEppLib):
"""Rule: Registrants may modify their secure DNS data"""
+ # helper function to create UpdateDomainDNSSECExtention object for verification
+ def createUpdateExtension(self, dnssecdata: extensions.DNSSECExtension):
+ return commands.UpdateDomainDNSSECExtension(
+ maxSigLife=dnssecdata.maxSigLife,
+ dsData=dnssecdata.dsData,
+ keyData=dnssecdata.keyData,
+ remDsData=None,
+ remKeyData=None,
+ remAllDsKeyData=True,
+ )
+
def setUp(self):
"""
Background:
- Given the registrant is logged in
- And the registrant is the admin on a domain
+ Given the analyst is logged in
+ And a domain exists in the registry
"""
- pass
+ super().setUp()
+ # for the tests, need a domain in the unknown state
+ self.domain, _ = Domain.objects.get_or_create(name="fake.gov")
+ self.addDsData1 = {
+ "keyTag": 1234,
+ "alg": 3,
+ "digestType": 1,
+ "digest": "ec0bdd990b39feead889f0ba613db4adec0bdd99",
+ }
+ self.addDsData2 = {
+ "keyTag": 2345,
+ "alg": 3,
+ "digestType": 1,
+ "digest": "ec0bdd990b39feead889f0ba613db4adecb4adec",
+ }
+ self.keyDataDict = {
+ "flags": 257,
+ "protocol": 3,
+ "alg": 1,
+ "pubKey": "AQPJ////4Q==",
+ }
+ self.dnssecExtensionWithDsData: Mapping[str, Any] = {
+ "dsData": [common.DSData(**self.addDsData1)]
+ }
+ self.dnssecExtensionWithMultDsData: Mapping[str, Any] = {
+ "dsData": [
+ common.DSData(**self.addDsData1),
+ common.DSData(**self.addDsData2),
+ ],
+ }
+ self.dnssecExtensionWithKeyData: Mapping[str, Any] = {
+ "maxSigLife": 3215,
+ "keyData": [common.DNSSECKeyData(**self.keyDataDict)],
+ }
- @skip("not implemented yet")
- def test_user_adds_dns_data(self):
+ def tearDown(self):
+ Domain.objects.all().delete()
+ super().tearDown()
+
+ def test_user_adds_dnssec_data(self):
"""
- Scenario: Registrant adds DNS data
+ Scenario: Registrant adds DNSSEC data.
+ Verify that both the setter and getter are functioning properly
+
+ This test verifies:
+ 1 - setter calls UpdateDomain command
+ 2 - setter adds the UpdateDNSSECExtension extension to the command
+ 3 - setter causes the getter to call info domain on next get from cache
+ 4 - getter properly parses dnssecdata from InfoDomain response and sets to cache
"""
- raise
- @skip("not implemented yet")
+ # make sure to stop any other patcher so there are no conflicts
+ self.mockSendPatch.stop()
+
+ def side_effect(_request, cleaned):
+ return MagicMock(
+ res_data=[self.mockDataInfoDomain],
+ extensions=[
+ extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
+ ],
+ )
+
+ patcher = patch("registrar.models.domain.registry.send")
+ mocked_send = patcher.start()
+ mocked_send.side_effect = side_effect
+
+ self.domain.dnssecdata = self.dnssecExtensionWithDsData
+ # get the DNS SEC extension added to the UpdateDomain command and
+ # verify that it is properly sent
+ # args[0] is the _request sent to registry
+ args, _ = mocked_send.call_args
+ # assert that the extension matches
+ self.assertEquals(
+ args[0].extensions[0],
+ self.createUpdateExtension(
+ extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
+ ),
+ )
+ # test that the dnssecdata getter is functioning properly
+ dnssecdata_get = self.domain.dnssecdata
+ mocked_send.assert_has_calls(
+ [
+ call(
+ commands.UpdateDomain(
+ name="fake.gov",
+ nsset=None,
+ keyset=None,
+ registrant=None,
+ auth_info=None,
+ ),
+ cleaned=True,
+ ),
+ call(
+ commands.InfoDomain(
+ name="fake.gov",
+ ),
+ cleaned=True,
+ ),
+ ]
+ )
+
+ self.assertEquals(
+ dnssecdata_get.dsData, self.dnssecExtensionWithDsData["dsData"]
+ )
+
+ patcher.stop()
+
def test_dnssec_is_idempotent(self):
"""
Scenario: Registrant adds DNS data twice, due to a UI glitch
- """
# implementation note: this requires seeing what happens when these are actually
# sent like this, and then implementing appropriate mocks for any errors the
# registry normally sends in this case
- raise
- @skip("not implemented yet")
+ This test verifies:
+ 1 - UpdateDomain command called twice
+ 2 - setter causes the getter to call info domain on next get from cache
+ 3 - getter properly parses dnssecdata from InfoDomain response and sets to cache
+
+ """
+
+ # make sure to stop any other patcher so there are no conflicts
+ self.mockSendPatch.stop()
+
+ def side_effect(_request, cleaned):
+ return MagicMock(
+ res_data=[self.mockDataInfoDomain],
+ extensions=[
+ extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
+ ],
+ )
+
+ patcher = patch("registrar.models.domain.registry.send")
+ mocked_send = patcher.start()
+ mocked_send.side_effect = side_effect
+
+ # set the dnssecdata once
+ self.domain.dnssecdata = self.dnssecExtensionWithDsData
+ # set the dnssecdata again
+ self.domain.dnssecdata = self.dnssecExtensionWithDsData
+ # test that the dnssecdata getter is functioning properly
+ dnssecdata_get = self.domain.dnssecdata
+ mocked_send.assert_has_calls(
+ [
+ call(
+ commands.UpdateDomain(
+ name="fake.gov",
+ nsset=None,
+ keyset=None,
+ registrant=None,
+ auth_info=None,
+ ),
+ cleaned=True,
+ ),
+ call(
+ commands.UpdateDomain(
+ name="fake.gov",
+ nsset=None,
+ keyset=None,
+ registrant=None,
+ auth_info=None,
+ ),
+ cleaned=True,
+ ),
+ call(
+ commands.InfoDomain(
+ name="fake.gov",
+ ),
+ cleaned=True,
+ ),
+ ]
+ )
+
+ self.assertEquals(
+ dnssecdata_get.dsData, self.dnssecExtensionWithDsData["dsData"]
+ )
+
+ patcher.stop()
+
+ def test_user_adds_dnssec_data_multiple_dsdata(self):
+ """
+ Scenario: Registrant adds DNSSEC data with multiple DSData.
+ Verify that both the setter and getter are functioning properly
+
+ This test verifies:
+ 1 - setter calls UpdateDomain command
+ 2 - setter adds the UpdateDNSSECExtension extension to the command
+ 3 - setter causes the getter to call info domain on next get from cache
+ 4 - getter properly parses dnssecdata from InfoDomain response and sets to cache
+
+ """
+
+ # make sure to stop any other patcher so there are no conflicts
+ self.mockSendPatch.stop()
+
+ def side_effect(_request, cleaned):
+ return MagicMock(
+ res_data=[self.mockDataInfoDomain],
+ extensions=[
+ extensions.DNSSECExtension(**self.dnssecExtensionWithMultDsData)
+ ],
+ )
+
+ patcher = patch("registrar.models.domain.registry.send")
+ mocked_send = patcher.start()
+ mocked_send.side_effect = side_effect
+
+ self.domain.dnssecdata = self.dnssecExtensionWithMultDsData
+ # get the DNS SEC extension added to the UpdateDomain command
+ # and verify that it is properly sent
+ # args[0] is the _request sent to registry
+ args, _ = mocked_send.call_args
+ # assert that the extension matches
+ self.assertEquals(
+ args[0].extensions[0],
+ self.createUpdateExtension(
+ extensions.DNSSECExtension(**self.dnssecExtensionWithMultDsData)
+ ),
+ )
+ # test that the dnssecdata getter is functioning properly
+ dnssecdata_get = self.domain.dnssecdata
+ mocked_send.assert_has_calls(
+ [
+ call(
+ commands.UpdateDomain(
+ name="fake.gov",
+ nsset=None,
+ keyset=None,
+ registrant=None,
+ auth_info=None,
+ ),
+ cleaned=True,
+ ),
+ call(
+ commands.InfoDomain(
+ name="fake.gov",
+ ),
+ cleaned=True,
+ ),
+ ]
+ )
+
+ self.assertEquals(
+ dnssecdata_get.dsData, self.dnssecExtensionWithMultDsData["dsData"]
+ )
+
+ patcher.stop()
+
+ def test_user_adds_dnssec_keydata(self):
+ """
+ Scenario: Registrant adds DNSSEC data.
+ Verify that both the setter and getter are functioning properly
+
+ This test verifies:
+ 1 - setter calls UpdateDomain command
+ 2 - setter adds the UpdateDNSSECExtension extension to the command
+ 3 - setter causes the getter to call info domain on next get from cache
+ 4 - getter properly parses dnssecdata from InfoDomain response and sets to cache
+
+ """
+
+ # make sure to stop any other patcher so there are no conflicts
+ self.mockSendPatch.stop()
+
+ def side_effect(_request, cleaned):
+ return MagicMock(
+ res_data=[self.mockDataInfoDomain],
+ extensions=[
+ extensions.DNSSECExtension(**self.dnssecExtensionWithKeyData)
+ ],
+ )
+
+ patcher = patch("registrar.models.domain.registry.send")
+ mocked_send = patcher.start()
+ mocked_send.side_effect = side_effect
+
+ self.domain.dnssecdata = self.dnssecExtensionWithKeyData
+ # get the DNS SEC extension added to the UpdateDomain command
+ # and verify that it is properly sent
+ # args[0] is the _request sent to registry
+ args, _ = mocked_send.call_args
+ # assert that the extension matches
+ self.assertEquals(
+ args[0].extensions[0],
+ self.createUpdateExtension(
+ extensions.DNSSECExtension(**self.dnssecExtensionWithKeyData)
+ ),
+ )
+ # test that the dnssecdata getter is functioning properly
+ dnssecdata_get = self.domain.dnssecdata
+ mocked_send.assert_has_calls(
+ [
+ call(
+ commands.UpdateDomain(
+ name="fake.gov",
+ nsset=None,
+ keyset=None,
+ registrant=None,
+ auth_info=None,
+ ),
+ cleaned=True,
+ ),
+ call(
+ commands.InfoDomain(
+ name="fake.gov",
+ ),
+ cleaned=True,
+ ),
+ ]
+ )
+
+ self.assertEquals(
+ dnssecdata_get.keyData, self.dnssecExtensionWithKeyData["keyData"]
+ )
+
+ patcher.stop()
+
def test_update_is_unsuccessful(self):
"""
Scenario: An update to the dns data is unsuccessful
When an error is returned from epplibwrapper
Then a user-friendly error message is returned for displaying on the web
"""
- raise
+
+ # make sure to stop any other patcher so there are no conflicts
+ self.mockSendPatch.stop()
+
+ def side_effect(_request, cleaned):
+ raise RegistryError(code=ErrorCode.PARAMETER_VALUE_RANGE_ERROR)
+
+ patcher = patch("registrar.models.domain.registry.send")
+ mocked_send = patcher.start()
+ mocked_send.side_effect = side_effect
+
+ # if RegistryError is raised, view formats user-friendly
+ # error message if error is_client_error, is_session_error, or
+ # is_server_error; so test for those conditions
+ with self.assertRaises(RegistryError) as err:
+ self.domain.dnssecdata = self.dnssecExtensionWithDsData
+ self.assertTrue(
+ err.is_client_error() or err.is_session_error() or err.is_server_error()
+ )
+
+ patcher.stop()
class TestAnalystClientHold(MockEppLib):
@@ -940,7 +1585,7 @@ class TestAnalystLock(TestCase):
raise
-class TestAnalystDelete(TestCase):
+class TestAnalystDelete(MockEppLib):
"""Rule: Analysts may delete a domain"""
def setUp(self):
@@ -949,35 +1594,99 @@ class TestAnalystDelete(TestCase):
Given the analyst is logged in
And a domain exists in the registry
"""
- pass
+ super().setUp()
+ self.domain, _ = Domain.objects.get_or_create(
+ name="fake.gov", state=Domain.State.READY
+ )
+ self.domain_on_hold, _ = Domain.objects.get_or_create(
+ name="fake-on-hold.gov", state=Domain.State.ON_HOLD
+ )
+
+ def tearDown(self):
+ Domain.objects.all().delete()
+ super().tearDown()
- @skip("not implemented yet")
def test_analyst_deletes_domain(self):
"""
Scenario: Analyst permanently deletes a domain
- When `domain.delete()` is called
+ When `domain.deletedInEpp()` is called
Then `commands.DeleteDomain` is sent to the registry
And `state` is set to `DELETED`
"""
- raise
+ # Put the domain in client hold
+ self.domain.place_client_hold()
+ # Delete it...
+ self.domain.deletedInEpp()
+ self.mockedSendFunction.assert_has_calls(
+ [
+ call(
+ commands.DeleteDomain(name="fake.gov"),
+ cleaned=True,
+ )
+ ]
+ )
- @skip("not implemented yet")
- def test_analyst_deletes_domain_idempotent(self):
- """
- Scenario: Analyst tries to delete an already deleted domain
- Given `state` is already `DELETED`
- When `domain.delete()` is called
- Then `commands.DeleteDomain` is sent to the registry
- And Domain returns normally (without error)
- """
- raise
+ # Domain itself should not be deleted
+ self.assertNotEqual(self.domain, None)
+
+ # Domain should have the right state
+ self.assertEqual(self.domain.state, Domain.State.DELETED)
+
+ # Cache should be invalidated
+ self.assertEqual(self.domain._cache, {})
- @skip("not implemented yet")
def test_deletion_is_unsuccessful(self):
"""
Scenario: Domain deletion is unsuccessful
- When an error is returned from epplibwrapper
- Then a user-friendly error message is returned for displaying on the web
+ When a subdomain exists
+ Then a client error is returned of code 2305
And `state` is not set to `DELETED`
"""
- raise
+ # Desired domain
+ domain, _ = Domain.objects.get_or_create(
+ name="failDelete.gov", state=Domain.State.ON_HOLD
+ )
+ # Put the domain in client hold
+ domain.place_client_hold()
+
+ # Delete it
+ with self.assertRaises(RegistryError) as err:
+ domain.deletedInEpp()
+ self.assertTrue(
+ err.is_client_error()
+ and err.code == ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION
+ )
+ self.mockedSendFunction.assert_has_calls(
+ [
+ call(
+ commands.DeleteDomain(name="failDelete.gov"),
+ cleaned=True,
+ )
+ ]
+ )
+
+ # Domain itself should not be deleted
+ self.assertNotEqual(domain, None)
+ # State should not have changed
+ self.assertEqual(domain.state, Domain.State.ON_HOLD)
+
+ def test_deletion_ready_fsm_failure(self):
+ """
+ Scenario: Domain deletion is unsuccessful due to FSM rules
+ Given state is 'ready'
+ When `domain.deletedInEpp()` is called
+ and domain is of `state` is `READY`
+ Then an FSM error is returned
+ And `state` is not set to `DELETED`
+ """
+ self.assertEqual(self.domain.state, Domain.State.READY)
+ with self.assertRaises(TransitionNotAllowed) as err:
+ self.domain.deletedInEpp()
+ self.assertTrue(
+ err.is_client_error()
+ and err.code == ErrorCode.OBJECT_STATUS_PROHIBITS_OPERATION
+ )
+ # Domain should not be deleted
+ self.assertNotEqual(self.domain, None)
+ # Domain should have the right state
+ self.assertEqual(self.domain.state, Domain.State.READY)
diff --git a/src/registrar/tests/test_views.py b/src/registrar/tests/test_views.py
index c78d3c7fa..68aaf0ed8 100644
--- a/src/registrar/tests/test_views.py
+++ b/src/registrar/tests/test_views.py
@@ -1,11 +1,11 @@
from unittest import skip
-from unittest.mock import MagicMock, ANY
+from unittest.mock import MagicMock, ANY, patch
from django.conf import settings
from django.test import Client, TestCase
from django.urls import reverse
from django.contrib.auth import get_user_model
-from .common import completed_application
+from .common import MockEppLib, completed_application # type: ignore
from django_webtest import WebTest # type: ignore
import boto3_mocking # type: ignore
@@ -25,7 +25,6 @@ from registrar.models import (
from registrar.views.application import ApplicationWizard, Step
from .common import less_console_noise
-from .common import MockEppLib
class TestViews(TestCase):
@@ -1133,7 +1132,7 @@ class TestDomainPermissions(TestWithDomainPermissions):
self.assertEqual(response.status_code, 403)
-class TestDomainDetail(TestWithDomainPermissions, WebTest):
+class TestDomainDetail(TestWithDomainPermissions, WebTest, MockEppLib):
def setUp(self):
super().setUp()
self.app.set_user(self.user.username)
@@ -1426,6 +1425,40 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest):
)
self.assertContains(page, "Testy")
+ def test_domain_security_email_existing_security_contact(self):
+ """Can load domain's security email page."""
+ self.mockSendPatch = patch("registrar.models.domain.registry.send")
+ self.mockedSendFunction = self.mockSendPatch.start()
+ self.mockedSendFunction.side_effect = self.mockSend
+
+ domain_contact, _ = Domain.objects.get_or_create(name="freeman.gov")
+ # Add current user to this domain
+ _ = UserDomainRole(user=self.user, domain=domain_contact, role="admin").save()
+ page = self.client.get(
+ reverse("domain-security-email", kwargs={"pk": domain_contact.id})
+ )
+
+ # Loads correctly
+ self.assertContains(page, "Domain security email")
+ self.assertContains(page, "security@mail.gov")
+ self.mockSendPatch.stop()
+
+ def test_domain_security_email_no_security_contact(self):
+ """Loads a domain with no defined security email.
+ We should not show the default."""
+ self.mockSendPatch = patch("registrar.models.domain.registry.send")
+ self.mockedSendFunction = self.mockSendPatch.start()
+ self.mockedSendFunction.side_effect = self.mockSend
+
+ page = self.client.get(
+ reverse("domain-security-email", kwargs={"pk": self.domain.id})
+ )
+
+ # Loads correctly
+ self.assertContains(page, "Domain security email")
+ self.assertNotContains(page, "dotgov@cisa.dhs.gov")
+ self.mockSendPatch.stop()
+
def test_domain_security_email(self):
"""Can load domain's security email page."""
page = self.client.get(
@@ -1433,10 +1466,8 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest):
)
self.assertContains(page, "Domain security email")
- @skip("Ticket 912 needs to fix this one")
def test_domain_security_email_form(self):
"""Adding a security email works.
-
Uses self.app WebTest because we need to interact with forms.
"""
security_email_page = self.app.get(
@@ -1456,7 +1487,7 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest):
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = result.follow()
self.assertContains(
- success_page, "The security email for this domain have been updated"
+ success_page, "The security email for this domain has been updated"
)
def test_domain_overview_blocked_for_ineligible_user(self):
diff --git a/src/registrar/views/domain.py b/src/registrar/views/domain.py
index d88e0d443..442057d02 100644
--- a/src/registrar/views/domain.py
+++ b/src/registrar/views/domain.py
@@ -259,7 +259,11 @@ class DomainSecurityEmailView(DomainPermissionView, FormMixin):
"""The initial value for the form."""
domain = self.get_object()
initial = super().get_initial()
- initial["security_email"] = domain.security_contact.email
+ security_contact = domain.security_contact
+ if security_contact is None or security_contact.email == "dotgov@cisa.dhs.gov":
+ initial["security_email"] = None
+ return initial
+ initial["security_email"] = security_contact.email
return initial
def get_success_url(self):
@@ -288,7 +292,7 @@ class DomainSecurityEmailView(DomainPermissionView, FormMixin):
contact.save()
messages.success(
- self.request, "The security email for this domain have been updated."
+ self.request, "The security email for this domain has been updated."
)
# superclass has the redirect
diff --git a/src/requirements.txt b/src/requirements.txt
index 52ded59fc..ae6ed90df 100644
--- a/src/requirements.txt
+++ b/src/requirements.txt
@@ -7,7 +7,7 @@ certifi==2023.7.22 ; python_version >= '3.6'
cfenv==0.5.3
cffi==1.15.1
charset-normalizer==3.1.0 ; python_full_version >= '3.7.0'
-cryptography==41.0.3 ; python_version >= '3.7'
+cryptography==41.0.4 ; python_version >= '3.7'
defusedxml==0.7.1 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'
dj-database-url==2.0.0
dj-email-url==1.0.6
@@ -22,7 +22,7 @@ django-phonenumber-field[phonenumberslite]==7.1.0
django-widget-tweaks==1.4.12
environs[django]==9.5.0
faker==18.10.0
-git+https://github.com/cisagov/epplib.git@f818cbf0b069a12f03e1d72e4b9f4900924b832d#egg=fred-epplib
+git+https://github.com/cisagov/epplib.git@d56d183f1664f34c40ca9716a3a9a345f0ef561c#egg=fred-epplib
furl==2.1.3
future==0.18.3 ; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'
gunicorn==20.1.0
@@ -49,5 +49,5 @@ setuptools==67.8.0 ; python_version >= '3.7'
six==1.16.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
sqlparse==0.4.4 ; python_version >= '3.5'
typing-extensions==4.6.3
-urllib3==1.26.16 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5'
+urllib3==1.26.17 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5'
whitenoise==6.4.0