diff --git a/src/registrar/management/commands/utility/extra_transition_domain_helper.py b/src/registrar/management/commands/utility/extra_transition_domain_helper.py index 755c9b98a..c082552eb 100644 --- a/src/registrar/management/commands/utility/extra_transition_domain_helper.py +++ b/src/registrar/management/commands/utility/extra_transition_domain_helper.py @@ -182,8 +182,6 @@ class LoadExtraTransitionDomain: # STEP 5: Parse creation and expiration data updated_transition_domain = self.parse_creation_expiration_data(domain_name, transition_domain) - # Check if the instance has changed before saving - updated_transition_domain.save() updated_transition_domains.append(updated_transition_domain) logger.info(f"{TerminalColors.OKCYAN}" f"Successfully updated {domain_name}" f"{TerminalColors.ENDC}") @@ -199,6 +197,28 @@ class LoadExtraTransitionDomain: ) failed_transition_domains.append(domain_name) + updated_fields = [ + "organization_name", + "organization_type", + "federal_type", + "federal_agency", + "first_name", + "middle_name", + "last_name", + "email", + "phone", + "epp_creation_date", + "epp_expiration_date", + ] + + batch_size = 1000 + # Create a Paginator object. Bulk_update on the full dataset + # is too memory intensive for our current app config, so we can chunk this data instead. 
+ paginator = Paginator(updated_transition_domains, batch_size) + for page_num in paginator.page_range: + page = paginator.page(page_num) + TransitionDomain.objects.bulk_update(page.object_list, updated_fields) + failed_count = len(failed_transition_domains) if failed_count == 0: if self.debug: diff --git a/src/registrar/models/domain.py b/src/registrar/models/domain.py index 1a581a4ec..27a8364bc 100644 --- a/src/registrar/models/domain.py +++ b/src/registrar/models/domain.py @@ -910,10 +910,15 @@ class Domain(TimeStampedModel, DomainHelper): raise NotImplementedError() def get_security_email(self): - logger.info("get_security_email-> getting the contact ") - secContact = self.security_contact - if secContact is not None: - return secContact.email + logger.info("get_security_email-> getting the contact") + + security = PublicContact.ContactTypeChoices.SECURITY + security_contact = self.generic_contact_getter(security) + + # If we get a valid value for security_contact, pull its email + # Otherwise, just return nothing + if security_contact is not None and isinstance(security_contact, PublicContact): + return security_contact.email else: return None @@ -1121,7 +1126,6 @@ class Domain(TimeStampedModel, DomainHelper): If you wanted to setup getter logic for Security, you would call: cache_contact_helper(PublicContact.ContactTypeChoices.SECURITY), or cache_contact_helper("security"). - """ # registrant_contact(s) are an edge case. They exist on # the "registrant" property as opposed to contacts. 
diff --git a/src/registrar/utility/csv_export.py b/src/registrar/utility/csv_export.py
index 3924c03c4..f9608f553 100644
--- a/src/registrar/utility/csv_export.py
+++ b/src/registrar/utility/csv_export.py
@@ -3,10 +3,13 @@ import logging
 from datetime import datetime
 from registrar.models.domain import Domain
 from registrar.models.domain_information import DomainInformation
-from registrar.models.public_contact import PublicContact
-from django.db.models import Value
-from django.db.models.functions import Coalesce
 from django.utils import timezone
+from django.core.paginator import Paginator
+from django.db.models import F, Value, CharField
+from django.db.models.functions import Concat, Coalesce
+
+from registrar.models.public_contact import PublicContact
+
 
 logger = logging.getLogger(__name__)
 
@@ -20,50 +23,78 @@ def write_header(writer, columns):
 
 
 def get_domain_infos(filter_condition, sort_fields):
-    domain_infos = DomainInformation.objects.filter(**filter_condition).order_by(*sort_fields)
-    return domain_infos
+    """Fetch the filtered, sorted DomainInformation rows, each annotated with an `ao` full name"""
+    domain_infos = (
+        DomainInformation.objects.select_related("domain", "authorizing_official")
+        .filter(**filter_condition)
+        .order_by(*sort_fields)
+    )
+
+    # Do a mass concat of the first and last name fields for authorizing_official.
+    # The old operation was computationally heavy for some reason, so if we precompute
+    # this here, it is vastly more efficient.
+    domain_infos_cleaned = domain_infos.annotate(
+        ao=Concat(
+            Coalesce(F("authorizing_official__first_name"), Value("")),
+            Value(" "),
+            Coalesce(F("authorizing_official__last_name"), Value("")),
+            output_field=CharField(),
+        )
+    )
+    return domain_infos_cleaned
 
 
-def write_row(writer, columns, domain_info):
-    security_contacts = domain_info.domain.contacts.filter(contact_type=PublicContact.ContactTypeChoices.SECURITY)
+def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None):
+    """Given a set of columns, generate a new row from cleaned column data"""
 
