Merge branch 'main' into rjm/1148-ds-data

This commit is contained in:
David Kennedy 2023-10-23 19:55:33 -04:00
commit bdbff9260e
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
35 changed files with 1105 additions and 305 deletions

View file

@ -260,7 +260,6 @@ class Domain(TimeStampedModel, DomainHelper):
"""Creates the host object in the registry
doesn't add the created host to the domain
returns ErrorCode (int)"""
logger.info("Creating host")
if addrs is not None:
addresses = [epp.Ip(addr=addr) for addr in addrs]
request = commands.CreateHost(name=host, addrs=addresses)
@ -782,7 +781,7 @@ class Domain(TimeStampedModel, DomainHelper):
and errorCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY
):
# TODO- ticket #433 look here for error handling
raise Exception("Unable to add contact to registry")
raise RegistryError(code=errorCode)
# contact doesn't exist on the domain yet
logger.info("_set_singleton_contact()-> contact has been added to the registry")
@ -1209,7 +1208,6 @@ class Domain(TimeStampedModel, DomainHelper):
count = 0
while not exitEarly and count < 3:
try:
logger.info("Getting domain info from epp")
req = commands.InfoDomain(name=self.name)
domainInfoResponse = registry.send(req, cleaned=True)
exitEarly = True
@ -1376,18 +1374,16 @@ class Domain(TimeStampedModel, DomainHelper):
"""creates a disclose object that can be added to a contact Create using
.disclose= <this function> on the command before sending.
if item is security email then make sure email is visable"""
isSecurity = contact.contact_type == contact.ContactTypeChoices.SECURITY
is_security = contact.contact_type == contact.ContactTypeChoices.SECURITY
DF = epp.DiscloseField
fields = {DF.FAX, DF.VOICE, DF.ADDR}
if not isSecurity or (
isSecurity and contact.email == PublicContact.get_default_security().email
):
fields.add(DF.EMAIL)
fields = {DF.EMAIL}
disclose = (
is_security and contact.email != PublicContact.get_default_security().email
)
# Will only disclose DF.EMAIL if its not the default
return epp.Disclose(
flag=False,
flag=disclose,
fields=fields,
types={DF.ADDR: "loc"},
)
def _make_epp_contact_postal_info(self, contact: PublicContact): # type: ignore
@ -1648,74 +1644,84 @@ class Domain(TimeStampedModel, DomainHelper):
"""Contact registry for info about a domain."""
try:
# get info from registry
dataResponse = self._get_or_create_domain()
data = dataResponse.res_data[0]
# extract properties from response
# (Ellipsis is used to mean "null")
cache = {
"auth_info": getattr(data, "auth_info", ...),
"_contacts": getattr(data, "contacts", ...),
"cr_date": getattr(data, "cr_date", ...),
"ex_date": getattr(data, "ex_date", ...),
"_hosts": getattr(data, "hosts", ...),
"name": getattr(data, "name", ...),
"registrant": getattr(data, "registrant", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
}
# remove null properties (to distinguish between "a value of None" and null)
cleaned = {k: v for k, v in cache.items() if v is not ...}
data_response = self._get_or_create_domain()
cache = self._extract_data_from_response(data_response)
# remove null properties (to distinguish between "a value of None" and null)
cleaned = self._remove_null_properties(cache)
# statuses can just be a list no need to keep the epp object
if "statuses" in cleaned:
cleaned["statuses"] = [status.state for status in cleaned["statuses"]]
# get extensions info, if there is any
# DNSSECExtension is one possible extension, make sure to handle
# only DNSSECExtension and not other type extensions
returned_extensions = dataResponse.extensions
cleaned["dnssecdata"] = None
for extension in returned_extensions:
if isinstance(extension, extensions.DNSSECExtension):
cleaned["dnssecdata"] = extension
cleaned["dnssecdata"] = self._get_dnssec_data(data_response.extensions)
# Capture and store old hosts and contacts from cache if they exist
old_cache_hosts = self._cache.get("hosts")
old_cache_contacts = self._cache.get("contacts")
# get contact info, if there are any
if (
fetch_contacts
and "_contacts" in cleaned
and isinstance(cleaned["_contacts"], list)
and len(cleaned["_contacts"]) > 0
):
cleaned["contacts"] = self._fetch_contacts(cleaned["_contacts"])
# We're only getting contacts, so retain the old
# hosts that existed in cache (if they existed)
# and pass them along.
if fetch_contacts:
cleaned["contacts"] = self._get_contacts(cleaned.get("_contacts", []))
if old_cache_hosts is not None:
logger.debug("resetting cleaned['hosts'] to old_cache_hosts")
cleaned["hosts"] = old_cache_hosts
# get nameserver info, if there are any
if (
fetch_hosts
and "_hosts" in cleaned
and isinstance(cleaned["_hosts"], list)
and len(cleaned["_hosts"])
):
cleaned["hosts"] = self._fetch_hosts(cleaned["_hosts"])
# We're only getting hosts, so retain the old
# contacts that existed in cache (if they existed)
# and pass them along.
if fetch_hosts:
cleaned["hosts"] = self._get_hosts(cleaned.get("_hosts", []))
if old_cache_contacts is not None:
cleaned["contacts"] = old_cache_contacts
# replace the prior cache with new data
self._cache = cleaned
except RegistryError as e:
logger.error(e)
def _extract_data_from_response(self, data_response):
data = data_response.res_data[0]
return {
"auth_info": getattr(data, "auth_info", ...),
"_contacts": getattr(data, "contacts", ...),
"cr_date": getattr(data, "cr_date", ...),
"ex_date": getattr(data, "ex_date", ...),
"_hosts": getattr(data, "hosts", ...),
"name": getattr(data, "name", ...),
"registrant": getattr(data, "registrant", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
}
def _remove_null_properties(self, cache):
return {k: v for k, v in cache.items() if v is not ...}
def _get_dnssec_data(self, response_extensions):
# get extensions info, if there is any
# DNSSECExtension is one possible extension, make sure to handle
# only DNSSECExtension and not other type extensions
dnssec_data = None
for extension in response_extensions:
if isinstance(extension, extensions.DNSSECExtension):
dnssec_data = extension
return dnssec_data
def _get_contacts(self, contacts):
choices = PublicContact.ContactTypeChoices
# We expect that all these fields get populated,
# so we can create these early, rather than waiting.
cleaned_contacts = {
choices.ADMINISTRATIVE: None,
choices.SECURITY: None,
choices.TECHNICAL: None,
}
if contacts and isinstance(contacts, list) and len(contacts) > 0:
cleaned_contacts = self._fetch_contacts(contacts)
return cleaned_contacts
def _get_hosts(self, hosts):
cleaned_hosts = []
if hosts and isinstance(hosts, list):
cleaned_hosts = self._fetch_hosts(hosts)
return cleaned_hosts
def _get_or_create_public_contact(self, public_contact: PublicContact):
"""Tries to find a PublicContact object in our DB.
If it can't, it'll create it. Returns PublicContact"""