PR Changes

This commit is contained in:
zandercymatics 2023-10-02 11:38:41 -06:00
parent a84bbb5d3a
commit 1990f91e6a
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
5 changed files with 185 additions and 131 deletions

View file

@ -14,6 +14,7 @@ from epplibwrapper import (
RegistryError,
ErrorCode,
)
from registrar.models.utility.contact_error import ContactError
from .utility.domain_field import DomainField
from .utility.domain_helper import DomainHelper
@ -668,36 +669,31 @@ class Domain(TimeStampedModel, DomainHelper):
return None
if contact_type is None:
raise ValueError("contact_type is None")
raise ContactError("contact_type is None")
if contact_id is None:
raise ValueError("contact_id is None")
raise ContactError("contact_id is None")
if len(contact_id) > 16 or len(contact_id) < 1:
raise ValueError(
# Since contact_id is registry_id,
# check that its the right length
contact_id_length = len(contact_id)
if (
contact_id_length > PublicContact.get_max_id_length()
or contact_id_length < 1
):
raise ContactError(
"contact_id is of invalid length. "
"Cannot exceed 16 characters, "
f"got {contact_id} with a length of {len(contact_id)}"
f"got {contact_id} with a length of {contact_id_length}"
)
if not isinstance(contact, eppInfo.InfoContactResultData):
raise ValueError("Contact must be of type InfoContactResultData")
raise ContactError("Contact must be of type InfoContactResultData")
auth_info = contact.auth_info
postal_info = contact.postal_info
addr = postal_info.addr
# 'zips' two lists together.
# For instance, (('street1', 'some_value_here'),
# ('street2', 'some_value_here'))
# Dict then converts this to a useable kwarg which we can pass in
streets = dict(
zip_longest(
["street1", "street2", "street3"],
addr.street if addr is not None else [""],
fillvalue=None,
)
)
streets = self._convert_streets_to_dict(addr.street)
desired_contact = PublicContact(
domain=self,
contact_type=contact_type,
@ -718,6 +714,30 @@ class Domain(TimeStampedModel, DomainHelper):
return desired_contact
def _convert_streets_to_dict(self, streets):
"""
Converts EPPLibs street representation
to PublicContacts
EPPLib returns 'street' as an sequence of strings.
Meanwhile, PublicContact has this split into three
seperate properties: street1, street2, street3.
Handles this disparity
"""
# 'zips' two lists together.
# For instance, (('street1', 'some_value_here'),
# ('street2', 'some_value_here'))
# Dict then converts this to a useable kwarg which we can pass in
street_dict = dict(
zip_longest(
["street1", "street2", "street3"],
streets if streets is not None else [""],
fillvalue=None,
)
)
return street_dict
def _request_contact_info(self, contact: PublicContact):
try:
req = commands.InfoContact(id=contact.registry_id)
@ -735,7 +755,9 @@ class Domain(TimeStampedModel, DomainHelper):
def generic_contact_getter(
self, contact_type_choice: PublicContact.ContactTypeChoices
) -> PublicContact | None:
"""Abstracts the cache logic on EppLib contact items
"""Retrieves the desired PublicContact from the registry.
This abstracts the caching and EPP retrieval for
all contact items and thus may result in EPP calls being sent.
contact_type_choice is a literal in PublicContact.ContactTypeChoices,
for instance: PublicContact.ContactTypeChoices.SECURITY.
@ -744,8 +766,6 @@ class Domain(TimeStampedModel, DomainHelper):
cache_contact_helper(PublicContact.ContactTypeChoices.SECURITY),
or cache_contact_helper("security").
Note: Registrant is handled slightly differently internally,
but the output will be the same.
"""
# registrant_contact(s) are an edge case. They exist on
# the "registrant" property as opposed to contacts.
@ -754,16 +774,16 @@ class Domain(TimeStampedModel, DomainHelper):
desired_property = "registrant"
try:
# Grab from cache
contacts = self._get_property(desired_property)
except KeyError as error:
# Q: Should we be raising an error instead?
logger.error(f"Could not find {contact_type_choice}: {error}")
return None
else:
# Grab from cache
cached_contact = self.grab_contact_in_keys(contacts, contact_type_choice)
cached_contact = self.get_contact_in_keys(contacts, contact_type_choice)
if cached_contact is None:
raise ValueError("No contact was found in cache or the registry")
# TODO - #1103
raise ContactError("No contact was found in cache or the registry")
return cached_contact
@ -780,52 +800,66 @@ class Domain(TimeStampedModel, DomainHelper):
return contact
def get_default_technical_contact(self):
"""Gets the default administrative contact."""
"""Gets the default technical contact."""
contact = PublicContact.get_default_technical()
contact.domain = self
return contact
def get_default_registrant_contact(self):
"""Gets the default administrative contact."""
"""Gets the default registrant contact."""
contact = PublicContact.get_default_registrant()
contact.domain = self
return contact
def grab_contact_in_keys(self, contacts, contact_type):
"""Grabs a contact object.
Returns None if nothing is found.
contact_type compares contact.contact_type == contact_type.
def get_contact_in_keys(self, contacts, contact_type):
"""Gets a contact object.
For example, contact_type = 'security'
Args:
contacts ([PublicContact]): List of PublicContacts
contact_type (literal): Which PublicContact to get
Returns:
PublicContact | None
"""
# Registrant doesn't exist as an array
# Registrant doesn't exist as an array, and is of
# a special data type, so we need to handle that.
if contact_type == PublicContact.ContactTypeChoices.REGISTRANT:
if (
isinstance(contacts, PublicContact)
and contacts.contact_type is not None
and contacts.contact_type == contact_type
):
if contacts.registry_id is None:
raise ValueError("registry_id cannot be None")
return contacts
else:
raise ValueError("Invalid contact object for registrant_contact")
desired_contact = None
if isinstance(contacts, str):
desired_contact = self._registrant_to_public_contact(
self._cache["registrant"]
)
# Set the cache with the updated object
# for performance reasons.
if "registrant" in self._cache:
self._cache["registrant"] = desired_contact
elif isinstance(contacts, PublicContact):
desired_contact = contacts
for contact in contacts:
if (
isinstance(contact, PublicContact)
and contact.contact_type is not None
and contact.contact_type == contact_type
):
if contact.registry_id is None:
raise ValueError("registry_id cannot be None")
return contact
return self._handle_registrant_contact(desired_contact)
# If the for loop didn't do a return,
# then we know that it doesn't exist within cache
logger.info(f"Requested contact {contact.registry_id} does not exist in cache.")
_registry_id: str
if contact_type in contacts:
_registry_id = contacts.get(contact_type)
desired = PublicContact.objects.filter(
registry_id=_registry_id, domain=self, contact_type=contact_type
)
if desired.count() == 1:
return desired.get()
logger.info(f"Requested contact {_registry_id} does not exist in cache.")
return None
def _handle_registrant_contact(self, contact):
if (
contact.contact_type is not None
and contact.contact_type == PublicContact.ContactTypeChoices.REGISTRANT
):
return contact
else:
raise ValueError("Invalid contact object for registrant_contact")
# ForeignKey on UserDomainRole creates a "permissions" member for
# all of the user-roles that are in place for this domain
@ -1108,30 +1142,24 @@ class Domain(TimeStampedModel, DomainHelper):
cleaned = {k: v for k, v in cache.items() if v is not ...}
# statuses can just be a list no need to keep the epp object
if "statuses" in cleaned.keys():
if "statuses" in cleaned:
cleaned["statuses"] = [status.state for status in cleaned["statuses"]]
# Registrant should be of type PublicContact
if "registrant" in cleaned.keys():
cleaned["registrant"] = self._registrant_to_public_contact(
cleaned["registrant"]
)
if (
# fetch_contacts and
"_contacts" in cleaned.keys()
"_contacts" in cleaned
and isinstance(cleaned["_contacts"], list)
and len(cleaned["_contacts"]) > 0
):
cleaned["contacts"] = []
choices = PublicContact.ContactTypeChoices
# We expect that all these fields get populated,
# so we can create these early, rather than waiting.
cleaned["contacts"] = {
choices.ADMINISTRATIVE: None,
choices.SECURITY: None,
choices.TECHNICAL: None,
}
for domainContact in cleaned["_contacts"]:
# we do not use _get_or_create_* because we expect the object we
# just asked the registry for still exists --
# if not, that's a problem
# TODO- discuss-should we check if contact is in public contacts
# and add it if not-
# this is really to keep in mind for the transition
req = commands.InfoContact(id=domainContact.contact)
data = registry.send(req, cleaned=True).res_data[0]
@ -1140,10 +1168,10 @@ class Domain(TimeStampedModel, DomainHelper):
data, domainContact.contact, domainContact.type
)
# Find/create it in the DB, then add it to the list
cleaned["contacts"].append(
self._get_or_create_public_contact(mapped_object)
)
# Find/create it in the DB
in_db = self._get_or_create_public_contact(mapped_object)
cleaned["contacts"][in_db.contact_type] = in_db.registry_id
# get nameserver info, if there are any
if (
@ -1182,7 +1210,7 @@ class Domain(TimeStampedModel, DomainHelper):
def _get_or_create_public_contact(self, public_contact: PublicContact):
"""Tries to find a PublicContact object in our DB.
If it can't, it'll create it."""
If it can't, it'll create it. Returns PublicContact"""
db_contact = PublicContact.objects.filter(
registry_id=public_contact.registry_id,
contact_type=public_contact.contact_type,
@ -1190,37 +1218,36 @@ class Domain(TimeStampedModel, DomainHelper):
)
# Raise an error if we find duplicates.
# This should not occur...
# This should not occur
if db_contact.count() > 1:
raise Exception(
f"Multiple contacts found for {public_contact.contact_type}"
)
if db_contact.count() == 1:
existing_contact = db_contact.get()
# Does the item we're grabbing match
# what we have in our DB?
# If not, we likely have a duplicate.
if (
existing_contact.email != public_contact.email
or existing_contact.registry_id != public_contact.registry_id
):
raise ValueError(
"Requested PublicContact is out of sync "
"with DB. Potential duplicate?"
)
# Save to DB if it doesn't exist already.
if db_contact.count() == 0:
# Doesn't run custom save logic, just saves to DB
public_contact.save(skip_epp_save=True)
logger.info(f"Created a new PublicContact: {public_contact}")
# Append the item we just created
return public_contact
# If it already exists, we can
# assume that the DB instance was updated
# during set, so we should just use that.
return existing_contact
existing_contact = db_contact.get()
# Saves to DB if it doesn't exist already.
# Doesn't run custom save logic, just saves to DB
public_contact.save(skip_epp_save=True)
logger.info(f"Created a new PublicContact: {public_contact}")
# Append the item we just created
return public_contact
# Does the item we're grabbing match
# what we have in our DB?
if (
existing_contact.email != public_contact.email
or existing_contact.registry_id != public_contact.registry_id
):
existing_contact.delete()
public_contact.save()
logger.warning("Requested PublicContact is out of sync " "with DB.")
return public_contact
# If it already exists, we can
# assume that the DB instance was updated
# during set, so we should just use that.
return existing_contact
def _registrant_to_public_contact(self, registry_id: str):
"""EPPLib returns the registrant as a string,