diff --git a/src/registrar/migrations/0059_domain_security_contact_registry_id.py b/src/registrar/migrations/0059_domain_security_contact_registry_id.py new file mode 100644 index 000000000..6f1204b2a --- /dev/null +++ b/src/registrar/migrations/0059_domain_security_contact_registry_id.py @@ -0,0 +1,21 @@ +# Generated by Django 4.2.7 on 2023-12-23 01:31 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("registrar", "0058_delete_nameserver"), + ] + + operations = [ + migrations.AddField( + model_name="domain", + name="security_contact_registry_id", + field=models.TextField( + editable=False, + help_text="Duplication of registry's security contact id for when registry unavailable", + null=True, + ), + ), + ] diff --git a/src/registrar/models/domain.py b/src/registrar/models/domain.py index 7c0566774..1dbe64087 100644 --- a/src/registrar/models/domain.py +++ b/src/registrar/models/domain.py @@ -31,7 +31,7 @@ from epplibwrapper import ( from registrar.models.utility.contact_error import ContactError, ContactErrorCodes -from django.db.models import DateField +from django.db.models import DateField, TextField from .utility.domain_field import DomainField from .utility.domain_helper import DomainHelper from .utility.time_stamped_model import TimeStampedModel @@ -972,6 +972,12 @@ class Domain(TimeStampedModel, DomainHelper): help_text=("Duplication of registry's expiration date saved for ease of reporting"), ) + security_contact_registry_id = TextField( + null=True, + help_text=("Duplication of registry's security contact id for when registry unavailable"), + editable=False, + ) + def isActive(self): return self.state == Domain.State.CREATED @@ -1113,15 +1119,21 @@ class Domain(TimeStampedModel, DomainHelper): # Grab from cache contacts = self._get_property(desired_property) except KeyError as error: - logger.error(f"Could not find {contact_type_choice}: {error}") - return None - else: - cached_contact = self.get_contact_in_keys(contacts, contact_type_choice) - if cached_contact is None: - # TODO - #1103 - raise ContactError("No contact was found in cache or the registry") + # if contact type is security, attempt to retrieve registry id + # for the security contact from domain.security_contact_registry_id + if contact_type_choice == PublicContact.ContactTypeChoices.SECURITY and self.security_contact_registry_id: + logger.info(self.security_contact_registry_id) + contacts = {PublicContact.ContactTypeChoices.SECURITY: self.security_contact_registry_id} + else: + logger.error(f"Could not find {contact_type_choice}: {error}") + return None - return cached_contact + cached_contact = self.get_contact_in_keys(contacts, contact_type_choice) + if cached_contact is None: + # TODO - #1103 + raise ContactError("No contact was found in cache or the registry") + + return cached_contact def get_default_security_contact(self): """Gets the default security contact.""" @@ -1609,6 +1621,7 @@ class Domain(TimeStampedModel, DomainHelper): cleaned = self._clean_cache(cache, data_response) self._update_hosts_and_contacts(cleaned, fetch_hosts, fetch_contacts) self._update_hosts_and_ips_in_db(cleaned, fetch_hosts) + self._update_security_contact_in_db(cleaned, fetch_contacts) self._update_dates(cleaned) self._cache = cleaned @@ -1715,6 +1728,24 @@ class Domain(TimeStampedModel, DomainHelper): for ip_address in cleaned_ips: HostIP.objects.get_or_create(address=ip_address, host=host_in_db) + def _update_security_contact_in_db(self, cleaned, fetch_contacts): + """Update security contact registry id in database if retrieved from registry. + If no value is retrieved from registry, set to empty string in db. + + Parameters: + self: the domain to be updated with security from cleaned + cleaned: dict containing contact registry ids. Security contact is of type + PublicContact.ContactTypeChoices.SECURITY + fetch_contacts: boolean indicating whether or not fetch_contacts was called + """ + if fetch_contacts: + cleaned_contacts = cleaned["contacts"] + security_contact_registry_id = "" + if cleaned_contacts[PublicContact.ContactTypeChoices.SECURITY]: + security_contact_registry_id = cleaned_contacts[PublicContact.ContactTypeChoices.SECURITY] + self.security_contact_registry_id = security_contact_registry_id + self.save() + def _update_dates(self, cleaned): """Update dates (expiration and creation) from cleaned""" requires_save = False diff --git a/src/registrar/tests/test_models_domain.py b/src/registrar/tests/test_models_domain.py index caea289a0..7b7c7acf8 100644 --- a/src/registrar/tests/test_models_domain.py +++ b/src/registrar/tests/test_models_domain.py @@ -711,6 +711,50 @@ class TestRegistrantContacts(MockEppLib): self.mockedSendFunction.assert_has_calls(expected_calls, any_order=True) self.assertEqual(PublicContact.objects.filter(domain=self.domain).count(), 1) + def test_security_email_returns_on_registry_error(self): + """ + Scenario: Security email previously set through EPP and stored in registrar's database. + Registry is unavailable and throws exception when attempting to build cache from + registry. Security email retrieved from database. + """ + # Use self.domain_contact which has been initialized with existing contacts, including securityContact + + # call get_security_email to initially set the security_contact_registry_id in the domain model + self.domain_contact.get_security_email() + # invalidate the cache so the next time get_security_email is called, it has to attempt to populate cache + self.domain_contact._invalidate_cache() + + # mock that registry throws an error on the EPP send + def side_effect(_request, cleaned): + raise RegistryError(code=ErrorCode.COMMAND_FAILED) + + patcher = patch("registrar.models.domain.registry.send") + mocked_send = patcher.start() + mocked_send.side_effect = side_effect + + # when get_security_email is called, the registry error will force the security contact + # to be retrieved using the security_contact_registry_id in the domain model + security_email = self.domain_contact.get_security_email() + + # assert that the proper security contact was retrieved by testing the email matches expected value + self.assertEqual(security_email, "security@mail.gov") + patcher.stop() + + def test_security_email_stored_on_fetch_cache(self): + """ + Scenario: Security email is stored in db when security contact is retrieved from fetch_cache. + Verify the success of this by asserting get_or_create calls to db. + The mocked data for the EPP calls for the freeman.gov domain returns a security + contact with registry id of securityContact when InfoContact is called + """ + # Use self.domain_contact which has been initialized with existing contacts, including securityContact + + # force fetch_cache to be called, which will return above documented mocked hosts + self.domain_contact.get_security_email() + + # assert that the security_contact_registry_id in the db matches "securityContact" + self.assertEqual(self.domain_contact.security_contact_registry_id, "securityContact") + def test_not_disclosed_on_other_contacts(self): """ Scenario: Registrant creates a new domain with multiple contacts