mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-07-22 18:56:15 +02:00
Merge branch 'main' into za/1620-hide-registrardotgov-email
This commit is contained in:
commit
382fbdb98b
30 changed files with 650 additions and 200 deletions
|
@ -3,11 +3,12 @@ import logging
|
|||
from datetime import datetime
|
||||
from registrar.models.domain import Domain
|
||||
from registrar.models.domain_information import DomainInformation
|
||||
from registrar.models.public_contact import PublicContact
|
||||
from django.db.models import Value
|
||||
from django.db.models.functions import Coalesce
|
||||
from django.utils import timezone
|
||||
from django.core.paginator import Paginator
|
||||
from django.db.models import F, Value, CharField
|
||||
from django.db.models.functions import Concat, Coalesce
|
||||
|
||||
from registrar.models.public_contact import PublicContact
|
||||
from registrar.utility.enums import DefaultEmail
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -22,50 +23,77 @@ def write_header(writer, columns):
|
|||
|
||||
|
||||
def get_domain_infos(filter_condition, sort_fields):
|
||||
domain_infos = DomainInformation.objects.filter(**filter_condition).order_by(*sort_fields)
|
||||
return domain_infos
|
||||
domain_infos = (
|
||||
DomainInformation.objects.select_related("domain", "authorizing_official")
|
||||
.filter(**filter_condition)
|
||||
.order_by(*sort_fields)
|
||||
)
|
||||
|
||||
# Do a mass concat of the first and last name fields for authorizing_official.
|
||||
# The old operation was computationally heavy for some reason, so if we precompute
|
||||
# this here, it is vastly more efficient.
|
||||
domain_infos_cleaned = domain_infos.annotate(
|
||||
ao=Concat(
|
||||
Coalesce(F("authorizing_official__first_name"), Value("")),
|
||||
Value(" "),
|
||||
Coalesce(F("authorizing_official__last_name"), Value("")),
|
||||
output_field=CharField(),
|
||||
)
|
||||
)
|
||||
return domain_infos_cleaned
|
||||
|
||||
|
||||
def write_row(writer, columns, domain_info):
|
||||
security_contacts = domain_info.domain.contacts.filter(contact_type=PublicContact.ContactTypeChoices.SECURITY)
|
||||
def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None):
|
||||
"""Given a set of columns, generate a new row from cleaned column data"""
|
||||
|
||||
# For linter
|
||||
ao = " "
|
||||
if domain_info.authorizing_official:
|
||||
first_name = domain_info.authorizing_official.first_name or ""
|
||||
last_name = domain_info.authorizing_official.last_name or ""
|
||||
ao = first_name + " " + last_name
|
||||
# Domain should never be none when parsing this information
|
||||
if domain_info.domain is None:
|
||||
raise ValueError("Domain is none")
|
||||
|
||||
security_email = " "
|
||||
if security_contacts:
|
||||
security_email = security_contacts[0].email
|
||||
domain = domain_info.domain # type: ignore
|
||||
|
||||
# Grab the security email from a preset dictionary.
|
||||
# If nothing exists in the dictionary, grab from .contacts.
|
||||
if security_emails_dict is not None and domain.name in security_emails_dict:
|
||||
_email = security_emails_dict.get(domain.name)
|
||||
security_email = _email if _email is not None else " "
|
||||
else:
|
||||
# If the dictionary doesn't contain that data, lets filter for it manually.
|
||||
# This is a last resort as this is a more expensive operation.
|
||||
security_contacts = domain.contacts.filter(contact_type=PublicContact.ContactTypeChoices.SECURITY)
|
||||
_email = security_contacts[0].email if security_contacts else None
|
||||
security_email = _email if _email is not None else " "
|
||||
|
||||
invalid_emails = {DefaultEmail.LEGACY_DEFAULT, DefaultEmail.PUBLIC_CONTACT_DEFAULT}
|
||||
# These are default emails that should not be displayed in the csv report
|
||||
if security_email is not None and security_email.lower() in invalid_emails:
|
||||
invalid_emails = {DefaultEmail.LEGACY_DEFAULT, DefaultEmail.PUBLIC_CONTACT_DEFAULT}
|
||||
if security_email.lower() in invalid_emails:
|
||||
security_email = "(blank)"
|
||||
|
||||
if domain_info.federal_type:
|
||||
domain_type = f"{domain_info.get_organization_type_display()} - {domain_info.get_federal_type_display()}"
|
||||
else:
|
||||
domain_type = domain_info.get_organization_type_display()
|
||||
|
||||
# create a dictionary of fields which can be included in output
|
||||
FIELDS = {
|
||||
"Domain name": domain_info.domain.name,
|
||||
"Domain type": domain_info.get_organization_type_display() + " - " + domain_info.get_federal_type_display()
|
||||
if domain_info.federal_type
|
||||
else domain_info.get_organization_type_display(),
|
||||
"Domain name": domain.name,
|
||||
"Domain type": domain_type,
|
||||
"Agency": domain_info.federal_agency,
|
||||
"Organization name": domain_info.organization_name,
|
||||
"City": domain_info.city,
|
||||
"State": domain_info.state_territory,
|
||||
"AO": ao,
|
||||
"AO": domain_info.ao, # type: ignore
|
||||
"AO email": domain_info.authorizing_official.email if domain_info.authorizing_official else " ",
|
||||
"Security contact email": security_email,
|
||||
"Status": domain_info.domain.get_state_display(),
|
||||
"Expiration date": domain_info.domain.expiration_date,
|
||||
"Created at": domain_info.domain.created_at,
|
||||
"First ready": domain_info.domain.first_ready,
|
||||
"Deleted": domain_info.domain.deleted,
|
||||
"Status": domain.get_state_display(),
|
||||
"Expiration date": domain.expiration_date,
|
||||
"Created at": domain.created_at,
|
||||
"First ready": domain.first_ready,
|
||||
"Deleted": domain.deleted,
|
||||
}
|
||||
|
||||
writer.writerow([FIELDS.get(column, "") for column in columns])
|
||||
row = [FIELDS.get(column, "") for column in columns]
|
||||
return row
|
||||
|
||||
|
||||
def write_body(
|
||||
|
@ -80,13 +108,41 @@ def write_body(
|
|||
"""
|
||||
|
||||
# Get the domainInfos
|
||||
domain_infos = get_domain_infos(filter_condition, sort_fields)
|
||||
all_domain_infos = get_domain_infos(filter_condition, sort_fields)
|
||||
|
||||
all_domain_infos = list(domain_infos)
|
||||
# Store all security emails to avoid epp calls or excessive filters
|
||||
sec_contact_ids = all_domain_infos.values_list("domain__security_contact_registry_id", flat=True)
|
||||
security_emails_dict = {}
|
||||
public_contacts = (
|
||||
PublicContact.objects.only("email", "domain__name")
|
||||
.select_related("domain")
|
||||
.filter(registry_id__in=sec_contact_ids)
|
||||
)
|
||||
|
||||
# Write rows to CSV
|
||||
for domain_info in all_domain_infos:
|
||||
write_row(writer, columns, domain_info)
|
||||
# Populate a dictionary of domain names and their security contacts
|
||||
for contact in public_contacts:
|
||||
domain: Domain = contact.domain
|
||||
if domain is not None and domain.name not in security_emails_dict:
|
||||
security_emails_dict[domain.name] = contact.email
|
||||
else:
|
||||
logger.warning("csv_export -> Domain was none for PublicContact")
|
||||
|
||||
# Reduce the memory overhead when performing the write operation
|
||||
paginator = Paginator(all_domain_infos, 1000)
|
||||
for page_num in paginator.page_range:
|
||||
page = paginator.page(page_num)
|
||||
rows = []
|
||||
for domain_info in page.object_list:
|
||||
try:
|
||||
row = parse_row(columns, domain_info, security_emails_dict)
|
||||
rows.append(row)
|
||||
except ValueError:
|
||||
# This should not happen. If it does, just skip this row.
|
||||
# It indicates that DomainInformation.domain is None.
|
||||
logger.error("csv_export -> Error when parsing row, domain was None")
|
||||
continue
|
||||
|
||||
writer.writerows(rows)
|
||||
|
||||
|
||||
def export_data_type_to_csv(csv_file):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue