Merge branch 'main' into dk/1028-domain-available

This commit is contained in:
David Kennedy 2023-10-02 04:34:27 -04:00
commit e686eafb0b
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
3 changed files with 75 additions and 67 deletions

View file

@ -974,75 +974,38 @@ class Domain(TimeStampedModel, DomainHelper):
# statuses can just be a list no need to keep the epp object
if "statuses" in cleaned.keys():
cleaned["statuses"] = [status.state for status in cleaned["statuses"]]
# Capture and store old hosts and contacts from cache if they exist
old_cache_hosts = self._cache.get("hosts")
old_cache_contacts = self._cache.get("contacts")
# get contact info, if there are any
if (
# fetch_contacts and
"_contacts" in cleaned
fetch_contacts
and "_contacts" in cleaned
and isinstance(cleaned["_contacts"], list)
and len(cleaned["_contacts"])
):
cleaned["contacts"] = []
for domainContact in cleaned["_contacts"]:
# we do not use _get_or_create_* because we expect the object we
# just asked the registry for still exists --
# if not, that's a problem
# TODO- discuss-should we check if contact is in public contacts
# and add it if not- this is really to keep in mine the transisiton
req = commands.InfoContact(id=domainContact.contact)
data = registry.send(req, cleaned=True).res_data[0]
# extract properties from response
# (Ellipsis is used to mean "null")
# convert this to use PublicContactInstead
contact = {
"id": domainContact.contact,
"type": domainContact.type,
"auth_info": getattr(data, "auth_info", ...),
"cr_date": getattr(data, "cr_date", ...),
"disclose": getattr(data, "disclose", ...),
"email": getattr(data, "email", ...),
"fax": getattr(data, "fax", ...),
"postal_info": getattr(data, "postal_info", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
"voice": getattr(data, "voice", ...),
}
cleaned["contacts"].append(
{k: v for k, v in contact.items() if v is not ...}
)
cleaned["contacts"] = self._fetch_contacts(cleaned["_contacts"])
# We're only getting contacts, so retain the old
# hosts that existed in cache (if they existed)
# and pass them along.
if old_cache_hosts is not None:
cleaned["hosts"] = old_cache_hosts
# get nameserver info, if there are any
if (
# fetch_hosts and
"_hosts" in cleaned
fetch_hosts
and "_hosts" in cleaned
and isinstance(cleaned["_hosts"], list)
and len(cleaned["_hosts"])
):
# TODO- add elif in cache set it to be the old cache value
# no point in removing
cleaned["hosts"] = []
for name in cleaned["_hosts"]:
# we do not use _get_or_create_* because we expect the object we
# just asked the registry for still exists --
# if not, that's a problem
req = commands.InfoHost(name=name)
data = registry.send(req, cleaned=True).res_data[0]
# extract properties from response
# (Ellipsis is used to mean "null")
host = {
"name": name,
"addrs": getattr(data, "addrs", ...),
"cr_date": getattr(data, "cr_date", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
}
cleaned["hosts"].append(
{k: v for k, v in host.items() if v is not ...}
)
cleaned["hosts"] = self._fetch_hosts(cleaned["_hosts"])
# We're only getting hosts, so retain the old
# contacts that existed in cache (if they existed)
# and pass them along.
if old_cache_contacts is not None:
cleaned["contacts"] = old_cache_contacts
# replace the prior cache with new data
self._cache = cleaned
@ -1050,6 +1013,46 @@ class Domain(TimeStampedModel, DomainHelper):
except RegistryError as e:
logger.error(e)
def _fetch_contacts(self, contact_data):
"""Fetch contact info."""
contacts = []
for domainContact in contact_data:
req = commands.InfoContact(id=domainContact.contact)
data = registry.send(req, cleaned=True).res_data[0]
contact = {
"id": domainContact.contact,
"type": domainContact.type,
"auth_info": getattr(data, "auth_info", ...),
"cr_date": getattr(data, "cr_date", ...),
"disclose": getattr(data, "disclose", ...),
"email": getattr(data, "email", ...),
"fax": getattr(data, "fax", ...),
"postal_info": getattr(data, "postal_info", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
"voice": getattr(data, "voice", ...),
}
contacts.append({k: v for k, v in contact.items() if v is not ...})
return contacts
def _fetch_hosts(self, host_data):
"""Fetch host info."""
hosts = []
for name in host_data:
req = commands.InfoHost(name=name)
data = registry.send(req, cleaned=True).res_data[0]
host = {
"name": name,
"addrs": getattr(data, "addrs", ...),
"cr_date": getattr(data, "cr_date", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
}
hosts.append({k: v for k, v in host.items() if v is not ...})
return hosts
def _invalidate_cache(self):
"""Remove cache data when updates are made."""
self._cache = {}

View file

@ -52,8 +52,6 @@ class TestDomainCache(MockEppLib):
commands.InfoDomain(name="igorville.gov", auth_info=None),
cleaned=True,
),
call(commands.InfoContact(id="123", auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
],
any_order=False, # Ensure calls are in the specified order
)
@ -75,8 +73,6 @@ class TestDomainCache(MockEppLib):
call(
commands.InfoDomain(name="igorville.gov", auth_info=None), cleaned=True
),
call(commands.InfoContact(id="123", auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
]
self.mockedSendFunction.assert_has_calls(expectedCalls)
@ -111,6 +107,19 @@ class TestDomainCache(MockEppLib):
# get and check hosts is set correctly
domain._get_property("hosts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
self.assertEqual(domain._cache["contacts"], [expectedContactsDict])
# invalidate cache
domain._cache = {}
# get host
domain._get_property("hosts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
# get contacts
domain._get_property("contacts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
self.assertEqual(domain._cache["contacts"], [expectedContactsDict])
def tearDown(self) -> None:
Domain.objects.all().delete()
@ -171,8 +180,6 @@ class TestDomainCreation(MockEppLib):
commands.InfoDomain(name="beef-tongue.gov", auth_info=None),
cleaned=True,
),
call(commands.InfoContact(id="123", auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
],
any_order=False, # Ensure calls are in the specified order
)
@ -222,8 +229,6 @@ class TestDomainStatuses(MockEppLib):
commands.InfoDomain(name="chicken-liver.gov", auth_info=None),
cleaned=True,
),
call(commands.InfoContact(id="123", auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
],
any_order=False, # Ensure calls are in the specified order
)

View file

@ -7,7 +7,7 @@ certifi==2023.7.22 ; python_version >= '3.6'
cfenv==0.5.3
cffi==1.15.1
charset-normalizer==3.1.0 ; python_full_version >= '3.7.0'
cryptography==41.0.3 ; python_version >= '3.7'
cryptography==41.0.4 ; python_version >= '3.7'
defusedxml==0.7.1 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'
dj-database-url==2.0.0
dj-email-url==1.0.6