Refactor parts of the code

This commit is contained in:
Rebecca Hsieh 2024-02-12 15:26:31 -08:00
parent f1f91669d7
commit 3c48fb9f7e
No known key found for this signature in database

View file

@ -105,6 +105,51 @@ def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None
return row
# def _check_domain_managers(domain_info, columns):
# max_dm_count = 0
# dm_count = len(domain_info.domain.permissions.all())
# if dm_count > max_dm_count:
# max_dm_count = dm_count
# for i in range(1, max_dm_count + 1):
# if f"Domain manager email {i}" not in columns:
# columns.append(f"Domain manager email {i}")
# return columns
def _get_security_emails(sec_contact_ids):
"""
Retrieve security contact emails for the given security contact IDs.
"""
security_emails_dict = {}
public_contacts = (
PublicContact.objects.only("email", "domain__name")
.select_related("domain")
.filter(registry_id__in=sec_contact_ids)
)
# Populate a dictionary of domain names and their security contacts
for contact in public_contacts:
domain: Domain = contact.domain
if domain is not None and domain.name not in security_emails_dict:
security_emails_dict[domain.name] = contact.email
else:
logger.warning("csv_export -> Domain was none for PublicContact")
return security_emails_dict
def update_columns_with_domain_managers(columns, max_dm_count):
"""
Update the columns list to include "Domain manager email" headers
based on the maximum domain manager count.
"""
for i in range(1, max_dm_count + 1):
if f"Domain manager email {i}" not in columns:
columns.append(f"Domain manager email {i}")
def write_body(
writer,
columns,
@ -127,24 +172,12 @@ def write_body(
# Store all security emails to avoid epp calls or excessive filters
sec_contact_ids = all_domain_infos.values_list("domain__security_contact_registry_id", flat=True)
security_emails_dict = {}
public_contacts = (
PublicContact.objects.only("email", "domain__name")
.select_related("domain")
.filter(registry_id__in=sec_contact_ids)
)
# Populate a dictionary of domain names and their security contacts
for contact in public_contacts:
domain: Domain = contact.domain
if domain is not None and domain.name not in security_emails_dict:
security_emails_dict[domain.name] = contact.email
else:
logger.warning("csv_export -> Domain was none for PublicContact")
security_emails_dict = _get_security_emails(sec_contact_ids)
# The maximum amount of domain managers an account has
# We get the max so we can set the column header accurately
max_dm_count = 0
# We get the max so we can set the column header accurately
paginator_ran = False
# Reduce the memory overhead when performing the write operation
paginator = Paginator(all_domain_infos, 1000)
@ -153,12 +186,11 @@ def write_body(
rows = []
for domain_info in page.object_list:
if get_domain_managers:
# _check_domain_managers(domain_info, columns)
dm_count = len(domain_info.domain.permissions.all())
if dm_count > max_dm_count:
max_dm_count = dm_count
for i in range(1, max_dm_count + 1):
if f"Domain manager email {i}" not in columns:
columns.append(f"Domain manager email {i}")
update_columns_with_domain_managers(columns, max_dm_count)
try:
row = parse_row(columns, domain_info, security_emails_dict, get_domain_managers)
rows.append(row)