refactor wip

Rachid Mrad 2024-04-02 20:54:43 -04:00
parent 045140091b
commit c5e6295a8a
3 changed files with 162 additions and 134 deletions

View file

@@ -243,7 +243,12 @@ class ExportDataTest(MockDb, MockEppLib):
         self.maxDiff = None
         # Call the export functions
         write_domains_csv(
-            writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
+            writer,
+            columns,
+            sort_fields,
+            filter_condition,
+            should_get_domain_managers=False,
+            should_write_header=True,
         )
         # Reset the CSV file's position to the beginning
@@ -305,7 +310,12 @@ class ExportDataTest(MockDb, MockEppLib):
         }
         # Call the export functions
         write_domains_csv(
-            writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
+            writer,
+            columns,
+            sort_fields,
+            filter_condition,
+            should_get_domain_managers=False,
+            should_write_header=True,
         )
         # Reset the CSV file's position to the beginning
         csv_file.seek(0)
@@ -358,7 +368,12 @@ class ExportDataTest(MockDb, MockEppLib):
         }
         # Call the export functions
         write_domains_csv(
-            writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
+            writer,
+            columns,
+            sort_fields,
+            filter_condition,
+            should_get_domain_managers=False,
+            should_write_header=True,
         )
         # Reset the CSV file's position to the beginning
         csv_file.seek(0)
@@ -438,7 +453,7 @@ class ExportDataTest(MockDb, MockEppLib):
             columns,
             sort_fields,
             filter_condition,
-            get_domain_managers=False,
+            should_get_domain_managers=False,
             should_write_header=True,
         )
         write_domains_csv(
@@ -446,7 +461,7 @@ class ExportDataTest(MockDb, MockEppLib):
             columns,
             sort_fields_for_deleted_domains,
             filter_conditions_for_deleted_domains,
-            get_domain_managers=False,
+            should_get_domain_managers=False,
             should_write_header=False,
         )
         # Reset the CSV file's position to the beginning
@@ -514,7 +529,12 @@ class ExportDataTest(MockDb, MockEppLib):
         self.maxDiff = None
         # Call the export functions
         write_domains_csv(
-            writer, columns, sort_fields, filter_condition, get_domain_managers=True, should_write_header=True
+            writer,
+            columns,
+            sort_fields,
+            filter_condition,
+            should_get_domain_managers=True,
+            should_write_header=True,
         )
         # Reset the CSV file's position to the beginning
@@ -697,13 +717,8 @@ class HelperFunctions(MockDb):
             "domain__first_ready__lte": self.end_date,
         }
-        # Test with distinct
-        managed_domains_sliced_at_end_date = get_sliced_domains(filter_condition, True)
-        expected_content = [3, 2, 1, 0, 0, 0, 0, 0, 0, 2]
-        self.assertEqual(managed_domains_sliced_at_end_date, expected_content)
-
-        # Test without distinct
         managed_domains_sliced_at_end_date = get_sliced_domains(filter_condition)
-        expected_content = [3, 4, 1, 0, 0, 0, 0, 0, 0, 2]
+        expected_content = [3, 2, 1, 0, 0, 0, 0, 0, 0, 2]
         self.assertEqual(managed_domains_sliced_at_end_date, expected_content)

     def test_get_sliced_requests(self):
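
For context on the [3, 4, ...] to [3, 2, ...] change above: the test previously exercised get_sliced_domains both with and without distinct counting; after the refactor it calls the function with no flag and expects the de-duplicated figures, which suggests distinct counting is now the default behavior. When a filter joins through domain managers, a domain with more than one manager matches once per manager, so the naive count is inflated. A minimal, framework-free sketch of that effect (illustrative data only, not the test fixtures):

# Illustrative sketch only: a join through domain managers yields one row per
# (domain, manager) pair, so a domain with two managers is counted twice unless
# the count is made distinct on the domain.
rows_after_join = [
    ("example.gov", "manager1@example.gov"),
    ("example.gov", "manager2@example.gov"),  # same domain, second manager
    ("city.gov", "manager@city.gov"),
]

naive_count = len(rows_after_join)  # 3: example.gov is counted twice
distinct_count = len({domain for domain, _ in rows_after_join})  # 2

print(naive_count, distinct_count)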

View file

@@ -7,7 +7,7 @@ from registrar.models.domain_request import DomainRequest
 from registrar.models.domain_information import DomainInformation
 from django.utils import timezone
 from django.core.paginator import Paginator
-from django.db.models import F, Value, CharField, Q, Count
+from django.db.models import F, Value, CharField
 from django.db.models.functions import Concat, Coalesce
 from registrar.models.public_contact import PublicContact
@@ -54,7 +54,14 @@ def get_domain_infos(filter_condition, sort_fields):
     return domain_infos_cleaned


-def parse_domain_row(columns, domain_info: DomainInformation, security_emails_dict=None, get_domain_managers=False, domain_invitation_emails=None, domain_permissions_emails=None):
+def parse_domain_row(
+    columns,
+    domain_info: DomainInformation,
+    dict_security_emails_dict=None,
+    should_get_domain_managers=False,
+    dict_domain_invitations_with_invited_status=None,
+    dict_user_domain_roles=None,
+):
     """Given a set of columns, generate a new row from cleaned column data"""

     # Domain should never be none when parsing this information
@@ -66,8 +73,8 @@ def parse_domain_row(columns, domain_info: DomainInformation, security_emails_di
     # Grab the security email from a preset dictionary.
     # If nothing exists in the dictionary, grab from .contacts.
-    if security_emails_dict is not None and domain.name in security_emails_dict:
-        _email = security_emails_dict.get(domain.name)
+    if dict_security_emails_dict is not None and domain.name in dict_security_emails_dict:
+        _email = dict_security_emails_dict.get(domain.name)
         security_email = _email if _email is not None else " "
     else:
         # If the dictionary doesn't contain that data, lets filter for it manually.
@@ -104,20 +111,20 @@ def parse_domain_row(columns, domain_info: DomainInformation, security_emails_di
         "Deleted": domain.deleted,
     }

-    if get_domain_managers:
+    if should_get_domain_managers:
         # Get lists of emails for active and invited domain managers
-        dm_active_emails = domain_permissions_emails.get(domain_info.domain.name, [])
-        dm_invited_emails = domain_invitation_emails.get(domain_info.domain.name, [])
+        dms_active_emails = dict_user_domain_roles.get(domain_info.domain.name, [])
+        dms_invited_emails = dict_domain_invitations_with_invited_status.get(domain_info.domain.name, [])

         # Set up the "matching headers" + row field data for email and status
         i = 0  # Declare i outside of the loop to avoid a reference before assignment in the second loop
-        for i, dm_email in enumerate(dm_active_emails, start=1):
+        for i, dm_email in enumerate(dms_active_emails, start=1):
             FIELDS[f"Domain manager {i}"] = dm_email
             FIELDS[f"DM{i} status"] = "R"

         # Continue enumeration from where we left off and add data for invited domain managers
-        for j, dm_email in enumerate(dm_invited_emails, start=i + 1):
+        for j, dm_email in enumerate(dms_invited_emails, start=i + 1):
             FIELDS[f"Domain manager {j}"] = dm_email
             FIELDS[f"DM{j} status"] = "I"
@@ -129,7 +136,7 @@ def _get_security_emails(sec_contact_ids):
     """
     Retrieve security contact emails for the given security contact IDs.
     """
-    security_emails_dict = {}
+    dict_security_emails_dict = {}
     public_contacts = (
         PublicContact.objects.only("email", "domain__name")
         .select_related("domain")
@@ -139,144 +146,152 @@ def _get_security_emails(sec_contact_ids):
     # Populate a dictionary of domain names and their security contacts
     for contact in public_contacts:
         domain: Domain = contact.domain
-        if domain is not None and domain.name not in security_emails_dict:
-            security_emails_dict[domain.name] = contact.email
+        if domain is not None and domain.name not in dict_security_emails_dict:
+            dict_security_emails_dict[domain.name] = contact.email
         else:
             logger.warning("csv_export -> Domain was none for PublicContact")

-    return security_emails_dict
+    return dict_security_emails_dict
+
+
+def count_domain_managers(domain_name, dict_domain_invitations_with_invited_status, dict_user_domain_roles):
+    """Count active and invited domain managers"""
+    dms_active = len(dict_user_domain_roles.get(domain_name, []))
+    dms_invited = len(dict_domain_invitations_with_invited_status.get(domain_name, []))
+    return dms_active, dms_invited
+
+
+def update_columns(columns, dms_total, should_update_columns):
+    """Update columns if necessary"""
+    if should_update_columns:
+        for i in range(1, dms_total + 1):
+            email_column_header = f"Domain manager {i}"
+            status_column_header = f"DM{i} status"
+            if email_column_header not in columns:
+                columns.append(email_column_header)
+                columns.append(status_column_header)
+        should_update_columns = False
+    return columns, should_update_columns, dms_total


 def update_columns_with_domain_managers(
-    domain_info,domain_invitation_emails, domain_permissions_emails, update_columns, columns, max_dm_total
+    columns,
+    domain_info,
+    should_update_columns,
+    dms_total,
+    dict_domain_invitations_with_invited_status,
+    dict_user_domain_roles,
 ):
-    """Helper function that works with 'global' variables set in write_domains_csv
-    Accepts:
-        domain_info -> Domains to parse
-        update_columns -> A control to make sure we only run the columns test and update when needed
-        columns -> The header cells in the csv that's under construction
-        max_dm_total -> Starts at 0 and gets updated and passed again through this method
-    Returns:
-        Updated update_columns, columns, max_dm_active, max_dm_invited, max_dm_total"""
-
-    dm_active = 0
-    dm_invited = 0
-
-    try:
-        # logger.info(f'domain_invitation_emails {domain_invitation_emails[domain_info.domain.name]}')
-        # Get the list of invitation emails for the domain name if it exists, otherwise, return an empty list
-        invitation_emails = domain_invitation_emails.get(domain_info.domain.name, [])
-        # Count the number of invitation emails
-        dm_invited = len(invitation_emails)
-    except KeyError:
-        pass
-
-    try:
-        active_emails = domain_permissions_emails.get(domain_info.domain.name, [])
-        # Count the number of invitation emails
-        dm_active = len(active_emails)
-    except KeyError:
-        pass
-
-    if dm_active + dm_invited > max_dm_total:
-        max_dm_total = dm_active + dm_invited
-        update_columns = True
-
-    if update_columns:
-        for i in range(1, max_dm_total + 1):
-            column_name = f"Domain manager {i}"
-            column2_name = f"DM{i} status"
-            if column_name not in columns:
-                columns.append(column_name)
-                columns.append(column2_name)
-        update_columns = False
-
-    return update_columns, columns, max_dm_total
+    """Helper function to update columns with domain manager information"""
+
+    domain_name = domain_info.domain.name
+
+    try:
+        dms_active, dms_invited = count_domain_managers(
+            domain_name, dict_domain_invitations_with_invited_status, dict_user_domain_roles
+        )
+
+        if dms_active + dms_invited > dms_total:
+            dms_total = dms_active + dms_invited
+            should_update_columns = True
+    except Exception as err:
+        logger.error(f"Exception while parsing domain managers for reports: {err}")
+
+    return update_columns(columns, dms_total, should_update_columns)
+
+
+def build_dictionaries_for_domain_managers(dict_user_domain_roles, dict_domain_invitations_with_invited_status):
+    """Helper function that builds dicts for invited users and active domain
+    managers. We do so to avoid filtering within loops."""
+
+    user_domain_roles = None
+    user_domain_roles = UserDomainRole.objects.all()
+
+    # Iterate through each user domain role and populate the dictionary
+    for user_domain_role in user_domain_roles:
+        domain_name = user_domain_role.domain.name
+        email = user_domain_role.user.email
+        if domain_name not in dict_user_domain_roles:
+            dict_user_domain_roles[domain_name] = []
+        dict_user_domain_roles[domain_name].append(email)
+
+    domain_invitations_with_invited_status = None
+    domain_invitations_with_invited_status = DomainInvitation.objects.filter(
+        status=DomainInvitation.DomainInvitationStatus.INVITED
+    ).select_related("domain")
+
+    # Iterate through each domain invitation and populate the dictionary
+    for invite in domain_invitations_with_invited_status:
+        domain_name = invite.domain.name
+        email = invite.email
+        if domain_name not in dict_domain_invitations_with_invited_status:
+            dict_domain_invitations_with_invited_status[domain_name] = []
+        dict_domain_invitations_with_invited_status[domain_name].append(email)
+
+    return dict_user_domain_roles, dict_domain_invitations_with_invited_status


 def write_domains_csv(
     writer,
     columns,
     sort_fields,
     filter_condition,
-    get_domain_managers=False,
+    should_get_domain_managers=False,
     should_write_header=True,
 ):
     """
     Receives params from the parent methods and outputs a CSV with filtered and sorted domains.
     Works with write_header as long as the same writer object is passed.
-    get_domain_managers: Conditional bc we only use domain manager info for export_data_full_to_csv
+    should_get_domain_managers: Conditional bc we only use domain manager info for export_data_full_to_csv
     should_write_header: Conditional bc export_data_domain_growth_to_csv calls write_body twice
     """

     with Timer():
-        # Retrieve domain information and all sec emails
         all_domain_infos = get_domain_infos(filter_condition, sort_fields)

-        # Store all security emails to avoid epp calls or excessive filters
         sec_contact_ids = all_domain_infos.values_list("domain__security_contact_registry_id", flat=True)
-        security_emails_dict = _get_security_emails(sec_contact_ids)
+        dict_security_emails_dict = _get_security_emails(sec_contact_ids)

-        # Reduce the memory overhead when performing the write operation
         paginator = Paginator(all_domain_infos, 1000)

-        # We get the number of domain managers (DMs) an the domain
-        # that has the most DMs so we can set the header row appropriately
-        max_dm_total = 0
-        update_columns = False
-
-        invites_with_invited_status=None
-        domain_invitation_emails = {}
-        domain_permissions_emails = {}
-
-        if get_domain_managers:
-            invites_with_invited_status = DomainInvitation.objects.filter(status=DomainInvitation.DomainInvitationStatus.INVITED).select_related("domain")
-            # Iterate through each domain invitation and populate the dictionary
-            for invite in invites_with_invited_status:
-                domain_name = invite.domain.name
-                email = invite.email
-                if domain_name not in domain_invitation_emails:
-                    domain_invitation_emails[domain_name] = []
-                domain_invitation_emails[domain_name].append(email)
-
-            domain_permissions = UserDomainRole.objects.all()
-            # Iterate through each domain invitation and populate the dictionary
-            for permission in domain_permissions:
-                domain_name = permission.domain.name
-                email = permission.user.email
-                if domain_name not in domain_permissions_emails:
-                    domain_permissions_emails[domain_name] = []
-                domain_permissions_emails[domain_name].append(email)
-            # logger.info(f'domain_invitation_emails {domain_invitation_emails}')
-
-        # This var will live outside of the nested for loops to aggregate
-        # the data from those loops
+        # Initialize variables
+        dms_total = 0
+        should_update_columns = False
         total_body_rows = []
+        dict_user_domain_roles = {}
+        dict_domain_invitations_with_invited_status = {}
+
+        # Build dictionaries if necessary
+        if should_get_domain_managers:
+            dict_user_domain_roles, dict_domain_invitations_with_invited_status = build_dictionaries_for_domain_managers(
+                dict_user_domain_roles, dict_domain_invitations_with_invited_status
+            )

+        # Process domain information
         for page_num in paginator.page_range:
             rows = []
             page = paginator.page(page_num)
             for domain_info in page.object_list:
-                # Get max number of domain managers
-                if get_domain_managers:
-                    update_columns, columns, max_dm_total = (
-                        update_columns_with_domain_managers(
-                            domain_info, domain_invitation_emails,domain_permissions_emails, update_columns, columns, max_dm_total
-                        )
-                    )
+                if should_get_domain_managers:
+                    columns, dms_total, should_update_columns = update_columns_with_domain_managers(
+                        columns,
+                        domain_info,
+                        should_update_columns,
+                        dms_total,
+                        dict_domain_invitations_with_invited_status,
+                        dict_user_domain_roles,
+                    )

                 try:
-                    row = parse_domain_row(columns, domain_info, security_emails_dict, get_domain_managers, domain_invitation_emails,domain_permissions_emails)
+                    row = parse_domain_row(
+                        columns,
+                        domain_info,
+                        dict_security_emails_dict,
+                        should_get_domain_managers,
+                        dict_domain_invitations_with_invited_status,
+                        dict_user_domain_roles,
+                    )
                     rows.append(row)
                 except ValueError:
-                    # This should not happen. If it does, just skip this row.
-                    # It indicates that DomainInformation.domain is None.
                     logger.error("csv_export -> Error when parsing row, domain was None")
                     continue
             total_body_rows.extend(rows)
@@ -399,7 +414,7 @@ def export_data_type_to_csv(csv_file):
         ],
     }
     write_domains_csv(
-        writer, columns, sort_fields, filter_condition, get_domain_managers=True, should_write_header=True
+        writer, columns, sort_fields, filter_condition, should_get_domain_managers=True, should_write_header=True
     )
@@ -432,7 +447,7 @@ def export_data_full_to_csv(csv_file):
         ],
     }
     write_domains_csv(
-        writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
+        writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True
     )
@@ -466,7 +481,7 @@ def export_data_federal_to_csv(csv_file):
         ],
     }
     write_domains_csv(
-        writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
+        writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True
     )
@@ -536,19 +551,19 @@ def export_data_domain_growth_to_csv(csv_file, start_date, end_date):
     }

     write_domains_csv(
-        writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
+        writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True
     )
     write_domains_csv(
         writer,
         columns,
         sort_fields_for_deleted_domains,
         filter_condition_for_deleted_domains,
-        get_domain_managers=False,
+        should_get_domain_managers=False,
         should_write_header=False,
     )


-def get_sliced_domains(filter_condition, distinct=False):
+def get_sliced_domains(filter_condition):
     """Get filtered domains counts sliced by org type and election office.
     Pass distinct=True when filtering by permissions so we do not to count multiples
     when a domain has more that one manager.
@@ -639,7 +654,7 @@ def export_data_managed_domains_to_csv(csv_file, start_date, end_date):
         "domain__permissions__isnull": False,
         "domain__first_ready__lte": start_date_formatted,
     }
-    managed_domains_sliced_at_start_date = get_sliced_domains(filter_managed_domains_start_date, True)
+    managed_domains_sliced_at_start_date = get_sliced_domains(filter_managed_domains_start_date)

     writer.writerow(["MANAGED DOMAINS COUNTS AT START DATE"])
     writer.writerow(
@@ -663,7 +678,7 @@ def export_data_managed_domains_to_csv(csv_file, start_date, end_date):
         "domain__permissions__isnull": False,
         "domain__first_ready__lte": end_date_formatted,
     }
-    managed_domains_sliced_at_end_date = get_sliced_domains(filter_managed_domains_end_date, True)
+    managed_domains_sliced_at_end_date = get_sliced_domains(filter_managed_domains_end_date)

     writer.writerow(["MANAGED DOMAINS COUNTS AT END DATE"])
     writer.writerow(
@@ -688,7 +703,7 @@ def export_data_managed_domains_to_csv(csv_file, start_date, end_date):
         columns,
         sort_fields,
         filter_managed_domains_end_date,
-        get_domain_managers=True,
+        should_get_domain_managers=True,
         should_write_header=True,
     )
@@ -712,7 +727,7 @@ def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date):
         "domain__permissions__isnull": True,
         "domain__first_ready__lte": start_date_formatted,
     }
-    unmanaged_domains_sliced_at_start_date = get_sliced_domains(filter_unmanaged_domains_start_date, True)
+    unmanaged_domains_sliced_at_start_date = get_sliced_domains(filter_unmanaged_domains_start_date)

     writer.writerow(["UNMANAGED DOMAINS AT START DATE"])
     writer.writerow(
@@ -736,7 +751,7 @@ def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date):
         "domain__permissions__isnull": True,
         "domain__first_ready__lte": end_date_formatted,
     }
-    unmanaged_domains_sliced_at_end_date = get_sliced_domains(filter_unmanaged_domains_end_date, True)
+    unmanaged_domains_sliced_at_end_date = get_sliced_domains(filter_unmanaged_domains_end_date)

     writer.writerow(["UNMANAGED DOMAINS AT END DATE"])
     writer.writerow(
@@ -761,7 +776,7 @@ def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date):
         columns,
         sort_fields,
         filter_unmanaged_domains_end_date,
-        get_domain_managers=False,
+        should_get_domain_managers=False,
         should_write_header=True,
     )
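
The new build_dictionaries_for_domain_managers helper captures the main performance idea of the refactor, stated in its docstring: query UserDomainRole and DomainInvitation once, bucket the emails by domain name, and then do plain dictionary lookups inside the per-row export loop instead of filtering per domain. A minimal, framework-free sketch of that pattern (the sample data and the defaultdict shortcut are illustrative, not the module's code):

# Illustrative sketch: build lookup dicts once, then do O(1) lookups per CSV row.
from collections import defaultdict

# Pretend these pairs came from two queries executed once, before the export loop.
active_roles = [("example.gov", "manager@example.gov"), ("example.gov", "second@example.gov")]
invited_roles = [("example.gov", "invited@example.gov")]

dict_user_domain_roles = defaultdict(list)
dict_domain_invitations_with_invited_status = defaultdict(list)

for domain_name, email in active_roles:
    dict_user_domain_roles[domain_name].append(email)
for domain_name, email in invited_roles:
    dict_domain_invitations_with_invited_status[domain_name].append(email)

# Inside the per-domain loop, manager lookups are now dictionary reads, not queries.
print(dict_user_domain_roles.get("example.gov", []))
print(dict_domain_invitations_with_invited_status.get("example.gov", []))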

View file

@@ -49,8 +49,8 @@ class AnalyticsView(View):
             "domain__permissions__isnull": False,
             "domain__first_ready__lte": end_date_formatted,
         }
-        managed_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_managed_domains_start_date, True)
-        managed_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_managed_domains_end_date, True)
+        managed_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_managed_domains_start_date)
+        managed_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_managed_domains_end_date)

         filter_unmanaged_domains_start_date = {
             "domain__permissions__isnull": True,
@@ -60,10 +60,8 @@ class AnalyticsView(View):
             "domain__permissions__isnull": True,
             "domain__first_ready__lte": end_date_formatted,
         }
-        unmanaged_domains_sliced_at_start_date = csv_export.get_sliced_domains(
-            filter_unmanaged_domains_start_date, True
-        )
-        unmanaged_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_unmanaged_domains_end_date, True)
+        unmanaged_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_unmanaged_domains_start_date)
+        unmanaged_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_unmanaged_domains_end_date)

         filter_ready_domains_start_date = {
             "domain__state__in": [models.Domain.State.READY],