Rachid Mrad 2024-04-02 21:49:42 -04:00
parent c5e6295a8a
commit 77d1158c1b
2 changed files with 71 additions and 72 deletions

File 1 of 2

@@ -9,10 +9,10 @@ from registrar.utility.csv_export import (
     export_data_unmanaged_domains_to_csv,
     get_sliced_domains,
     get_sliced_requests,
-    write_domains_csv,
+    write_csv_for_domains,
     get_default_start_date,
     get_default_end_date,
-    write_requests_csv,
+    write_csv_for_requests,
 )
 
 from django.core.management import call_command
@@ -242,7 +242,7 @@ class ExportDataTest(MockDb, MockEppLib):
         }
         self.maxDiff = None
         # Call the export functions
-        write_domains_csv(
+        write_csv_for_domains(
             writer,
             columns,
             sort_fields,
@@ -273,7 +273,7 @@ class ExportDataTest(MockDb, MockEppLib):
         expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip()
         self.assertEqual(csv_content, expected_content)
 
-    def test_write_domains_csv(self):
+    def test_write_csv_for_domains(self):
         """Test that write_body returns the
         existing domain, test that sort by domain name works,
         test that filter works"""
@@ -309,7 +309,7 @@ class ExportDataTest(MockDb, MockEppLib):
             ],
         }
         # Call the export functions
-        write_domains_csv(
+        write_csv_for_domains(
             writer,
             columns,
             sort_fields,
@@ -367,7 +367,7 @@ class ExportDataTest(MockDb, MockEppLib):
             ],
         }
         # Call the export functions
-        write_domains_csv(
+        write_csv_for_domains(
             writer,
             columns,
             sort_fields,
@@ -448,7 +448,7 @@ class ExportDataTest(MockDb, MockEppLib):
         }
         # Call the export functions
-        write_domains_csv(
+        write_csv_for_domains(
             writer,
             columns,
             sort_fields,
@@ -456,7 +456,7 @@ class ExportDataTest(MockDb, MockEppLib):
             should_get_domain_managers=False,
             should_write_header=True,
         )
-        write_domains_csv(
+        write_csv_for_domains(
             writer,
             columns,
             sort_fields_for_deleted_domains,
@@ -528,7 +528,7 @@ class ExportDataTest(MockDb, MockEppLib):
         }
         self.maxDiff = None
         # Call the export functions
-        write_domains_csv(
+        write_csv_for_domains(
             writer,
             columns,
             sort_fields,
@@ -673,7 +673,7 @@ class ExportDataTest(MockDb, MockEppLib):
             "submission_date__lte": self.end_date,
             "submission_date__gte": self.start_date,
         }
-        write_requests_csv(writer, columns, sort_fields, filter_condition, should_write_header=True)
+        write_csv_for_requests(writer, columns, sort_fields, filter_condition, should_write_header=True)
         # Reset the CSV file's position to the beginning
         csv_file.seek(0)
         # Read the content into a variable
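
The renamed domain-export helper keeps the signature the tests above exercise. As a rough usage sketch (not part of this commit; the column labels, sort key, and empty filter below are assumptions, and a configured registrar Django environment is required):

    # Hypothetical sketch: drive write_csv_for_domains into an in-memory buffer,
    # mirroring how the updated tests call it.
    import csv
    import io

    from registrar.utility.csv_export import write_csv_for_domains

    csv_file = io.StringIO()
    writer = csv.writer(csv_file)

    columns = ["Domain name", "Domain type"]  # assumed column labels
    sort_fields = ["domain__name"]  # assumed sort key
    filter_condition = {}  # no filtering in this sketch

    write_csv_for_domains(
        writer,
        columns,
        sort_fields,
        filter_condition,
        should_get_domain_managers=False,
        should_write_header=True,
    )

    # Read back what was written, as the tests do after csv_file.seek(0).
    csv_file.seek(0)
    print(csv_file.read())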

File 2 of 2

@@ -12,7 +12,6 @@ from django.db.models.functions import Concat, Coalesce
 from registrar.models.public_contact import PublicContact
 from registrar.models.user_domain_role import UserDomainRole
-from registrar.models.utility.generic_helper import Timer
 from registrar.utility.enums import DefaultEmail
 
 
 logger = logging.getLogger(__name__)
@@ -54,7 +53,7 @@ def get_domain_infos(filter_condition, sort_fields):
     return domain_infos_cleaned
 
 
-def parse_domain_row(
+def parse_row_for_domain(
     columns,
     domain_info: DomainInformation,
     dict_security_emails_dict=None,
@@ -231,7 +230,8 @@ def build_dictionaries_for_domain_managers(dict_user_domain_roles, dict_domain_i
     return dict_user_domain_roles, dict_domain_invitations_with_invited_status
 
 
-def write_domains_csv(
+
+def write_csv_for_domains(
     writer,
     columns,
     sort_fields,
@@ -246,59 +246,58 @@ def write_domains_csv(
 
     should_write_header: Conditional bc export_data_domain_growth_to_csv calls write_body twice
     """
-    with Timer():
-        # Retrieve domain information and all sec emails
-        all_domain_infos = get_domain_infos(filter_condition, sort_fields)
-        sec_contact_ids = all_domain_infos.values_list("domain__security_contact_registry_id", flat=True)
-        dict_security_emails_dict = _get_security_emails(sec_contact_ids)
-        paginator = Paginator(all_domain_infos, 1000)
-
-        # Initialize variables
-        dms_total = 0
-        should_update_columns = False
-        total_body_rows = []
-        dict_user_domain_roles = {}
-        dict_domain_invitations_with_invited_status = {}
-
-        # Build dictionaries if necessary
-        if should_get_domain_managers:
-            dict_user_domain_roles, dict_domain_invitations_with_invited_status = build_dictionaries_for_domain_managers(
-                dict_user_domain_roles, dict_domain_invitations_with_invited_status
-            )
-
-        # Process domain information
-        for page_num in paginator.page_range:
-            rows = []
-            page = paginator.page(page_num)
-            for domain_info in page.object_list:
-                if should_get_domain_managers:
-                    columns, dms_total, should_update_columns = update_columns_with_domain_managers(
-                        columns,
-                        domain_info,
-                        should_update_columns,
-                        dms_total,
-                        dict_domain_invitations_with_invited_status,
-                        dict_user_domain_roles,
-                    )
-                try:
-                    row = parse_domain_row(
-                        columns,
-                        domain_info,
-                        dict_security_emails_dict,
-                        should_get_domain_managers,
-                        dict_domain_invitations_with_invited_status,
-                        dict_user_domain_roles,
-                    )
-                    rows.append(row)
-                except ValueError:
-                    logger.error("csv_export -> Error when parsing row, domain was None")
-                    continue
-            total_body_rows.extend(rows)
-
-        if should_write_header:
-            write_header(writer, columns)
-
-        writer.writerows(total_body_rows)
+    # Retrieve domain information and all sec emails
+    all_domain_infos = get_domain_infos(filter_condition, sort_fields)
+    sec_contact_ids = all_domain_infos.values_list("domain__security_contact_registry_id", flat=True)
+    dict_security_emails_dict = _get_security_emails(sec_contact_ids)
+    paginator = Paginator(all_domain_infos, 1000)
+
+    # Initialize variables
+    dms_total = 0
+    should_update_columns = False
+    total_body_rows = []
+    dict_user_domain_roles = {}
+    dict_domain_invitations_with_invited_status = {}
+
+    # Build dictionaries if necessary
+    if should_get_domain_managers:
+        dict_user_domain_roles, dict_domain_invitations_with_invited_status = build_dictionaries_for_domain_managers(
+            dict_user_domain_roles, dict_domain_invitations_with_invited_status
+        )
+
+    # Process domain information
+    for page_num in paginator.page_range:
+        rows = []
+        page = paginator.page(page_num)
+        for domain_info in page.object_list:
+            if should_get_domain_managers:
+                columns, dms_total, should_update_columns = update_columns_with_domain_managers(
+                    columns,
+                    domain_info,
+                    should_update_columns,
+                    dms_total,
+                    dict_domain_invitations_with_invited_status,
+                    dict_user_domain_roles,
+                )
+            try:
+                row = parse_row_for_domain(
+                    columns,
+                    domain_info,
+                    dict_security_emails_dict,
+                    should_get_domain_managers,
+                    dict_domain_invitations_with_invited_status,
+                    dict_user_domain_roles,
+                )
+                rows.append(row)
+            except ValueError:
+                logger.error("csv_export -> Error when parsing row, domain was None")
+                continue
+        total_body_rows.extend(rows)
+
+    if should_write_header:
+        write_header(writer, columns)
+
+    writer.writerows(total_body_rows)
 
 
 def get_requests(filter_condition, sort_fields):
@@ -312,7 +311,7 @@ def get_requests(filter_condition, sort_fields):
     return requests
 
 
-def parse_request_row(columns, request: DomainRequest):
+def parse_row_for_requests(columns, request: DomainRequest):
     """Given a set of columns, generate a new row from cleaned column data"""
 
     requested_domain_name = "No requested domain"
@@ -344,7 +343,7 @@ def parse_request_row(columns, request: DomainRequest):
     return row
 
 
-def write_requests_csv(
+def write_csv_for_requests(
     writer,
     columns,
     sort_fields,
@@ -365,7 +364,7 @@ def write_requests_csv(
         rows = []
         for request in page.object_list:
             try:
-                row = parse_request_row(columns, request)
+                row = parse_row_for_requests(columns, request)
                 rows.append(row)
             except ValueError:
                 # This should not happen. If it does, just skip this row.
@@ -413,7 +412,7 @@ def export_data_type_to_csv(csv_file):
             Domain.State.ON_HOLD,
         ],
     }
-    write_domains_csv(
+    write_csv_for_domains(
         writer, columns, sort_fields, filter_condition, should_get_domain_managers=True, should_write_header=True
     )
 
@@ -446,7 +445,7 @@ def export_data_full_to_csv(csv_file):
             Domain.State.ON_HOLD,
         ],
     }
-    write_domains_csv(
+    write_csv_for_domains(
        writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True
     )
 
@@ -480,7 +479,7 @@ def export_data_federal_to_csv(csv_file):
             Domain.State.ON_HOLD,
         ],
     }
-    write_domains_csv(
+    write_csv_for_domains(
        writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True
     )
 
@@ -550,10 +549,10 @@ def export_data_domain_growth_to_csv(csv_file, start_date, end_date):
         "domain__deleted__gte": start_date_formatted,
     }
 
-    write_domains_csv(
+    write_csv_for_domains(
         writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True
     )
-    write_domains_csv(
+    write_csv_for_domains(
         writer,
         columns,
         sort_fields_for_deleted_domains,
@@ -698,7 +697,7 @@ def export_data_managed_domains_to_csv(csv_file, start_date, end_date):
     writer.writerow(managed_domains_sliced_at_end_date)
     writer.writerow([])
 
-    write_domains_csv(
+    write_csv_for_domains(
         writer,
         columns,
         sort_fields,
@@ -771,7 +770,7 @@ def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date):
     writer.writerow(unmanaged_domains_sliced_at_end_date)
     writer.writerow([])
 
-    write_domains_csv(
+    write_csv_for_domains(
         writer,
         columns,
         sort_fields,
@@ -807,4 +806,4 @@ def export_data_requests_growth_to_csv(csv_file, start_date, end_date):
         "submission_date__gte": start_date_formatted,
     }
 
-    write_requests_csv(writer, columns, sort_fields, filter_condition, should_write_header=True)
+    write_csv_for_requests(writer, columns, sort_fields, filter_condition, should_write_header=True)
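
For the request report, the renamed write_csv_for_requests takes the same submission_date filter keys shown in the last hunk. A minimal sketch under the same caveats (column labels and sort key are assumptions; get_default_start_date and get_default_end_date come from the same module, per the test imports):

    # Hypothetical sketch: export domain requests over the default date window.
    import csv
    import io

    from registrar.utility.csv_export import (
        get_default_end_date,
        get_default_start_date,
        write_csv_for_requests,
    )

    csv_file = io.StringIO()
    writer = csv.writer(csv_file)

    columns = ["Requested domain", "Organization type"]  # assumed column labels
    sort_fields = ["requested_domain__name"]  # assumed sort key
    filter_condition = {
        "submission_date__gte": get_default_start_date(),
        "submission_date__lte": get_default_end_date(),
    }

    write_csv_for_requests(writer, columns, sort_fields, filter_condition, should_write_header=True)

    csv_file.seek(0)
    print(csv_file.read())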