Grab sec emails from dict

zandercymatics 2024-01-29 10:52:38 -07:00
parent d66899164d
commit 619cadb953


@@ -41,7 +41,7 @@ def get_domain_infos(filter_condition, sort_fields):
     return domain_infos_cleaned

-def parse_row(columns, domain_info: DomainInformation, skip_epp_call=True):
+def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None, skip_epp_call=True):
     """Given a set of columns, generate a new row from cleaned column data"""
     # Domain should never be none when parsing this information
@@ -50,11 +50,17 @@ def parse_row(columns, domain_info: DomainInformation, skip_epp_call=True):
     domain = domain_info.domain  # type: ignore
-    cached_sec_email = domain.get_security_email(skip_epp_call)
-    security_email = cached_sec_email if cached_sec_email is not None else " "
+    # Grab the security email from a preset dictionary.
+    # If nothing exists in the dictionary, grab from get_security_email
+    if security_emails_dict is not None and domain.name in security_emails_dict:
+        _email = security_emails_dict.get(domain.name)
+        security_email = _email if _email is not None else " "
+    else:
+        cached_sec_email = domain.get_security_email(skip_epp_call)
+        security_email = cached_sec_email if cached_sec_email is not None else " "
-    invalid_emails = {"registrar@dotgov.gov", "dotgov@cisa.dhs.gov"}
+    # These are default emails that should not be displayed in the csv report
+    invalid_emails = {"registrar@dotgov.gov", "dotgov@cisa.dhs.gov"}
     if security_email.lower() in invalid_emails:
         security_email = "(blank)"
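For reference, here is a minimal runnable sketch of the new lookup order: preset dictionary first, get_security_email as the fallback, then the invalid-email filter. FakeDomain and resolve_security_email are hypothetical stand-ins for the real Domain model and the parse_row internals, not the project's actual API.

INVALID_EMAILS = {"registrar@dotgov.gov", "dotgov@cisa.dhs.gov"}

class FakeDomain:
    """Hypothetical stand-in for the real Domain model."""

    def __init__(self, name, email):
        self.name = name
        self._email = email

    def get_security_email(self, skip_epp_call=True):
        # The real method may hit the EPP registry; here it returns a stored value.
        return self._email

def resolve_security_email(domain, security_emails_dict=None, skip_epp_call=True):
    # Prefer the preset dictionary; fall back to get_security_email otherwise.
    if security_emails_dict is not None and domain.name in security_emails_dict:
        email = security_emails_dict.get(domain.name)
    else:
        email = domain.get_security_email(skip_epp_call)
    security_email = email if email is not None else " "
    # Default registry emails should not be displayed in the csv report.
    if security_email.lower() in INVALID_EMAILS:
        security_email = "(blank)"
    return security_email

assert resolve_security_email(FakeDomain("a.gov", "sec@a.gov")) == "sec@a.gov"
assert resolve_security_email(FakeDomain("b.gov", None), {"b.gov": "Registrar@dotgov.gov"}) == "(blank)"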
@@ -98,6 +104,16 @@ def write_body(
     # Get the domainInfos
     all_domain_infos = get_domain_infos(filter_condition, sort_fields)

+    # Populate a dictionary of domain names and their security contacts
+    security_emails_dict = {}
+    for domain_info in all_domain_infos:
+        domain: Domain = domain_info.domain
+        if domain is None:
+            continue
+        if domain.name not in security_emails_dict:
+            security_emails_dict[domain.name] = domain.security_contact_registry_id
+        else:
+            logger.warning("csv_export -> Duplicate domain object found")
     # Reduce the memory overhead when performing the write operation
     paginator = Paginator(all_domain_infos, 1000)
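Building the dictionary once up front turns each row's security-email lookup into an O(1) dictionary read instead of a potential per-domain registry call. Below is a runnable sketch of that one-pass build, under the assumption that DomainInformation and Domain behave like the dataclass stand-ins shown here.

import logging
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger("csv_export")

@dataclass
class FakeDomain:
    name: str
    security_contact_registry_id: Optional[str]

@dataclass
class FakeDomainInformation:
    domain: Optional[FakeDomain]

def build_security_emails_dict(all_domain_infos):
    security_emails_dict = {}
    for domain_info in all_domain_infos:
        domain = domain_info.domain
        if domain is None:
            continue
        if domain.name in security_emails_dict:
            # Two DomainInformation rows backed by the same domain name.
            logger.warning("csv_export -> Duplicate domain object found")
            continue
        security_emails_dict[domain.name] = domain.security_contact_registry_id
    return security_emails_dict

infos = [
    FakeDomainInformation(FakeDomain("a.gov", "sec@a.gov")),
    FakeDomainInformation(FakeDomain("a.gov", "sec@a.gov")),  # duplicate, warned and skipped
    FakeDomainInformation(None),  # no domain, skipped
]
assert build_security_emails_dict(infos) == {"a.gov": "sec@a.gov"}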
@@ -106,7 +122,7 @@ def write_body(
         rows = []
         for domain_info in page.object_list:
             try:
-                row = parse_row(columns, domain_info)
+                row = parse_row(columns, domain_info, security_emails_dict)
                 rows.append(row)
             except ValueError:
                 # This should not happen. If it does, just skip this row.
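Finally, a self-contained sketch of the chunked write loop, using Django's real Paginator (django.core.paginator) over a plain list; parse_fake_row is a hypothetical stand-in for parse_row that raises ValueError on a bad row, which is skipped rather than aborting the whole export.

import csv
import io

from django.core.paginator import Paginator  # assumes Django is installed

def parse_fake_row(columns, domain_info):
    # Hypothetical stand-in for parse_row; a missing domain is a parse error.
    if domain_info is None:
        raise ValueError("domain_info has no domain")
    return [domain_info] * len(columns)

columns = ["Domain name", "Security contact email"]
output = io.StringIO()
writer = csv.writer(output)

# write_body uses 1000 objects per page; 2 here to make the chunking visible.
paginator = Paginator(["a.gov", None, "b.gov"], 2)
for page_num in paginator.page_range:
    rows = []
    for domain_info in paginator.page(page_num).object_list:
        try:
            rows.append(parse_fake_row(columns, domain_info))
        except ValueError:
            # This should not happen. If it does, just skip this row.
            continue
    writer.writerows(rows)

print(output.getvalue())  # two data rows; the bad row was skipped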