added security_contact_registry_id to domain model; modified fetch_cache to write to db; modified getter for security_contact to failover to db if epp down

This commit is contained in:
David Kennedy 2023-12-22 21:44:03 -05:00
parent ed7868efba
commit 274e4951ac
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
3 changed files with 105 additions and 9 deletions

View file

@ -31,7 +31,7 @@ from epplibwrapper import (
from registrar.models.utility.contact_error import ContactError, ContactErrorCodes
from django.db.models import DateField
from django.db.models import DateField, TextField
from .utility.domain_field import DomainField
from .utility.domain_helper import DomainHelper
from .utility.time_stamped_model import TimeStampedModel
@ -972,6 +972,12 @@ class Domain(TimeStampedModel, DomainHelper):
help_text=("Duplication of registry's expiration date saved for ease of reporting"),
)
security_contact_registry_id = TextField(
null=True,
help_text=("Duplication of registry's security contact id for when registry unavailable"),
editable=False,
)
def isActive(self):
return self.state == Domain.State.CREATED
@ -1113,15 +1119,21 @@ class Domain(TimeStampedModel, DomainHelper):
# Grab from cache
contacts = self._get_property(desired_property)
except KeyError as error:
logger.error(f"Could not find {contact_type_choice}: {error}")
return None
else:
cached_contact = self.get_contact_in_keys(contacts, contact_type_choice)
if cached_contact is None:
# TODO - #1103
raise ContactError("No contact was found in cache or the registry")
# if contact type is security, attempt to retrieve registry id
# for the security contact from domain.security_contact_registry_id
if contact_type_choice == PublicContact.ContactTypeChoices.SECURITY and self.security_contact_registry_id:
logger.info(self.security_contact_registry_id)
contacts = {PublicContact.ContactTypeChoices.SECURITY: self.security_contact_registry_id}
else:
logger.error(f"Could not find {contact_type_choice}: {error}")
return None
return cached_contact
cached_contact = self.get_contact_in_keys(contacts, contact_type_choice)
if cached_contact is None:
# TODO - #1103
raise ContactError("No contact was found in cache or the registry")
return cached_contact
def get_default_security_contact(self):
"""Gets the default security contact."""
@ -1609,6 +1621,7 @@ class Domain(TimeStampedModel, DomainHelper):
cleaned = self._clean_cache(cache, data_response)
self._update_hosts_and_contacts(cleaned, fetch_hosts, fetch_contacts)
self._update_hosts_and_ips_in_db(cleaned, fetch_hosts)
self._update_security_contact_in_db(cleaned, fetch_contacts)
self._update_dates(cleaned)
self._cache = cleaned
@ -1715,6 +1728,24 @@ class Domain(TimeStampedModel, DomainHelper):
for ip_address in cleaned_ips:
HostIP.objects.get_or_create(address=ip_address, host=host_in_db)
def _update_security_contact_in_db(self, cleaned, fetch_contacts):
"""Update security contact registry id in database if retrieved from registry.
If no value is retrieved from registry, set to empty string in db.
Parameters:
self: the domain to be updated with security from cleaned
cleaned: dict containing contact registry ids. Security contact is of type
PublicContact.ContactTypeChoices.SECURITY
fetch_contacts: boolean indicating whether or not fetch_contacts was called
"""
if fetch_contacts:
cleaned_contacts = cleaned["contacts"]
security_contact_registry_id = ""
if cleaned_contacts[PublicContact.ContactTypeChoices.SECURITY]:
security_contact_registry_id = cleaned_contacts[PublicContact.ContactTypeChoices.SECURITY]
self.security_contact_registry_id = security_contact_registry_id
self.save()
def _update_dates(self, cleaned):
"""Update dates (expiration and creation) from cleaned"""
requires_save = False