-    # For linter
-    ao = " "
-    if domain_info.authorizing_official:
-        first_name = domain_info.authorizing_official.first_name or ""
-        last_name = domain_info.authorizing_official.last_name or ""
-        ao = first_name + " " + last_name
+    # Domain should never be none when parsing this information
+    if domain_info.domain is None:
+        raise ValueError("Domain is none")
 
-    security_email = " "
-    if security_contacts:
-        security_email = security_contacts[0].email
+    domain = domain_info.domain  # type: ignore
+
+    # Grab the security email from a preset dictionary.
+    # If nothing exists in the dictionary, grab from .contacts.
+    if security_emails_dict is not None and domain.name in security_emails_dict:
+        _email = security_emails_dict.get(domain.name)
+        security_email = _email if _email is not None else " "
+    else:
+        # If the dictionary doesn't contain that data, lets filter for it manually.
+        # This is a last resort as this is a more expensive operation.
+        security_contacts = domain.contacts.filter(contact_type=PublicContact.ContactTypeChoices.SECURITY)
+        _email = security_contacts[0].email if security_contacts else None
+        security_email = _email if _email is not None else " "
 
-    invalid_emails = {"registrar@dotgov.gov", "dotgov@cisa.dhs.gov"}
     # These are default emails that should not be displayed in the csv report
-    if security_email is not None and security_email.lower() in invalid_emails:
+    invalid_emails = {"registrar@dotgov.gov", "dotgov@cisa.dhs.gov"}
+    if security_email.lower() in invalid_emails:
         security_email = "(blank)"
 
+    if domain_info.federal_type:
+        domain_type = f"{domain_info.get_organization_type_display()} - {domain_info.get_federal_type_display()}"
+    else:
+        domain_type = domain_info.get_organization_type_display()
+
     # create a dictionary of fields which can be included in output
     FIELDS = {
-        "Domain name": domain_info.domain.name,
-        "Domain type": domain_info.get_organization_type_display() + " - " + domain_info.get_federal_type_display()
-        if domain_info.federal_type
-        else domain_info.get_organization_type_display(),
+        "Domain name": domain.name,
+        "Domain type": domain_type,
         "Agency": domain_info.federal_agency,
         "Organization name": domain_info.organization_name,
         "City": domain_info.city,
         "State": domain_info.state_territory,
-        "AO": ao,
+        "AO": domain_info.ao,  # type: ignore
         "AO email": domain_info.authorizing_official.email if domain_info.authorizing_official else " ",
         "Security contact email": security_email,
-        "Status": domain_info.domain.get_state_display(),
-        "Expiration date": domain_info.domain.expiration_date,
-        "Created at": domain_info.domain.created_at,
-        "First ready": domain_info.domain.first_ready,
-        "Deleted": domain_info.domain.deleted,
+        "Status": domain.get_state_display(),
+        "Expiration date": domain.expiration_date,
+        "Created at": domain.created_at,
+        "First ready": domain.first_ready,
+        "Deleted": domain.deleted,
     }
 
-    writer.writerow([FIELDS.get(column, "") for column in columns])
+    row = [FIELDS.get(column, "") for column in columns]
+    return row
 
 
 def write_body(
@@ -78,13 +109,42 @@ def write_body(
     """
 
     # Get the domainInfos
-    domain_infos = get_domain_infos(filter_condition, sort_fields)
+    all_domain_infos = get_domain_infos(filter_condition, sort_fields)
 
-    all_domain_infos = list(domain_infos)
+    # Store all security emails to avoid epp calls or excessive filters
+    sec_contact_ids = all_domain_infos.values_list("domain__security_contact_registry_id", flat=True)
+    security_emails_dict = {}
+    public_contacts = (
+        PublicContact.objects.only("email", "domain__name")
+        .select_related("domain")
+        .filter(registry_id__in=sec_contact_ids)
+    )
 
-    # Write rows to CSV
-    for domain_info in all_domain_infos:
-        write_row(writer, columns, domain_info)
+    # Populate a dictionary of domain names and their security contacts
+    for contact in public_contacts:
+        domain: Domain = contact.domain
+        if domain is None:
+            logger.warning("csv_export -> Domain was none for PublicContact")
+        elif domain.name not in security_emails_dict:
+            # Keep the first email seen per domain; a repeated domain is not an error
+            security_emails_dict[domain.name] = contact.email
+
+    # Reduce the memory overhead when performing the write operation
+    paginator = Paginator(all_domain_infos, 1000)
+    for page_num in paginator.page_range:
+        page = paginator.page(page_num)
+        rows = []
+        for domain_info in page.object_list:
+            try:
+                row = parse_row(columns, domain_info, security_emails_dict)
+                rows.append(row)
+            except ValueError:
+                # This should not happen. If it does, just skip this row.
+                # It indicates that DomainInformation.domain is None.
+                logger.error("csv_export -> Error when parsing row, domain was None")
+                continue
+
+        writer.writerows(rows)
 
 
 def export_data_type_to_csv(csv_file):