Remove skip epp call, use dict instead

This commit is contained in:
zandercymatics 2024-01-29 12:38:56 -07:00
parent 148ccbdf2a
commit 75e687f13b
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
2 changed files with 27 additions and 27 deletions

View file

@ -909,17 +909,11 @@ class Domain(TimeStampedModel, DomainHelper):
"""Time to renew. Not implemented.""" """Time to renew. Not implemented."""
raise NotImplementedError() raise NotImplementedError()
def get_security_email(self, skip_epp_call=False): def get_security_email(self):
logger.info("get_security_email-> getting the contact") logger.info("get_security_email-> getting the contact")
# If specified, skip the epp call outright.
# Otherwise, proceed as normal.
if skip_epp_call:
logger.info("get_security_email-> skipping epp call")
security = PublicContact.ContactTypeChoices.SECURITY security = PublicContact.ContactTypeChoices.SECURITY
security_contact = self.generic_contact_getter(security, skip_epp_call) security_contact = self.generic_contact_getter(security)
else:
security_contact = self.security_contact
# If we get a valid value for security_contact, pull its email # If we get a valid value for security_contact, pull its email
# Otherwise, just return nothing # Otherwise, just return nothing
@ -1121,9 +1115,7 @@ class Domain(TimeStampedModel, DomainHelper):
) )
raise error raise error
def generic_contact_getter( def generic_contact_getter(self, contact_type_choice: PublicContact.ContactTypeChoices) -> PublicContact | None:
self, contact_type_choice: PublicContact.ContactTypeChoices, skip_epp_call=False
) -> PublicContact | None:
"""Retrieves the desired PublicContact from the registry. """Retrieves the desired PublicContact from the registry.
This abstracts the caching and EPP retrieval for This abstracts the caching and EPP retrieval for
all contact items and thus may result in EPP calls being sent. all contact items and thus may result in EPP calls being sent.
@ -1143,7 +1135,7 @@ class Domain(TimeStampedModel, DomainHelper):
try: try:
# Grab from cache # Grab from cache
contacts = self._get_property(desired_property, skip_epp_call) contacts = self._get_property(desired_property)
except KeyError as error: except KeyError as error:
# if contact type is security, attempt to retrieve registry id # if contact type is security, attempt to retrieve registry id
# for the security contact from domain.security_contact_registry_id # for the security contact from domain.security_contact_registry_id
@ -1878,9 +1870,9 @@ class Domain(TimeStampedModel, DomainHelper):
"""Remove cache data when updates are made.""" """Remove cache data when updates are made."""
self._cache = {} self._cache = {}
def _get_property(self, property, skip_epp_call=False): def _get_property(self, property):
"""Get some piece of info about a domain.""" """Get some piece of info about a domain."""
if property not in self._cache and not skip_epp_call: if property not in self._cache:
self._fetch_cache( self._fetch_cache(
fetch_hosts=(property == "hosts"), fetch_hosts=(property == "hosts"),
fetch_contacts=(property == "contacts"), fetch_contacts=(property == "contacts"),

View file

@ -8,6 +8,8 @@ from django.core.paginator import Paginator
from django.db.models import F, Value, CharField from django.db.models import F, Value, CharField
from django.db.models.functions import Concat, Coalesce from django.db.models.functions import Concat, Coalesce
from registrar.models.public_contact import PublicContact
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -41,7 +43,7 @@ def get_domain_infos(filter_condition, sort_fields):
return domain_infos_cleaned return domain_infos_cleaned
def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None, skip_epp_call=True): def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None):
"""Given a set of columns, generate a new row from cleaned column data""" """Given a set of columns, generate a new row from cleaned column data"""
# Domain should never be none when parsing this information # Domain should never be none when parsing this information
@ -51,13 +53,16 @@ def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None
domain = domain_info.domain # type: ignore domain = domain_info.domain # type: ignore
# Grab the security email from a preset dictionary. # Grab the security email from a preset dictionary.
# If nothing exists in the dictionary, grab from get_security_email # If nothing exists in the dictionary, grab from .contacts.
if security_emails_dict is not None and domain.name in security_emails_dict: if security_emails_dict is not None and domain.name in security_emails_dict:
_email = security_emails_dict.get(domain.name) _email = security_emails_dict.get(domain.name)
security_email = _email if _email is not None else " " security_email = _email if _email is not None else " "
else: else:
cached_sec_email = domain.get_security_email(skip_epp_call) # If the dictionary doesn't contain that data, lets filter for it manually.
security_email = cached_sec_email if cached_sec_email is not None else " " # This is a last resort as this is a more expensive operation.
security_contacts = domain_info.domain.contacts.filter(contact_type=PublicContact.ContactTypeChoices.SECURITY)
_email = security_contacts[0].email
security_email = _email if _email is not None else " "
# These are default emails that should not be displayed in the csv report # These are default emails that should not be displayed in the csv report
invalid_emails = {"registrar@dotgov.gov", "dotgov@cisa.dhs.gov"} invalid_emails = {"registrar@dotgov.gov", "dotgov@cisa.dhs.gov"}
@ -105,15 +110,18 @@ def write_body(
# Get the domainInfos # Get the domainInfos
all_domain_infos = get_domain_infos(filter_condition, sort_fields) all_domain_infos = get_domain_infos(filter_condition, sort_fields)
# Populate a dictionary of domain names and their security contacts # Store all security emails to avoid epp calls or excessive filters
sec_contact_ids = list(all_domain_infos.values_list("domain__security_contact_registry_id", flat=True))
security_emails_dict = {} security_emails_dict = {}
for domain_info in all_domain_infos: public_contacts = PublicContact.objects.filter(registry_id__in=sec_contact_ids)
if domain_info not in security_emails_dict:
# Populate a dictionary of domain names and their security contacts
for contact in public_contacts:
domain: Domain = domain_info.domain domain: Domain = domain_info.domain
if domain is not None: if domain is not None and domain.name not in security_emails_dict:
security_emails_dict[domain.name] = domain.security_contact_registry_id security_emails_dict[domain.name] = contact.email
else: else:
logger.warning("csv_export -> Duplicate domain object found") logger.warning("csv_export -> Domain was none for PublicContact")
# Reduce the memory overhead when performing the write operation # Reduce the memory overhead when performing the write operation
paginator = Paginator(all_domain_infos, 1000) paginator = Paginator(all_domain_infos, 1000)