Merge remote-tracking branch 'origin/main' into rjm/998-domain-growth-export

This commit is contained in:
Rachid Mrad 2023-12-27 15:25:43 -05:00
commit 9e7802c349
No known key found for this signature in database
GPG key ID: EF38E4CEC4A8F3CF
40 changed files with 1522 additions and 919 deletions

View file

@ -8,6 +8,7 @@ from typing import Optional
from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore
from django.db import models
from django.utils import timezone
from typing import Any
@ -201,6 +202,14 @@ class Domain(TimeStampedModel, DomainHelper):
"""Get the `cr_date` element from the registry."""
return self._get_property("cr_date")
@creation_date.setter # type: ignore
def creation_date(self, cr_date: date):
"""
Direct setting of the creation date in the registry is not implemented.
Creation date can only be set by registry."""
raise NotImplementedError()
@Cache
def last_transferred_date(self) -> date:
"""Get the `tr_date` element from the registry."""
@ -976,6 +985,16 @@ class Domain(TimeStampedModel, DomainHelper):
def isActive(self):
return self.state == Domain.State.CREATED
def is_expired(self):
"""
Check if the domain's expiration date is in the past.
Returns True if expired, False otherwise.
"""
if self.expiration_date is None:
return True
now = timezone.now().date()
return self.expiration_date < now
def map_epp_contact_to_public_contact(self, contact: eppInfo.InfoContactResultData, contact_id, contact_type):
"""Maps the Epp contact representation to a PublicContact object.
@ -1601,38 +1620,11 @@ class Domain(TimeStampedModel, DomainHelper):
def _fetch_cache(self, fetch_hosts=False, fetch_contacts=False):
"""Contact registry for info about a domain."""
try:
# get info from registry
data_response = self._get_or_create_domain()
cache = self._extract_data_from_response(data_response)
# remove null properties (to distinguish between "a value of None" and null)
cleaned = self._remove_null_properties(cache)
if "statuses" in cleaned:
cleaned["statuses"] = [status.state for status in cleaned["statuses"]]
cleaned["dnssecdata"] = self._get_dnssec_data(data_response.extensions)
# Capture and store old hosts and contacts from cache if they exist
old_cache_hosts = self._cache.get("hosts")
old_cache_contacts = self._cache.get("contacts")
if fetch_contacts:
cleaned["contacts"] = self._get_contacts(cleaned.get("_contacts", []))
if old_cache_hosts is not None:
logger.debug("resetting cleaned['hosts'] to old_cache_hosts")
cleaned["hosts"] = old_cache_hosts
if fetch_hosts:
cleaned["hosts"] = self._get_hosts(cleaned.get("_hosts", []))
if old_cache_contacts is not None:
cleaned["contacts"] = old_cache_contacts
# if expiration date from registry does not match what is in db,
# update the db
if "ex_date" in cleaned and cleaned["ex_date"] != self.expiration_date:
self.expiration_date = cleaned["ex_date"]
self.save()
cleaned = self._clean_cache(cache, data_response)
self._update_hosts_and_contacts(cleaned, fetch_hosts, fetch_contacts)
self._update_dates(cleaned)
self._cache = cleaned
@ -1640,6 +1632,7 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error(e)
def _extract_data_from_response(self, data_response):
"""extract data from response from registry"""
data = data_response.res_data[0]
return {
"auth_info": getattr(data, "auth_info", ...),
@ -1654,6 +1647,15 @@ class Domain(TimeStampedModel, DomainHelper):
"up_date": getattr(data, "up_date", ...),
}
def _clean_cache(self, cache, data_response):
"""clean up the cache"""
# remove null properties (to distinguish between "a value of None" and null)
cleaned = self._remove_null_properties(cache)
if "statuses" in cleaned:
cleaned["statuses"] = [status.state for status in cleaned["statuses"]]
cleaned["dnssecdata"] = self._get_dnssec_data(data_response.extensions)
return cleaned
def _remove_null_properties(self, cache):
return {k: v for k, v in cache.items() if v is not ...}
@ -1667,6 +1669,42 @@ class Domain(TimeStampedModel, DomainHelper):
dnssec_data = extension
return dnssec_data
def _update_hosts_and_contacts(self, cleaned, fetch_hosts, fetch_contacts):
"""Capture and store old hosts and contacts from cache if they don't exist"""
old_cache_hosts = self._cache.get("hosts")
old_cache_contacts = self._cache.get("contacts")
if fetch_contacts:
cleaned["contacts"] = self._get_contacts(cleaned.get("_contacts", []))
if old_cache_hosts is not None:
logger.debug("resetting cleaned['hosts'] to old_cache_hosts")
cleaned["hosts"] = old_cache_hosts
if fetch_hosts:
cleaned["hosts"] = self._get_hosts(cleaned.get("_hosts", []))
if old_cache_contacts is not None:
cleaned["contacts"] = old_cache_contacts
def _update_dates(self, cleaned):
"""Update dates (expiration and creation) from cleaned"""
requires_save = False
# if expiration date from registry does not match what is in db,
# update the db
if "ex_date" in cleaned and cleaned["ex_date"] != self.expiration_date:
self.expiration_date = cleaned["ex_date"]
requires_save = True
# if creation_date from registry does not match what is in db,
# update the db
if "cr_date" in cleaned and cleaned["cr_date"] != self.created_at:
self.created_at = cleaned["cr_date"]
requires_save = True
# if either registration date or creation date need updating
if requires_save:
self.save()
def _get_contacts(self, contacts):
choices = PublicContact.ContactTypeChoices
# We expect that all these fields get populated,