mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-08-13 13:09:41 +02:00
Merge pull request #724 from cisagov/ab/domain-cache
Add domain caching layer and tests
This commit is contained in:
commit
abf198d5b2
2 changed files with 402 additions and 20 deletions
|
@ -1,6 +1,7 @@
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from datetime import date
|
from datetime import date
|
||||||
|
from string import digits
|
||||||
from django_fsm import FSMField # type: ignore
|
from django_fsm import FSMField # type: ignore
|
||||||
|
|
||||||
from django.db import models
|
from django.db import models
|
||||||
|
@ -8,6 +9,9 @@ from django.db import models
|
||||||
from epplibwrapper import (
|
from epplibwrapper import (
|
||||||
CLIENT as registry,
|
CLIENT as registry,
|
||||||
commands,
|
commands,
|
||||||
|
common as epp,
|
||||||
|
RegistryError,
|
||||||
|
ErrorCode,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .utility.domain_field import DomainField
|
from .utility.domain_field import DomainField
|
||||||
|
@ -41,6 +45,10 @@ class Domain(TimeStampedModel, DomainHelper):
|
||||||
domain meets the required checks.
|
domain meets the required checks.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self._cache = {}
|
||||||
|
super(Domain, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
class Status(models.TextChoices):
|
class Status(models.TextChoices):
|
||||||
"""
|
"""
|
||||||
The status codes we can receive from the registry.
|
The status codes we can receive from the registry.
|
||||||
|
@ -106,20 +114,52 @@ class Domain(TimeStampedModel, DomainHelper):
|
||||||
# the state is indeterminate
|
# the state is indeterminate
|
||||||
UNKNOWN = "unknown"
|
UNKNOWN = "unknown"
|
||||||
|
|
||||||
|
class Cache(property):
|
||||||
|
"""
|
||||||
|
Python descriptor to turn class methods into properties.
|
||||||
|
|
||||||
|
The purpose of subclassing `property` rather than using it directly
|
||||||
|
as a decorator (`@Cache`) is to insert generic code to run
|
||||||
|
before or after _all_ properties are accessed, modified, or deleted.
|
||||||
|
|
||||||
|
As an example:
|
||||||
|
|
||||||
|
domain = Domain(name="example.gov")
|
||||||
|
domain.save()
|
||||||
|
<--- insert code here
|
||||||
|
date = domain.creation_date
|
||||||
|
<--- or here
|
||||||
|
(...other stuff...)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __get__(self, obj, objtype=None):
|
||||||
|
"""Called during get. Example: `r = domain.registrant`."""
|
||||||
|
return super().__get__(obj, objtype)
|
||||||
|
|
||||||
|
def __set__(self, obj, value):
|
||||||
|
"""Called during set. Example: `domain.registrant = 'abc123'`."""
|
||||||
|
super().__set__(obj, value)
|
||||||
|
# always invalidate cache after sending updates to the registry
|
||||||
|
obj._invalidate_cache()
|
||||||
|
|
||||||
|
def __delete__(self, obj):
|
||||||
|
"""Called during delete. Example: `del domain.registrant`."""
|
||||||
|
super().__delete__(obj)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def available(cls, domain: str) -> bool:
|
def available(cls, domain: str) -> bool:
|
||||||
"""Check if a domain is available."""
|
"""Check if a domain is available."""
|
||||||
if not cls.string_could_be_domain(domain):
|
if not cls.string_could_be_domain(domain):
|
||||||
raise ValueError("Not a valid domain: %s" % str(domain))
|
raise ValueError("Not a valid domain: %s" % str(domain))
|
||||||
req = commands.CheckDomain([domain])
|
req = commands.CheckDomain([domain])
|
||||||
return registry.send(req).res_data[0].avail
|
return registry.send(req, cleaned=True).res_data[0].avail
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def registered(cls, domain: str) -> bool:
|
def registered(cls, domain: str) -> bool:
|
||||||
"""Check if a domain is _not_ available."""
|
"""Check if a domain is _not_ available."""
|
||||||
return not cls.available(domain)
|
return not cls.available(domain)
|
||||||
|
|
||||||
@property
|
@Cache
|
||||||
def contacts(self) -> dict[str, str]:
|
def contacts(self) -> dict[str, str]:
|
||||||
"""
|
"""
|
||||||
Get a dictionary of registry IDs for the contacts for this domain.
|
Get a dictionary of registry IDs for the contacts for this domain.
|
||||||
|
@ -131,31 +171,31 @@ class Domain(TimeStampedModel, DomainHelper):
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
@property
|
@Cache
|
||||||
def creation_date(self) -> date:
|
def creation_date(self) -> date:
|
||||||
"""Get the `cr_date` element from the registry."""
|
"""Get the `cr_date` element from the registry."""
|
||||||
raise NotImplementedError()
|
return self._get_property("cr_date")
|
||||||
|
|
||||||
@property
|
@Cache
|
||||||
def last_transferred_date(self) -> date:
|
def last_transferred_date(self) -> date:
|
||||||
"""Get the `tr_date` element from the registry."""
|
"""Get the `tr_date` element from the registry."""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
@property
|
@Cache
|
||||||
def last_updated_date(self) -> date:
|
def last_updated_date(self) -> date:
|
||||||
"""Get the `up_date` element from the registry."""
|
"""Get the `up_date` element from the registry."""
|
||||||
raise NotImplementedError()
|
return self._get_property("up_date")
|
||||||
|
|
||||||
@property
|
@Cache
|
||||||
def expiration_date(self) -> date:
|
def expiration_date(self) -> date:
|
||||||
"""Get or set the `ex_date` element from the registry."""
|
"""Get or set the `ex_date` element from the registry."""
|
||||||
raise NotImplementedError()
|
return self._get_property("ex_date")
|
||||||
|
|
||||||
@expiration_date.setter # type: ignore
|
@expiration_date.setter # type: ignore
|
||||||
def expiration_date(self, ex_date: date):
|
def expiration_date(self, ex_date: date):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
@property
|
@Cache
|
||||||
def password(self) -> str:
|
def password(self) -> str:
|
||||||
"""
|
"""
|
||||||
Get the `auth_info.pw` element from the registry. Not a real password.
|
Get the `auth_info.pw` element from the registry. Not a real password.
|
||||||
|
@ -167,7 +207,7 @@ class Domain(TimeStampedModel, DomainHelper):
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
@property
|
@Cache
|
||||||
def nameservers(self) -> list[tuple[str]]:
|
def nameservers(self) -> list[tuple[str]]:
|
||||||
"""
|
"""
|
||||||
Get or set a complete list of nameservers for this domain.
|
Get or set a complete list of nameservers for this domain.
|
||||||
|
@ -191,7 +231,7 @@ class Domain(TimeStampedModel, DomainHelper):
|
||||||
# TODO: call EPP to set this info.
|
# TODO: call EPP to set this info.
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@property
|
@Cache
|
||||||
def statuses(self) -> list[str]:
|
def statuses(self) -> list[str]:
|
||||||
"""
|
"""
|
||||||
Get or set the domain `status` elements from the registry.
|
Get or set the domain `status` elements from the registry.
|
||||||
|
@ -209,25 +249,31 @@ class Domain(TimeStampedModel, DomainHelper):
|
||||||
# some statuses cannot be set by the client at all
|
# some statuses cannot be set by the client at all
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
@property
|
@Cache
|
||||||
def registrant_contact(self) -> PublicContact:
|
def registrant_contact(self) -> PublicContact:
|
||||||
"""Get or set the registrant for this domain."""
|
"""Get or set the registrant for this domain."""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
@registrant_contact.setter # type: ignore
|
@registrant_contact.setter # type: ignore
|
||||||
def registrant_contact(self, contact: PublicContact):
|
def registrant_contact(self, contact: PublicContact):
|
||||||
|
# get id from PublicContact->.registry_id
|
||||||
|
# call UpdateDomain() command with registrant as parameter
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
@property
|
@Cache
|
||||||
def administrative_contact(self) -> PublicContact:
|
def administrative_contact(self) -> PublicContact:
|
||||||
"""Get or set the admin contact for this domain."""
|
"""Get or set the admin contact for this domain."""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
@administrative_contact.setter # type: ignore
|
@administrative_contact.setter # type: ignore
|
||||||
def administrative_contact(self, contact: PublicContact):
|
def administrative_contact(self, contact: PublicContact):
|
||||||
|
# call CreateContact, if contact doesn't exist yet for domain
|
||||||
|
# call UpdateDomain with contact,
|
||||||
|
# type options are[admin, billing, tech, security]
|
||||||
|
# use admin as type parameter for this contact
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
@property
|
@Cache
|
||||||
def security_contact(self) -> PublicContact:
|
def security_contact(self) -> PublicContact:
|
||||||
"""Get or set the security contact for this domain."""
|
"""Get or set the security contact for this domain."""
|
||||||
# TODO: replace this with a real implementation
|
# TODO: replace this with a real implementation
|
||||||
|
@ -241,7 +287,7 @@ class Domain(TimeStampedModel, DomainHelper):
|
||||||
# TODO: replace this with a real implementation
|
# TODO: replace this with a real implementation
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@property
|
@Cache
|
||||||
def technical_contact(self) -> PublicContact:
|
def technical_contact(self) -> PublicContact:
|
||||||
"""Get or set the tech contact for this domain."""
|
"""Get or set the tech contact for this domain."""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
@ -300,3 +346,224 @@ class Domain(TimeStampedModel, DomainHelper):
|
||||||
|
|
||||||
# ForeignKey on DomainInvitation creates an "invitations" member for
|
# ForeignKey on DomainInvitation creates an "invitations" member for
|
||||||
# all of the invitations that have been sent for this domain
|
# all of the invitations that have been sent for this domain
|
||||||
|
|
||||||
|
def _validate_host_tuples(self, hosts: list[tuple[str]]):
|
||||||
|
"""
|
||||||
|
Helper function. Validate hostnames and IP addresses.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError if hostname or IP address appears invalid or mismatched.
|
||||||
|
"""
|
||||||
|
for host in hosts:
|
||||||
|
hostname = host[0].lower()
|
||||||
|
addresses: tuple[str] = host[1:] # type: ignore
|
||||||
|
if not bool(Domain.HOST_REGEX.match(hostname)):
|
||||||
|
raise ValueError("Invalid hostname: %s." % hostname)
|
||||||
|
if len(hostname) > Domain.MAX_LENGTH:
|
||||||
|
raise ValueError("Too long hostname: %s" % hostname)
|
||||||
|
|
||||||
|
is_subordinate = hostname.split(".", 1)[-1] == self.name
|
||||||
|
if is_subordinate and len(addresses) == 0:
|
||||||
|
raise ValueError(
|
||||||
|
"Must supply IP addresses for subordinate host %s" % hostname
|
||||||
|
)
|
||||||
|
if not is_subordinate and len(addresses) > 0:
|
||||||
|
raise ValueError("Must not supply IP addresses for %s" % hostname)
|
||||||
|
|
||||||
|
for address in addresses:
|
||||||
|
allow = set(":." + digits)
|
||||||
|
if any(c not in allow for c in address):
|
||||||
|
raise ValueError("Invalid IP address: %s." % address)
|
||||||
|
|
||||||
|
def _get_or_create_domain(self):
|
||||||
|
"""Try to fetch info about this domain. Create it if it does not exist."""
|
||||||
|
already_tried_to_create = False
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
req = commands.InfoDomain(name=self.name)
|
||||||
|
return registry.send(req, cleaned=True).res_data[0]
|
||||||
|
except RegistryError as e:
|
||||||
|
if already_tried_to_create:
|
||||||
|
raise e
|
||||||
|
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
|
||||||
|
# avoid infinite loop
|
||||||
|
already_tried_to_create = True
|
||||||
|
registrant = self._get_or_create_contact(
|
||||||
|
PublicContact.get_default_registrant()
|
||||||
|
)
|
||||||
|
req = commands.CreateDomain(
|
||||||
|
name=self.name,
|
||||||
|
registrant=registrant.id,
|
||||||
|
auth_info=epp.DomainAuthInfo(
|
||||||
|
pw="2fooBAR123fooBaz"
|
||||||
|
), # not a password
|
||||||
|
)
|
||||||
|
registry.send(req, cleaned=True)
|
||||||
|
# no error, so go ahead and update state
|
||||||
|
self.state = Domain.State.CREATED
|
||||||
|
self.save()
|
||||||
|
else:
|
||||||
|
raise e
|
||||||
|
|
||||||
|
def _get_or_create_contact(self, contact: PublicContact):
|
||||||
|
"""Try to fetch info about a contact. Create it if it does not exist."""
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
req = commands.InfoContact(id=contact.registry_id)
|
||||||
|
return registry.send(req, cleaned=True).res_data[0]
|
||||||
|
except RegistryError as e:
|
||||||
|
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
|
||||||
|
create = commands.CreateContact(
|
||||||
|
id=contact.registry_id,
|
||||||
|
postal_info=epp.PostalInfo( # type: ignore
|
||||||
|
name=contact.name,
|
||||||
|
addr=epp.ContactAddr(
|
||||||
|
street=[
|
||||||
|
getattr(contact, street)
|
||||||
|
for street in ["street1", "street2", "street3"]
|
||||||
|
if hasattr(contact, street)
|
||||||
|
],
|
||||||
|
city=contact.city,
|
||||||
|
pc=contact.pc,
|
||||||
|
cc=contact.cc,
|
||||||
|
sp=contact.sp,
|
||||||
|
),
|
||||||
|
org=contact.org,
|
||||||
|
type="loc",
|
||||||
|
),
|
||||||
|
email=contact.email,
|
||||||
|
voice=contact.voice,
|
||||||
|
fax=contact.fax,
|
||||||
|
auth_info=epp.ContactAuthInfo(pw="2fooBAR123fooBaz"),
|
||||||
|
)
|
||||||
|
# security contacts should only show email addresses, for now
|
||||||
|
if (
|
||||||
|
contact.contact_type
|
||||||
|
== PublicContact.ContactTypeChoices.SECURITY
|
||||||
|
):
|
||||||
|
DF = epp.DiscloseField
|
||||||
|
create.disclose = epp.Disclose(
|
||||||
|
flag=False,
|
||||||
|
fields={DF.FAX, DF.VOICE, DF.ADDR},
|
||||||
|
types={DF.ADDR: "loc"},
|
||||||
|
)
|
||||||
|
registry.send(create)
|
||||||
|
else:
|
||||||
|
raise e
|
||||||
|
|
||||||
|
def _update_or_create_host(self, host):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def _delete_host(self, host):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def _fetch_cache(self, fetch_hosts=False, fetch_contacts=False):
|
||||||
|
"""Contact registry for info about a domain."""
|
||||||
|
try:
|
||||||
|
# get info from registry
|
||||||
|
data = self._get_or_create_domain()
|
||||||
|
# extract properties from response
|
||||||
|
# (Ellipsis is used to mean "null")
|
||||||
|
cache = {
|
||||||
|
"auth_info": getattr(data, "auth_info", ...),
|
||||||
|
"_contacts": getattr(data, "contacts", ...),
|
||||||
|
"cr_date": getattr(data, "cr_date", ...),
|
||||||
|
"ex_date": getattr(data, "ex_date", ...),
|
||||||
|
"_hosts": getattr(data, "hosts", ...),
|
||||||
|
"name": getattr(data, "name", ...),
|
||||||
|
"registrant": getattr(data, "registrant", ...),
|
||||||
|
"statuses": getattr(data, "statuses", ...),
|
||||||
|
"tr_date": getattr(data, "tr_date", ...),
|
||||||
|
"up_date": getattr(data, "up_date", ...),
|
||||||
|
}
|
||||||
|
|
||||||
|
# remove null properties (to distinguish between "a value of None" and null)
|
||||||
|
cleaned = {k: v for k, v in cache.items() if v is not ...}
|
||||||
|
|
||||||
|
# get contact info, if there are any
|
||||||
|
if (
|
||||||
|
fetch_contacts
|
||||||
|
and "_contacts" in cleaned
|
||||||
|
and isinstance(cleaned["_contacts"], list)
|
||||||
|
and len(cleaned["_contacts"])
|
||||||
|
):
|
||||||
|
cleaned["contacts"] = []
|
||||||
|
for id in cleaned["_contacts"]:
|
||||||
|
# we do not use _get_or_create_* because we expect the object we
|
||||||
|
# just asked the registry for still exists --
|
||||||
|
# if not, that's a problem
|
||||||
|
req = commands.InfoContact(id=id)
|
||||||
|
data = registry.send(req, cleaned=True).res_data[0]
|
||||||
|
|
||||||
|
# extract properties from response
|
||||||
|
# (Ellipsis is used to mean "null")
|
||||||
|
contact = {
|
||||||
|
"id": id,
|
||||||
|
"auth_info": getattr(data, "auth_info", ...),
|
||||||
|
"cr_date": getattr(data, "cr_date", ...),
|
||||||
|
"disclose": getattr(data, "disclose", ...),
|
||||||
|
"email": getattr(data, "email", ...),
|
||||||
|
"fax": getattr(data, "fax", ...),
|
||||||
|
"postal_info": getattr(data, "postal_info", ...),
|
||||||
|
"statuses": getattr(data, "statuses", ...),
|
||||||
|
"tr_date": getattr(data, "tr_date", ...),
|
||||||
|
"up_date": getattr(data, "up_date", ...),
|
||||||
|
"voice": getattr(data, "voice", ...),
|
||||||
|
}
|
||||||
|
|
||||||
|
cleaned["contacts"].append(
|
||||||
|
{k: v for k, v in contact.items() if v is not ...}
|
||||||
|
)
|
||||||
|
|
||||||
|
# get nameserver info, if there are any
|
||||||
|
if (
|
||||||
|
fetch_hosts
|
||||||
|
and "_hosts" in cleaned
|
||||||
|
and isinstance(cleaned["_hosts"], list)
|
||||||
|
and len(cleaned["_hosts"])
|
||||||
|
):
|
||||||
|
cleaned["hosts"] = []
|
||||||
|
for name in cleaned["_hosts"]:
|
||||||
|
# we do not use _get_or_create_* because we expect the object we
|
||||||
|
# just asked the registry for still exists --
|
||||||
|
# if not, that's a problem
|
||||||
|
req = commands.InfoHost(name=name)
|
||||||
|
data = registry.send(req, cleaned=True).res_data[0]
|
||||||
|
# extract properties from response
|
||||||
|
# (Ellipsis is used to mean "null")
|
||||||
|
host = {
|
||||||
|
"name": name,
|
||||||
|
"addrs": getattr(data, "addrs", ...),
|
||||||
|
"cr_date": getattr(data, "cr_date", ...),
|
||||||
|
"statuses": getattr(data, "statuses", ...),
|
||||||
|
"tr_date": getattr(data, "tr_date", ...),
|
||||||
|
"up_date": getattr(data, "up_date", ...),
|
||||||
|
}
|
||||||
|
cleaned["hosts"].append(
|
||||||
|
{k: v for k, v in host.items() if v is not ...}
|
||||||
|
)
|
||||||
|
|
||||||
|
# replace the prior cache with new data
|
||||||
|
self._cache = cleaned
|
||||||
|
|
||||||
|
except RegistryError as e:
|
||||||
|
logger.error(e)
|
||||||
|
|
||||||
|
def _invalidate_cache(self):
|
||||||
|
"""Remove cache data when updates are made."""
|
||||||
|
self._cache = {}
|
||||||
|
|
||||||
|
def _get_property(self, property):
|
||||||
|
"""Get some piece of info about a domain."""
|
||||||
|
if property not in self._cache:
|
||||||
|
self._fetch_cache(
|
||||||
|
fetch_hosts=(property == "hosts"),
|
||||||
|
fetch_contacts=(property == "contacts"),
|
||||||
|
)
|
||||||
|
|
||||||
|
if property in self._cache:
|
||||||
|
return self._cache[property]
|
||||||
|
else:
|
||||||
|
raise KeyError(
|
||||||
|
"Requested key %s was not found in registry cache." % str(property)
|
||||||
|
)
|
||||||
|
|
|
@ -5,11 +5,117 @@ This file tests the various ways in which the registrar interacts with the regis
|
||||||
"""
|
"""
|
||||||
from django.test import TestCase
|
from django.test import TestCase
|
||||||
from django.db.utils import IntegrityError
|
from django.db.utils import IntegrityError
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
import datetime
|
||||||
|
from registrar.models import Domain # add in DomainApplication, User,
|
||||||
|
|
||||||
from registrar.models import (
|
|
||||||
Domain,
|
|
||||||
)
|
|
||||||
from unittest import skip
|
from unittest import skip
|
||||||
|
from epplibwrapper import commands
|
||||||
|
|
||||||
|
|
||||||
|
class TestDomainCache(TestCase):
|
||||||
|
class fakedEppObject(object):
|
||||||
|
""""""
|
||||||
|
|
||||||
|
def __init__(self, auth_info=..., cr_date=..., contacts=..., hosts=...):
|
||||||
|
self.auth_info = auth_info
|
||||||
|
self.cr_date = cr_date
|
||||||
|
self.contacts = contacts
|
||||||
|
self.hosts = hosts
|
||||||
|
|
||||||
|
mockDataInfoDomain = fakedEppObject(
|
||||||
|
"fakepw",
|
||||||
|
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
|
||||||
|
contacts=["123"],
|
||||||
|
hosts=["fake.host.com"],
|
||||||
|
)
|
||||||
|
mockDataInfoContact = fakedEppObject(
|
||||||
|
"anotherPw", cr_date=datetime.datetime(2023, 7, 25, 19, 45, 35)
|
||||||
|
)
|
||||||
|
mockDataInfoHosts = fakedEppObject(
|
||||||
|
"lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35)
|
||||||
|
)
|
||||||
|
|
||||||
|
def mockSend(self, _request, cleaned):
|
||||||
|
""""""
|
||||||
|
if isinstance(_request, commands.InfoDomain):
|
||||||
|
return MagicMock(res_data=[self.mockDataInfoDomain])
|
||||||
|
elif isinstance(_request, commands.InfoContact):
|
||||||
|
return MagicMock(res_data=[self.mockDataInfoContact])
|
||||||
|
return MagicMock(res_data=[self.mockDataInfoHosts])
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
"""mock epp send function as this will fail locally"""
|
||||||
|
self.patcher = patch("registrar.models.domain.registry.send")
|
||||||
|
self.mock_foo = self.patcher.start()
|
||||||
|
self.mock_foo.side_effect = self.mockSend
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.patcher.stop()
|
||||||
|
|
||||||
|
def test_cache_sets_resets(self):
|
||||||
|
"""Cache should be set on getter and reset on setter calls"""
|
||||||
|
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
|
||||||
|
# trigger getter
|
||||||
|
_ = domain.creation_date
|
||||||
|
|
||||||
|
# getter should set the domain cache with a InfoDomain object
|
||||||
|
# (see InfoDomainResult)
|
||||||
|
self.assertEquals(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info)
|
||||||
|
self.assertEquals(domain._cache["cr_date"], self.mockDataInfoDomain.cr_date)
|
||||||
|
self.assertFalse("avail" in domain._cache.keys())
|
||||||
|
|
||||||
|
# using a setter should clear the cache
|
||||||
|
domain.nameservers = [("", "")]
|
||||||
|
self.assertEquals(domain._cache, {})
|
||||||
|
|
||||||
|
# send should have been called only once
|
||||||
|
self.mock_foo.assert_called_once()
|
||||||
|
|
||||||
|
def test_cache_used_when_avail(self):
|
||||||
|
"""Cache is pulled from if the object has already been accessed"""
|
||||||
|
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
|
||||||
|
cr_date = domain.creation_date
|
||||||
|
|
||||||
|
# repeat the getter call
|
||||||
|
cr_date = domain.creation_date
|
||||||
|
|
||||||
|
# value should still be set correctly
|
||||||
|
self.assertEqual(cr_date, self.mockDataInfoDomain.cr_date)
|
||||||
|
self.assertEqual(domain._cache["cr_date"], self.mockDataInfoDomain.cr_date)
|
||||||
|
|
||||||
|
# send was only called once & not on the second getter call
|
||||||
|
self.mock_foo.assert_called_once()
|
||||||
|
|
||||||
|
def test_cache_nested_elements(self):
|
||||||
|
"""Cache works correctly with the nested objects cache and hosts"""
|
||||||
|
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
|
||||||
|
|
||||||
|
# the cached contacts and hosts should be dictionaries of what is passed to them
|
||||||
|
expectedContactsDict = {
|
||||||
|
"id": self.mockDataInfoDomain.contacts[0],
|
||||||
|
"auth_info": self.mockDataInfoContact.auth_info,
|
||||||
|
"cr_date": self.mockDataInfoContact.cr_date,
|
||||||
|
}
|
||||||
|
expectedHostsDict = {
|
||||||
|
"name": self.mockDataInfoDomain.hosts[0],
|
||||||
|
"cr_date": self.mockDataInfoHosts.cr_date,
|
||||||
|
}
|
||||||
|
|
||||||
|
# this can be changed when the getter for contacts is implemented
|
||||||
|
domain._get_property("contacts")
|
||||||
|
|
||||||
|
# check domain info is still correct and not overridden
|
||||||
|
self.assertEqual(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info)
|
||||||
|
self.assertEqual(domain._cache["cr_date"], self.mockDataInfoDomain.cr_date)
|
||||||
|
|
||||||
|
# check contacts
|
||||||
|
self.assertEqual(domain._cache["_contacts"], self.mockDataInfoDomain.contacts)
|
||||||
|
self.assertEqual(domain._cache["contacts"], [expectedContactsDict])
|
||||||
|
|
||||||
|
# get and check hosts is set correctly
|
||||||
|
domain._get_property("hosts")
|
||||||
|
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
|
||||||
|
|
||||||
|
|
||||||
class TestDomainCreation(TestCase):
|
class TestDomainCreation(TestCase):
|
||||||
|
@ -20,7 +126,6 @@ class TestDomainCreation(TestCase):
|
||||||
Background:
|
Background:
|
||||||
Given that a valid domain application exists
|
Given that a valid domain application exists
|
||||||
"""
|
"""
|
||||||
pass
|
|
||||||
|
|
||||||
@skip("not implemented yet")
|
@skip("not implemented yet")
|
||||||
def test_approved_application_creates_domain_locally(self):
|
def test_approved_application_creates_domain_locally(self):
|
||||||
|
@ -59,6 +164,16 @@ class TestDomainCreation(TestCase):
|
||||||
with self.assertRaisesRegex(IntegrityError, "name"):
|
with self.assertRaisesRegex(IntegrityError, "name"):
|
||||||
Domain.objects.create(name="igorville.gov")
|
Domain.objects.create(name="igorville.gov")
|
||||||
|
|
||||||
|
@skip("cannot activate a domain without mock registry")
|
||||||
|
def test_get_status(self):
|
||||||
|
"""Returns proper status based on `state`."""
|
||||||
|
domain = Domain.objects.create(name="igorville.gov")
|
||||||
|
domain.save()
|
||||||
|
self.assertEqual(None, domain.status)
|
||||||
|
domain.activate()
|
||||||
|
domain.save()
|
||||||
|
self.assertIn("ok", domain.status)
|
||||||
|
|
||||||
|
|
||||||
class TestRegistrantContacts(TestCase):
|
class TestRegistrantContacts(TestCase):
|
||||||
"""Rule: Registrants may modify their WHOIS data"""
|
"""Rule: Registrants may modify their WHOIS data"""
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue