Test cases / Mapping

This commit is contained in:
zandercymatics 2023-09-14 15:54:54 -06:00
parent 43a636b286
commit 2e6a8198ac
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
4 changed files with 221 additions and 121 deletions

View file

@ -44,7 +44,7 @@ except NameError:
try:
from .client import CLIENT, commands
from .errors import RegistryError, ErrorCode
from epplib.models import common
from epplib.models import common, info
except ImportError:
pass
@ -52,6 +52,7 @@ __all__ = [
"CLIENT",
"commands",
"common",
"info",
"ErrorCode",
"RegistryError",
]

View file

@ -1,3 +1,4 @@
from itertools import zip_longest
import logging
from datetime import date
@ -10,8 +11,9 @@ from epplibwrapper import (
CLIENT as registry,
commands,
common as epp,
info as eppInfo,
RegistryError,
ErrorCode
ErrorCode,
)
from .utility.domain_field import DomainField
@ -446,7 +448,7 @@ class Domain(TimeStampedModel, DomainHelper):
"""Get or set the security contact for this domain."""
security = PublicContact.ContactTypeChoices.SECURITY
return self.generic_contact_getter(security)
def _add_registrant_to_existing_domain(self, contact: PublicContact):
"""Used to change the registrant contact on an existing domain"""
updateDomain = commands.UpdateDomain(
@ -654,95 +656,57 @@ class Domain(TimeStampedModel, DomainHelper):
# Q: I don't like this function name much,
# what would be better here?
def map_DomainContact_to_PublicContact(self, contact: epp.DomainContact, only_map_domain_contact = False):
"""Maps the Epps DomainContact Object to a PublicContact object
contact -> DomainContact: the returned contact for InfoDomain
def map_epp_contact_to_public_contact(
self, contact: eppInfo.InfoContactResultData, contact_type
):
"""Maps the Epp contact representation to a PublicContact object"""
only_map_domain_contact -> bool: DomainContact doesn't give enough information on
its own to fully qualify PublicContact, but if you only want the contact_type
and registry_id fields, then set this to True.
"""
if(contact is None or contact == {}):
raise ValueError("Contact cannot be empty or none")
if(contact.contact is None or contact.contact == ""):
raise ValueError("No contact id was provided")
if(contact.type is None or contact.type == ""):
raise ValueError("no contact_type was provided")
if contact is None:
return None
if(contact.type not in PublicContact.ContactTypeChoice.values()):
raise ValueError(f"Invalid contact_type of '{contact.type}' for object {contact}. Must exist within PublicContact.ContactTypeChoice")
mapped_contact: PublicContact = PublicContact(
# todo - check contact is valid type
domain=self,
contact_type=contact.type,
registry_id=contact.contact
)
if contact_type is None:
raise ValueError(f"contact_type is None")
if only_map_domain_contact:
return mapped_contact
extra_contact_info: epp.InfoContactResultData = self._request_contact_info(mapped_contact)
# For readability
return self.map_InfoContactResultData_to_PublicContact(extra_contact_info)
def map_InfoContactResultData_to_PublicContact(self, contact):
"""Maps the Epps InfoContactResultData Object to a PublicContact object"""
if(contact is None or contact == {}):
raise ValueError("Contact cannot be empty or none")
if(contact.id is None or contact.id == ""):
raise ValueError("No contact id was provided")
if(contact.type is None or contact.type == ""):
raise ValueError("no contact_type was provided")
logger.debug(f"map_epp_contact_to_public_contact contact -> {contact}")
logger.debug(f"What is the type? {type(contact)}")
if not isinstance(contact, eppInfo.InfoContactResultData):
raise ValueError("Contact must be of type InfoContactResultData")
if(contact.type not in PublicContact.ContactTypeChoice.values()):
raise ValueError(f"Invalid contact_type of '{contact.type}' for object {contact}. Must exist within PublicContact.ContactTypeChoice")
auth_info = contact.auth_info
postal_info = contact.postal_info
return PublicContact(
domain = self,
contact_type=contact.type,
addr = postal_info.addr
streets = {}
if addr is not None and addr.street is not None:
# 'zips' two lists together. For instance, (('street1', 'some_value_here'), ('street2', 'some_value_here'))
# Dict then converts this to a useable kwarg which we can pass in
streets = dict(
zip_longest(
["street1", "street2", "street3"],
addr.street,
fillvalue=None,
)
)
desired_contact = PublicContact(
domain=self,
contact_type=contact_type,
registry_id=contact.id,
email=contact.email,
voice=contact.voice,
fax=contact.fax,
pw=contact.auth_info.pw or None,
name = postal_info.name or None,
org = postal_info.org or None,
# TODO - street is a Sequence[str]
#street = postal_info.street,
city = postal_info.addr.city or None,
pc = postal_info.addr.pc or None,
cc = postal_info.addr.cc or None,
sp = postal_info.addr.sp or None
pw=auth_info.pw,
name=postal_info.name,
org=postal_info.org,
city=addr.city,
pc=addr.pc,
cc=addr.cc,
sp=addr.sp,
**streets,
)
logger.debug("lazy")
logger.debug(desired_contact.__dict__)
return desired_contact
def map_to_public_contact(self, contact):
""" Maps epp contact types to PublicContact. Can handle two types:
epp.DomainContact or epp.InfoContactResultData"""
if(isinstance(contact, epp.InfoContactResultData)):
return self.map_InfoContactResultData_to_PublicContact(contact)
# If contact is of type epp.DomainContact,
# grab as much data as possible.
elif(isinstance(contact, epp.DomainContact)):
# Runs command.InfoDomain, as epp.DomainContact
# on its own doesn't return enough data.
try:
return self.map_DomainContact_to_PublicContact(contact)
except RegistryError as error:
logger.warning(f"Contact {contact} does not exist on the registry")
logger.warning(error)
return self.map_DomainContact_to_PublicContact(contact, only_map_domain_contact=True)
else:
raise ValueError("Contact is not of the correct type. Must be epp.DomainContact or epp.InfoContactResultData")
def _request_contact_info(self, contact: PublicContact):
try:
req = commands.InfoContact(id=contact.registry_id)
@ -756,79 +720,86 @@ class Domain(TimeStampedModel, DomainHelper):
error,
)
raise error
def generic_contact_getter(self, contact_type_choice: PublicContact.ContactTypeChoices) -> PublicContact:
""" Abstracts the cache logic on EppLib contact items
def generic_contact_getter(
self, contact_type_choice: PublicContact.ContactTypeChoices
) -> PublicContact:
"""Abstracts the cache logic on EppLib contact items
contact_type_choice is a literal in PublicContact.ContactTypeChoices,
for instance: PublicContact.ContactTypeChoices.SECURITY.
If you wanted to setup getter logic for Security, you would call:
If you wanted to setup getter logic for Security, you would call:
cache_contact_helper(PublicContact.ContactTypeChoices.SECURITY),
or cache_contact_helper("security")
"""
try:
contacts = self._get_property("contacts")
desired_property = "contacts"
# The contact type 'registrant' is stored under a different property
if contact_type_choice == PublicContact.ContactTypeChoices.REGISTRANT:
desired_property = "registrant"
contacts = self._get_property(desired_property)
except KeyError as error:
logger.error("Contact does not exist")
raise error
else:
# TODO - is this even needed???????
print(f"generic_contact_getter -> contacts?? {contacts}")
# --> Map to public contact
cached_contact = self.grab_contact_in_keys(contacts, contact_type_choice)
if(cached_contact is None):
if cached_contact is None:
raise ValueError("No contact was found in cache or the registry")
# TODO - below line never executes with current logic
return cached_contact
# Convert it from an EppLib object to PublicContact
return self.map_epp_contact_to_public_contact(
cached_contact, contact_type_choice
)
def get_default_security_contact(self):
""" Gets the default security contact. """
"""Gets the default security contact."""
contact = PublicContact.get_default_security()
contact.domain = self
return contact
def get_default_administrative_contact(self):
""" Gets the default administrative contact. """
"""Gets the default administrative contact."""
contact = PublicContact.get_default_administrative()
contact.domain = self
return contact
def get_default_technical_contact(self):
""" Gets the default administrative contact. """
"""Gets the default administrative contact."""
contact = PublicContact.get_default_technical()
contact.domain = self
return contact
def get_default_registrant_contact(self):
""" Gets the default administrative contact. """
"""Gets the default administrative contact."""
contact = PublicContact.get_default_registrant()
contact.domain = self
return contact
def grab_contact_in_keys(self, contacts, check_type, get_latest_from_registry=True):
""" Grabs a contact object.
def grab_contact_in_keys(self, contacts, check_type):
"""Grabs a contact object.
Returns None if nothing is found.
check_type compares contact["type"] == check_type.
For example, check_type = 'security'
get_latest_from_registry --> bool which specifies if
a InfoContact command should be send to the
registry when grabbing the object.
If it is set to false, we just grab from cache.
Otherwise, we grab from the registry.
"""
for contact in contacts:
print(f"grab_contact_in_keys -> contact item {contact}")
print(f"grab_contact_in_keys -> isInstace {isinstance(contact, dict)}")
if (
isinstance(contact, dict)
and "id" in contact.keys()
and "type" in contact.keys()
and contact["type"] == check_type
):
return contact
item = PublicContact(
registry_id=contact["id"],
contact_type=contact["type"],
)
full_contact = self._request_contact_info(item)
return full_contact
# ForeignKey on UserDomainRole creates a "permissions" member for
# all of the user-roles that are in place for this domain
@ -927,12 +898,10 @@ class Domain(TimeStampedModel, DomainHelper):
security_contact = self.get_default_security_contact()
security_contact.save()
technical_contact = PublicContact.get_default_technical()
technical_contact.domain = self
technical_contact = self.get_default_technical_contact()
technical_contact.save()
administrative_contact = PublicContact.get_default_administrative()
administrative_contact.domain = self
administrative_contact = self.get_default_administrative_contact()
administrative_contact.save()
@transition(field="state", source=State.READY, target=State.ON_HOLD)
@ -1058,10 +1027,6 @@ class Domain(TimeStampedModel, DomainHelper):
)
return err.code
def _request_contact_info(self, contact: PublicContact):
req = commands.InfoContact(id=contact.registry_id)
return registry.send(req, cleaned=True).res_data[0]
def _get_or_create_contact(self, contact: PublicContact):
"""Try to fetch info about a contact. Create it if it does not exist."""
@ -1122,6 +1087,17 @@ class Domain(TimeStampedModel, DomainHelper):
# statuses can just be a list no need to keep the epp object
if "statuses" in cleaned.keys():
cleaned["statuses"] = [status.state for status in cleaned["statuses"]]
# Registrant should be of type PublicContact
if "registrant" in cleaned.keys():
try:
contact = PublicContact(
registry_id=cleaned["registrant"],
contact_type=PublicContact.ContactTypeChoices.REGISTRANT,
)
cleaned["registrant"] = self._request_contact_info(contact)
except RegistryError:
cleaned["registrant"] = None
# get contact info, if there are any
if (
# fetch_contacts and

View file

@ -1,3 +1,4 @@
from dataclasses import dataclass
import datetime
import os
import logging
@ -26,6 +27,7 @@ from registrar.models import (
from epplibwrapper import (
commands,
common,
info,
RegistryError,
ErrorCode,
)
@ -547,11 +549,48 @@ class MockEppLib(TestCase):
class fakedEppObject(object):
""""""
def __init__(self, auth_info=..., cr_date=..., contacts=..., hosts=...):
def __init__(
self, auth_info=..., cr_date=..., contacts=..., hosts=..., registrant=...
):
self.auth_info = auth_info
self.cr_date = cr_date
self.contacts = contacts
self.hosts = hosts
self.registrant = registrant
def dummyInfoContactResultData(self, id, email, contact_type):
fake = info.InfoContactResultData(
id=id,
postal_info=common.PostalInfo(
name="Robert The Villain",
addr=common.ContactAddr(
street=["street1", "street2", "street3"],
city="city",
pc="pc",
cc="cc",
sp="sp",
),
org="Skim Milk",
type="type",
),
voice="voice",
fax="+1-212-9876543",
email=email,
auth_info=common.ContactAuthInfo(pw="fakepw"),
roid=...,
statuses=[],
cl_id=...,
cr_id=...,
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
up_id=...,
up_date=...,
tr_date=...,
disclose=...,
vat=...,
ident=...,
notify_email=...,
)
return fake
mockDataInfoDomain = fakedEppObject(
"fakepw",
@ -559,6 +598,17 @@ class MockEppLib(TestCase):
contacts=[common.DomainContact(contact="123", type="security")],
hosts=["fake.host.com"],
)
InfoDomainWithContacts = fakedEppObject(
"fakepw",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[
common.DomainContact(contact="security", type="security"),
common.DomainContact(contact="admin", type="admin"),
common.DomainContact(contact="tech", type="tech"),
],
hosts=["fake.host.com"],
registrant="registrant",
)
infoDomainNoContact = fakedEppObject(
"security",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
@ -580,9 +630,19 @@ class MockEppLib(TestCase):
if isinstance(_request, commands.InfoDomain):
if getattr(_request, "name", None) == "security.gov":
return MagicMock(res_data=[self.infoDomainNoContact])
return MagicMock(res_data=[self.mockDataInfoDomain])
elif getattr(_request, "name", None) == "freeman.gov":
return MagicMock(res_data=[self.InfoDomainWithContacts])
elif isinstance(_request, commands.InfoContact):
return MagicMock(res_data=[self.mockDataInfoContact])
mocked_result = self.mockDataInfoContact
l = getattr(_request, "contact_type", None)
logger.debug(f"unuiquq {_request.__dict__}")
if getattr(_request, "id", None) in PublicContact.ContactTypeChoices:
desired_type = getattr(_request, "id", None)
mocked_result = self.dummyInfoContactResultData(
id=desired_type, email=f"{desired_type}@mail.gov"
)
return MagicMock(res_data=[mocked_result])
elif (
isinstance(_request, commands.CreateContact)
and getattr(_request, "id", None) == "fail"

View file

@ -21,6 +21,9 @@ from epplibwrapper import (
commands,
common,
)
import logging
logger = logging.getLogger(__name__)
class TestDomainCache(MockEppLib):
@ -445,6 +448,66 @@ class TestRegistrantContacts(MockEppLib):
"""
raise
def test_contact_getters_cache(self):
"""
Scenario: A user is grabbing a domain, which is cached, that has multiple contact objects
When each contact is retrieved from cache
Then the user retrieves the correct contact objects
"""
domain, _ = Domain.objects.get_or_create(name="freeman.gov")
# the cached contacts and hosts should be dictionaries of what is passed to them
# expectedPublicContactDict = {'id': None, 'created_at': None, 'updated_at': None, 'contact_type': PublicContact.ContactTypeChoices.SECURITY, 'registry_id': 'freeman', 'domain_id': 2, 'name': 'Robert The Villain', 'org': 'Skim Milk', 'street1': 'Evil street1', 'street2': 'Evil street2', 'street3': 'evil street3', 'city': 'Cityofdoom', 'sp': 'sp', 'pc': 'pc', 'cc': 'cc', 'email': 'awful@skimmilk.com', 'voice': 'voice', 'fax': '+1-212-9876543', 'pw': 'fakepw'}
security = PublicContact.get_default_security()
security.email = "security@mail.gov"
security.domain = domain
security.save()
# expected_security_contact = PublicContact(**expectedPublicContactDict)
expected_security_contact = security
domain.security_contact = security
technical = PublicContact.get_default_technical()
technical.email = "technical@mail.gov"
technical.domain = domain
technical.save()
expected_technical_contact = technical
domain.technical_contact = technical
administrative = PublicContact.get_default_administrative()
administrative.email = "administrative@mail.gov"
administrative.domain = domain
administrative.save()
expected_administrative_contact = administrative
domain.administrative_contact = administrative
registrant = PublicContact.get_default_registrant()
registrant.email = "registrant@mail.gov"
registrant.domain = domain
registrant.save()
expected_registrant_contact = registrant
domain.registrant_contact = registrant
logger.debug(f"domain obj: {domain.security_contact.__dict__}")
logger.debug(f"expected: {expected_security_contact.__dict__}")
self.assertEqual(domain.security_contact, expected_security_contact)
self.assertEqual(domain.technical_contact, expected_technical_contact)
self.assertEqual(domain.administrative_contact, expected_administrative_contact)
self.assertEqual(domain.registrant_contact, expected_registrant_contact)
@skip("not implemented yet")
def test_contact_getters_registry(self):
"""
Scenario: A user is grabbing a domain, which does not exist in cache, that has multiple contact objects
When the domain is retrieved from cache
Then the user retrieves the correct domain object
"""
# Create something using infocontact for that domain
# Then just grab the domain object normally
# That 'something' doesn't exist on the local domain,
# so registry should be called
raise
class TestRegistrantNameservers(TestCase):
"""Rule: Registrants may modify their nameservers"""