diff --git a/src/registrar/models/domain.py b/src/registrar/models/domain.py index 0cbe8c071..27a8364bc 100644 --- a/src/registrar/models/domain.py +++ b/src/registrar/models/domain.py @@ -909,17 +909,11 @@ class Domain(TimeStampedModel, DomainHelper): """Time to renew. Not implemented.""" raise NotImplementedError() - def get_security_email(self, skip_epp_call=False): + def get_security_email(self): logger.info("get_security_email-> getting the contact") - # If specified, skip the epp call outright. - # Otherwise, proceed as normal. - if skip_epp_call: - logger.info("get_security_email-> skipping epp call") - security = PublicContact.ContactTypeChoices.SECURITY - security_contact = self.generic_contact_getter(security, skip_epp_call) - else: - security_contact = self.security_contact + security = PublicContact.ContactTypeChoices.SECURITY + security_contact = self.generic_contact_getter(security) # If we get a valid value for security_contact, pull its email # Otherwise, just return nothing @@ -1121,9 +1115,7 @@ class Domain(TimeStampedModel, DomainHelper): ) raise error - def generic_contact_getter( - self, contact_type_choice: PublicContact.ContactTypeChoices, skip_epp_call=False - ) -> PublicContact | None: + def generic_contact_getter(self, contact_type_choice: PublicContact.ContactTypeChoices) -> PublicContact | None: """Retrieves the desired PublicContact from the registry. This abstracts the caching and EPP retrieval for all contact items and thus may result in EPP calls being sent. @@ -1143,7 +1135,7 @@ class Domain(TimeStampedModel, DomainHelper): try: # Grab from cache - contacts = self._get_property(desired_property, skip_epp_call) + contacts = self._get_property(desired_property) except KeyError as error: # if contact type is security, attempt to retrieve registry id # for the security contact from domain.security_contact_registry_id @@ -1878,9 +1870,9 @@ class Domain(TimeStampedModel, DomainHelper): """Remove cache data when updates are made.""" self._cache = {} - def _get_property(self, property, skip_epp_call=False): + def _get_property(self, property): """Get some piece of info about a domain.""" - if property not in self._cache and not skip_epp_call: + if property not in self._cache: self._fetch_cache( fetch_hosts=(property == "hosts"), fetch_contacts=(property == "contacts"), diff --git a/src/registrar/utility/csv_export.py b/src/registrar/utility/csv_export.py index d87f97ad7..e2e31bc0f 100644 --- a/src/registrar/utility/csv_export.py +++ b/src/registrar/utility/csv_export.py @@ -8,6 +8,8 @@ from django.core.paginator import Paginator from django.db.models import F, Value, CharField from django.db.models.functions import Concat, Coalesce +from registrar.models.public_contact import PublicContact + logger = logging.getLogger(__name__) @@ -41,7 +43,7 @@ def get_domain_infos(filter_condition, sort_fields): return domain_infos_cleaned -def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None, skip_epp_call=True): +def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None): """Given a set of columns, generate a new row from cleaned column data""" # Domain should never be none when parsing this information @@ -51,13 +53,16 @@ def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None domain = domain_info.domain # type: ignore # Grab the security email from a preset dictionary. - # If nothing exists in the dictionary, grab from get_security_email + # If nothing exists in the dictionary, grab from .contacts. if security_emails_dict is not None and domain.name in security_emails_dict: _email = security_emails_dict.get(domain.name) security_email = _email if _email is not None else " " else: - cached_sec_email = domain.get_security_email(skip_epp_call) - security_email = cached_sec_email if cached_sec_email is not None else " " + # If the dictionary doesn't contain that data, lets filter for it manually. + # This is a last resort as this is a more expensive operation. + security_contacts = domain_info.domain.contacts.filter(contact_type=PublicContact.ContactTypeChoices.SECURITY) + _email = security_contacts[0].email + security_email = _email if _email is not None else " " # These are default emails that should not be displayed in the csv report invalid_emails = {"registrar@dotgov.gov", "dotgov@cisa.dhs.gov"} @@ -104,16 +109,19 @@ def write_body( # Get the domainInfos all_domain_infos = get_domain_infos(filter_condition, sort_fields) - - # Populate a dictionary of domain names and their security contacts + + # Store all security emails to avoid epp calls or excessive filters + sec_contact_ids = list(all_domain_infos.values_list("domain__security_contact_registry_id", flat=True)) security_emails_dict = {} - for domain_info in all_domain_infos: - if domain_info not in security_emails_dict: - domain: Domain = domain_info.domain - if domain is not None: - security_emails_dict[domain.name] = domain.security_contact_registry_id + public_contacts = PublicContact.objects.filter(registry_id__in=sec_contact_ids) + + # Populate a dictionary of domain names and their security contacts + for contact in public_contacts: + domain: Domain = domain_info.domain + if domain is not None and domain.name not in security_emails_dict: + security_emails_dict[domain.name] = contact.email else: - logger.warning("csv_export -> Duplicate domain object found") + logger.warning("csv_export -> Domain was none for PublicContact") # Reduce the memory overhead when performing the write operation paginator = Paginator(all_domain_infos, 1000)