diff --git a/src/registrar/utility/csv_export.py b/src/registrar/utility/csv_export.py index 242c997de..1c8f365cf 100644 --- a/src/registrar/utility/csv_export.py +++ b/src/registrar/utility/csv_export.py @@ -105,6 +105,51 @@ def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None return row +# def _check_domain_managers(domain_info, columns): +# max_dm_count = 0 + +# dm_count = len(domain_info.domain.permissions.all()) +# if dm_count > max_dm_count: +# max_dm_count = dm_count +# for i in range(1, max_dm_count + 1): +# if f"Domain manager email {i}" not in columns: +# columns.append(f"Domain manager email {i}") + +# return columns + + +def _get_security_emails(sec_contact_ids): + """ + Retrieve security contact emails for the given security contact IDs. + """ + security_emails_dict = {} + public_contacts = ( + PublicContact.objects.only("email", "domain__name") + .select_related("domain") + .filter(registry_id__in=sec_contact_ids) + ) + + # Populate a dictionary of domain names and their security contacts + for contact in public_contacts: + domain: Domain = contact.domain + if domain is not None and domain.name not in security_emails_dict: + security_emails_dict[domain.name] = contact.email + else: + logger.warning("csv_export -> Domain was none for PublicContact") + + return security_emails_dict + + +def update_columns_with_domain_managers(columns, max_dm_count): + """ + Update the columns list to include "Domain manager email" headers + based on the maximum domain manager count. + """ + for i in range(1, max_dm_count + 1): + if f"Domain manager email {i}" not in columns: + columns.append(f"Domain manager email {i}") + + def write_body( writer, columns, @@ -127,24 +172,12 @@ def write_body( # Store all security emails to avoid epp calls or excessive filters sec_contact_ids = all_domain_infos.values_list("domain__security_contact_registry_id", flat=True) - security_emails_dict = {} - public_contacts = ( - PublicContact.objects.only("email", "domain__name") - .select_related("domain") - .filter(registry_id__in=sec_contact_ids) - ) - # Populate a dictionary of domain names and their security contacts - for contact in public_contacts: - domain: Domain = contact.domain - if domain is not None and domain.name not in security_emails_dict: - security_emails_dict[domain.name] = contact.email - else: - logger.warning("csv_export -> Domain was none for PublicContact") + security_emails_dict = _get_security_emails(sec_contact_ids) # The maximum amount of domain managers an account has - # We get the max so we can set the column header accurately max_dm_count = 0 + # We get the max so we can set the column header accurately paginator_ran = False # Reduce the memory overhead when performing the write operation paginator = Paginator(all_domain_infos, 1000) @@ -153,12 +186,11 @@ def write_body( rows = [] for domain_info in page.object_list: if get_domain_managers: + # _check_domain_managers(domain_info, columns) dm_count = len(domain_info.domain.permissions.all()) if dm_count > max_dm_count: max_dm_count = dm_count - for i in range(1, max_dm_count + 1): - if f"Domain manager email {i}" not in columns: - columns.append(f"Domain manager email {i}") + update_columns_with_domain_managers(columns, max_dm_count) try: row = parse_row(columns, domain_info, security_emails_dict, get_domain_managers) rows.append(row)