Optimize by using prebuilt dicts

This commit is contained in:
Rachid Mrad 2024-04-02 12:39:29 -04:00
parent dba2dfb6c2
commit 410440ee70
No known key found for this signature in database

View file

@ -11,6 +11,8 @@ from django.db.models import F, Value, CharField, Q, Count
from django.db.models.functions import Concat, Coalesce from django.db.models.functions import Concat, Coalesce
from registrar.models.public_contact import PublicContact from registrar.models.public_contact import PublicContact
from registrar.models.user_domain_role import UserDomainRole
from registrar.models.utility.generic_helper import Timer
from registrar.utility.enums import DefaultEmail from registrar.utility.enums import DefaultEmail
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -33,7 +35,6 @@ def get_domain_infos(filter_condition, sort_fields):
""" """
domain_infos = ( domain_infos = (
DomainInformation.objects.select_related("domain", "authorizing_official") DomainInformation.objects.select_related("domain", "authorizing_official")
.prefetch_related("domain__permissions", "domain__invitations")
.filter(**filter_condition) .filter(**filter_condition)
.order_by(*sort_fields) .order_by(*sort_fields)
.distinct() .distinct()
@ -53,7 +54,7 @@ def get_domain_infos(filter_condition, sort_fields):
return domain_infos_cleaned return domain_infos_cleaned
def parse_domain_row(columns, domain_info: DomainInformation, security_emails_dict=None, get_domain_managers=False, invites_with_invited_status=None): def parse_domain_row(columns, domain_info: DomainInformation, security_emails_dict=None, get_domain_managers=False, domain_invitation_emails=None, domain_permissions_emails=None):
"""Given a set of columns, generate a new row from cleaned column data""" """Given a set of columns, generate a new row from cleaned column data"""
# Domain should never be none when parsing this information # Domain should never be none when parsing this information
@ -105,10 +106,9 @@ def parse_domain_row(columns, domain_info: DomainInformation, security_emails_di
if get_domain_managers: if get_domain_managers:
# Get lists of emails for active and invited domain managers # Get lists of emails for active and invited domain managers
dm_active_emails = [dm.user.email for dm in domain.permissions.all()]
dm_invited_emails = [ dm_active_emails = domain_permissions_emails.get(domain_info.domain.name, [])
invite.email for invite in invites_with_invited_status.filter(domain=domain) dm_invited_emails = domain_invitation_emails.get(domain_info.domain.name, [])
]
# Set up the "matching headers" + row field data for email and status # Set up the "matching headers" + row field data for email and status
i = 0 # Declare i outside of the loop to avoid a reference before assignment in the second loop i = 0 # Declare i outside of the loop to avoid a reference before assignment in the second loop
@ -148,7 +148,7 @@ def _get_security_emails(sec_contact_ids):
def update_columns_with_domain_managers( def update_columns_with_domain_managers(
domain_info,invites_with_invited_status, update_columns, columns, max_dm_total domain_info,domain_invitation_emails, domain_permissions_emails, update_columns, columns, max_dm_total
): ):
"""Helper function that works with 'global' variables set in write_domains_csv """Helper function that works with 'global' variables set in write_domains_csv
Accepts: Accepts:
@ -159,8 +159,25 @@ def update_columns_with_domain_managers(
Returns: Returns:
Updated update_columns, columns, max_dm_active, max_dm_invited, max_dm_total""" Updated update_columns, columns, max_dm_active, max_dm_invited, max_dm_total"""
dm_active = domain_info.domain.permissions.count() dm_active = 0
dm_invited = invites_with_invited_status.filter(domain=domain_info.domain).count() dm_invited = 0
try:
# logger.info(f'domain_invitation_emails {domain_invitation_emails[domain_info.domain.name]}')
# Get the list of invitation emails for the domain name if it exists, otherwise, return an empty list
invitation_emails = domain_invitation_emails.get(domain_info.domain.name, [])
# Count the number of invitation emails
dm_invited = len(invitation_emails)
except KeyError:
pass
try:
active_emails = domain_permissions_emails.get(domain_info.domain.name, [])
# Count the number of active domain manager emails
dm_active = len(active_emails)
except KeyError:
pass
if dm_active + dm_invited > max_dm_total: if dm_active + dm_invited > max_dm_total:
max_dm_total = dm_active + dm_invited max_dm_total = dm_active + dm_invited
@ -193,6 +210,7 @@ def write_domains_csv(
should_write_header: Conditional bc export_data_domain_growth_to_csv calls write_body twice should_write_header: Conditional bc export_data_domain_growth_to_csv calls write_body twice
""" """
with Timer():
all_domain_infos = get_domain_infos(filter_condition, sort_fields) all_domain_infos = get_domain_infos(filter_condition, sort_fields)
# Store all security emails to avoid epp calls or excessive filters # Store all security emails to avoid epp calls or excessive filters
@ -208,15 +226,33 @@ def write_domains_csv(
max_dm_total = 0 max_dm_total = 0
update_columns = False update_columns = False
invites_with_invited_status=None invites_with_invited_status=None
domain_invitation_emails = {}
domain_permissions_emails = {}
if get_domain_managers: if get_domain_managers:
invites_with_invited_status = DomainInvitation.objects.filter(status=DomainInvitation.DomainInvitationStatus.INVITED).prefetch_related("domain") invites_with_invited_status = DomainInvitation.objects.filter(status=DomainInvitation.DomainInvitationStatus.INVITED).select_related("domain")
# zander = DomainInformation.objects.filter(**filter_condition).annotate(invitations_count=Count('invitation', filter=Q(invitation__status='invited'))).values_list('domain_name', 'invitations_count') # Iterate through each domain invitation and populate the dictionary
# logger.info(f'zander {zander}') for invite in invites_with_invited_status:
# zander_dict = dict(zander) domain_name = invite.domain.name
# logger.info(f'zander_dict {zander_dict}') email = invite.email
if domain_name not in domain_invitation_emails:
domain_invitation_emails[domain_name] = []
domain_invitation_emails[domain_name].append(email)
domain_permissions = UserDomainRole.objects.all()
# Iterate through each domain permission (UserDomainRole) and populate the dictionary
for permission in domain_permissions:
domain_name = permission.domain.name
email = permission.user.email
if domain_name not in domain_permissions_emails:
domain_permissions_emails[domain_name] = []
domain_permissions_emails[domain_name].append(email)
# logger.info(f'domain_invitation_emails {domain_invitation_emails}')
# This var will live outside of the nested for loops to aggregate # This var will live outside of the nested for loops to aggregate
# the data from those loops # the data from those loops
@ -231,12 +267,12 @@ def write_domains_csv(
if get_domain_managers: if get_domain_managers:
update_columns, columns, max_dm_total = ( update_columns, columns, max_dm_total = (
update_columns_with_domain_managers( update_columns_with_domain_managers(
domain_info,invites_with_invited_status, update_columns, columns, max_dm_total domain_info, domain_invitation_emails,domain_permissions_emails, update_columns, columns, max_dm_total
) )
) )
try: try:
row = parse_domain_row(columns, domain_info, security_emails_dict, get_domain_managers, invites_with_invited_status) row = parse_domain_row(columns, domain_info, security_emails_dict, get_domain_managers, domain_invitation_emails,domain_permissions_emails)
rows.append(row) rows.append(row)
except ValueError: except ValueError:
# This should not happen. If it does, just skip this row. # This should not happen. If it does, just skip this row.
@ -360,6 +396,7 @@ def export_data_type_to_csv(csv_file):
Domain.State.READY, Domain.State.READY,
Domain.State.DNS_NEEDED, Domain.State.DNS_NEEDED,
Domain.State.ON_HOLD, Domain.State.ON_HOLD,
Domain.State.UNKNOWN,
], ],
} }
write_domains_csv( write_domains_csv(