This commit is contained in:
Rachid Mrad 2024-06-26 17:03:39 -04:00
parent 0fecc199d0
commit 56a8f83e18
No known key found for this signature in database
3 changed files with 615 additions and 581 deletions

View file

@ -151,6 +151,11 @@ class Domain(TimeStampedModel, DomainHelper):
# previously existed but has been deleted from the registry # previously existed but has been deleted from the registry
DELETED = "deleted", "Deleted" DELETED = "deleted", "Deleted"
@classmethod
def get_state_label(cls, state: str):
    """Return the human-readable label for a state value, or None for a falsy state."""
    if not state:
        return None
    return cls(state).label
@classmethod @classmethod
def get_help_text(cls, state) -> str: def get_help_text(cls, state) -> str:
"""Returns a help message for a desired state. If none is found, an empty string is returned""" """Returns a help message for a desired state. If none is found, an empty string is returned"""

View file

@ -1,3 +1,4 @@
from collections import defaultdict
import csv import csv
import logging import logging
from datetime import datetime from datetime import datetime
@ -32,383 +33,6 @@ def write_header(writer, columns):
writer.writerow(columns) writer.writerow(columns)
def get_domain_infos(filter_condition, sort_fields):
    """
    Build a queryset of DomainInformation records, filtered and sorted.

    filter_condition -> dict of ORM filter kwargs applied to the queryset.
    sort_fields -> iterable of fields to order the queryset by.
    returns: a queryset of DomainInformation objects annotated with "ao",
    the authorizing official's full name.
    """
    base_query = DomainInformation.objects.select_related("domain", "authorizing_official")
    filtered_query = base_query.filter(**filter_condition).order_by(*sort_fields).distinct()
    # Precompute the authorizing official's full name inside the database.
    # Concatenating in SQL is vastly cheaper than doing it per-row in Python.
    full_name_expression = Concat(
        Coalesce(F("authorizing_official__first_name"), Value("")),
        Value(" "),
        Coalesce(F("authorizing_official__last_name"), Value("")),
        output_field=CharField(),
    )
    return filtered_query.annotate(ao=full_name_expression)
def parse_row_for_domain(
    columns,
    domain_info: DomainInformation,
    dict_security_emails=None,
    should_get_domain_managers=False,
    dict_domain_invitations_with_invited_status=None,
    dict_user_domain_roles=None,
):
    """Given a set of columns, generate a new row from cleaned column data.

    columns -> ordered column headers; only headers present in FIELDS are emitted.
    domain_info -> the DomainInformation record to serialize; .domain must not be None.
    dict_security_emails -> optional {domain_name: email} precomputed lookup.
    should_get_domain_managers -> when True, append per-manager email/status columns.
    dict_domain_invitations_with_invited_status -> {domain_name: [invited emails]}.
    dict_user_domain_roles -> {domain_name: [active manager emails]}.

    Raises ValueError when domain_info.domain is None.
    """
    # Domain should never be none when parsing this information
    if domain_info.domain is None:
        # Fix: "Attemting" typo in the log message
        logger.error("Attempting to parse row for csv exports but Domain is none in a DomainInfo")
        raise ValueError("Domain is none")

    domain = domain_info.domain  # type: ignore

    # Grab the security email from a preset dictionary.
    # If nothing exists in the dictionary, grab from .contacts.
    if dict_security_emails is not None and domain.name in dict_security_emails:
        _email = dict_security_emails.get(domain.name)
    else:
        # If the dictionary doesn't contain that data, lets filter for it manually.
        # This is a last resort as this is a more expensive operation.
        security_contacts = domain.contacts.filter(contact_type=PublicContact.ContactTypeChoices.SECURITY)
        _email = security_contacts[0].email if security_contacts else None
    security_email = _email if _email is not None else " "

    # These are default emails that should not be displayed in the csv report
    invalid_emails = {DefaultEmail.LEGACY_DEFAULT.value, DefaultEmail.PUBLIC_CONTACT_DEFAULT.value}
    if security_email.lower() in invalid_emails:
        security_email = "(blank)"

    if domain_info.federal_type and domain_info.organization_type == DomainRequest.OrgChoicesElectionOffice.FEDERAL:
        domain_type = f"{domain_info.get_organization_type_display()} - {domain_info.get_federal_type_display()}"
    else:
        domain_type = domain_info.get_organization_type_display()

    # create a dictionary of fields which can be included in output
    FIELDS = {
        "Domain name": domain.name,
        "Status": domain.get_state_display(),
        "First ready on": domain.first_ready or "(blank)",
        "Expiration date": domain.expiration_date or "(blank)",
        "Domain type": domain_type,
        "Agency": domain_info.federal_agency,
        "Organization name": domain_info.organization_name,
        "City": domain_info.city,
        "State": domain_info.state_territory,
        "AO": domain_info.ao,  # type: ignore
        "AO email": domain_info.authorizing_official.email if domain_info.authorizing_official else " ",
        "Security contact email": security_email,
        "Created at": domain.created_at,
        "Deleted": domain.deleted,
    }

    if should_get_domain_managers:
        # Fix: guard against missing lookup dicts so a caller that sets
        # should_get_domain_managers without passing them gets empty manager
        # columns instead of an AttributeError on None.get(...)
        dms_active_emails = (dict_user_domain_roles or {}).get(domain.name, [])
        dms_invited_emails = (dict_domain_invitations_with_invited_status or {}).get(domain.name, [])

        # Set up the "matching headers" + row field data for email and status
        i = 0  # Declare i outside of the loop to avoid a reference before assignment in the second loop
        for i, dm_email in enumerate(dms_active_emails, start=1):
            FIELDS[f"Domain manager {i}"] = dm_email
            FIELDS[f"DM{i} status"] = "R"

        # Continue enumeration from where we left off and add data for invited domain managers
        for j, dm_email in enumerate(dms_invited_emails, start=i + 1):
            FIELDS[f"Domain manager {j}"] = dm_email
            FIELDS[f"DM{j} status"] = "I"

    row = [FIELDS.get(column, "") for column in columns]
    return row
def _get_security_emails(sec_contact_ids):
    """
    Retrieve security contact emails for the given security contact IDs.

    Returns a dict mapping domain name -> security contact email.
    The first contact seen for a domain wins; later duplicates are ignored.
    """
    dict_security_emails = {}
    public_contacts = (
        PublicContact.objects.only("email", "domain__name")
        .select_related("domain")
        .filter(registry_id__in=sec_contact_ids)
    )

    # Populate a dictionary of domain names and their security contacts
    for contact in public_contacts:
        domain: Domain = contact.domain
        if domain is None:
            # Fix: previously this warning also fired for duplicate domain names
            # (the else covered both cases), which was misleading. Warn only for
            # the actual missing-domain case; skip duplicates silently.
            logger.warning("csv_export -> Domain was none for PublicContact")
        elif domain.name not in dict_security_emails:
            dict_security_emails[domain.name] = contact.email

    return dict_security_emails
def count_domain_managers(domain_name, dict_domain_invitations_with_invited_status, dict_user_domain_roles):
    """Return (active_count, invited_count) of domain managers for a domain name."""
    active_emails = dict_user_domain_roles.get(domain_name, [])
    invited_emails = dict_domain_invitations_with_invited_status.get(domain_name, [])
    return len(active_emails), len(invited_emails)
def update_columns(columns, dms_total, should_update_columns):
    """Append domain-manager email/status column headers when flagged to do so.

    Mutates and returns `columns`; resets the update flag once applied.
    Returns (columns, should_update_columns, dms_total).
    """
    if should_update_columns:
        for index in range(1, dms_total + 1):
            email_header = f"Domain manager {index}"
            status_header = f"DM{index} status"
            # Headers already present (from a previous page) are not duplicated
            if email_header not in columns:
                columns.extend((email_header, status_header))
        should_update_columns = False
    return columns, should_update_columns, dms_total
def update_columns_with_domain_managers(
    columns,
    domain_info,
    should_update_columns,
    dms_total,
    dict_domain_invitations_with_invited_status,
    dict_user_domain_roles,
):
    """Helper that tracks the running maximum manager count and updates columns.

    Returns the result of update_columns: (columns, should_update_columns, dms_total).
    """
    domain_name = domain_info.domain.name
    try:
        active_count, invited_count = count_domain_managers(
            domain_name, dict_domain_invitations_with_invited_status, dict_user_domain_roles
        )
        combined_count = active_count + invited_count
        # A new maximum means we need more DM columns in the header
        if combined_count > dms_total:
            dms_total = combined_count
            should_update_columns = True
    except Exception as err:
        logger.error(f"Exception while parsing domain managers for reports: {err}")
    return update_columns(columns, dms_total, should_update_columns)
def build_dictionaries_for_domain_managers(dict_user_domain_roles, dict_domain_invitations_with_invited_status):
    """Helper function that builds dicts for invited users and active domain
    managers. We do so to avoid filtering within loops.

    Both arguments are mutated in place and also returned:
    dict_user_domain_roles -> {domain_name: [active manager emails]}
    dict_domain_invitations_with_invited_status -> {domain_name: [invited emails]}
    """
    # Iterate through each user domain role and populate the dictionary
    for user_domain_role in UserDomainRole.objects.all():
        domain_name = user_domain_role.domain.name
        # Fix: setdefault replaces the manual key-existence check
        dict_user_domain_roles.setdefault(domain_name, []).append(user_domain_role.user.email)

    # Fix: removed a dead `= None` assignment that was immediately overwritten
    domain_invitations_with_invited_status = DomainInvitation.objects.filter(
        status=DomainInvitation.DomainInvitationStatus.INVITED
    ).select_related("domain")

    # Iterate through each domain invitation and populate the dictionary
    for invite in domain_invitations_with_invited_status:
        dict_domain_invitations_with_invited_status.setdefault(invite.domain.name, []).append(invite.email)

    return dict_user_domain_roles, dict_domain_invitations_with_invited_status
def write_csv_for_domains(
    writer,
    columns,
    sort_fields,
    filter_condition,
    should_get_domain_managers=False,
    should_write_header=True,
):
    """
    Receives params from the parent methods and outputs a CSV with filtered and sorted domains.
    Works with write_header as long as the same writer object is passed.
    should_get_domain_managers: Conditional bc we only use domain manager info for export_data_full_to_csv
    should_write_header: Conditional bc export_data_domain_growth_to_csv calls write_body twice
    """
    # Retrieve domain information and all sec emails
    all_domain_infos = get_domain_infos(filter_condition, sort_fields)
    sec_contact_ids = all_domain_infos.values_list("domain__security_contact_registry_id", flat=True)
    dict_security_emails = _get_security_emails(sec_contact_ids)
    paginator = Paginator(all_domain_infos, 1000)

    # Initialize variables
    dms_total = 0
    should_update_columns = False
    total_body_rows = []
    dict_user_domain_roles = {}
    dict_domain_invitations_with_invited_status = {}

    # Build dictionaries if necessary
    if should_get_domain_managers:
        dict_user_domain_roles, dict_domain_invitations_with_invited_status = build_dictionaries_for_domain_managers(
            dict_user_domain_roles, dict_domain_invitations_with_invited_status
        )

    # Process domain information
    for page_num in paginator.page_range:
        rows = []
        page = paginator.page(page_num)
        for domain_info in page.object_list:
            if should_get_domain_managers:
                # Fix: update_columns_with_domain_managers returns
                # (columns, should_update_columns, dms_total) — the previous
                # unpack order swapped dms_total (int) with the bool flag,
                # corrupting both values on every iteration.
                columns, should_update_columns, dms_total = update_columns_with_domain_managers(
                    columns,
                    domain_info,
                    should_update_columns,
                    dms_total,
                    dict_domain_invitations_with_invited_status,
                    dict_user_domain_roles,
                )
            try:
                row = parse_row_for_domain(
                    columns,
                    domain_info,
                    dict_security_emails,
                    should_get_domain_managers,
                    dict_domain_invitations_with_invited_status,
                    dict_user_domain_roles,
                )
                rows.append(row)
            except ValueError:
                logger.error("csv_export -> Error when parsing row, domain was None")
                continue
        total_body_rows.extend(rows)

    # Header is written last so it reflects any manager columns added while parsing
    if should_write_header:
        write_header(writer, columns)
    writer.writerows(total_body_rows)
def export_data_type_to_csv(csv_file):
    """
    All domains report with extra columns.
    This maps to the "All domain metadata" button.
    Exports domains of all statuses.
    """
    writer = csv.writer(csv_file)
    # Coalesce is used to replace federal_type of None with ZZZZZ
    sort_fields = [
        "organization_type",
        Coalesce("federal_type", Value("ZZZZZ")),
        "federal_agency",
        "domain__name",
    ]
    # Domain manager columns are appended dynamically by write_csv_for_domains
    columns = [
        "Domain name",
        "Status",
        "First ready on",
        "Expiration date",
        "Domain type",
        "Agency",
        "Organization name",
        "City",
        "State",
        "AO",
        "AO email",
        "Security contact email",
    ]
    write_csv_for_domains(
        writer, columns, sort_fields, filter_condition={}, should_get_domain_managers=True, should_write_header=True
    )
def export_data_full_to_csv(csv_file):
    """All domains report"""
    writer = csv.writer(csv_file)
    columns = [
        "Domain name",
        "Domain type",
        "Agency",
        "Organization name",
        "City",
        "State",
        "Security contact email",
    ]
    # Coalesce is used to replace federal_type of None with ZZZZZ
    sort_fields = [
        "organization_type",
        Coalesce("federal_type", Value("ZZZZZ")),
        "federal_agency",
        "domain__name",
    ]
    # Only domains that are live or close to it are included
    active_states = [
        Domain.State.READY,
        Domain.State.DNS_NEEDED,
        Domain.State.ON_HOLD,
    ]
    filter_condition = {"domain__state__in": active_states}
    write_csv_for_domains(
        writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True
    )
def export_data_federal_to_csv(csv_file):
    """Federal domains report"""
    writer = csv.writer(csv_file)
    columns = [
        "Domain name",
        "Domain type",
        "Agency",
        "Organization name",
        "City",
        "State",
        "Security contact email",
    ]
    # Coalesce is used to replace federal_type of None with ZZZZZ
    sort_fields = [
        "organization_type",
        Coalesce("federal_type", Value("ZZZZZ")),
        "federal_agency",
        "domain__name",
    ]
    # Restrict to federal orgs in live (or nearly live) states
    active_states = [
        Domain.State.READY,
        Domain.State.DNS_NEEDED,
        Domain.State.ON_HOLD,
    ]
    filter_condition = {
        "organization_type__icontains": "federal",
        "domain__state__in": active_states,
    }
    write_csv_for_domains(
        writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True
    )
def get_default_start_date(): def get_default_start_date():
# Default to a date that's prior to our first deployment # Default to a date that's prior to our first deployment
return timezone.make_aware(datetime(2023, 11, 1)) return timezone.make_aware(datetime(2023, 11, 1))
@ -427,66 +51,6 @@ def format_end_date(end_date):
return timezone.make_aware(datetime.strptime(end_date, "%Y-%m-%d")) if end_date else get_default_end_date() return timezone.make_aware(datetime.strptime(end_date, "%Y-%m-%d")) if end_date else get_default_end_date()
def export_data_domain_growth_to_csv(csv_file, start_date, end_date):
    """
    Growth report:
    Receive start and end dates from the view, parse them.
    Request from write_body READY domains that are created between
    the start and end dates, as well as DELETED domains that are deleted between
    the start and end dates. Specify sort params for both lists.
    """
    start_date_formatted = format_start_date(start_date)
    end_date_formatted = format_end_date(end_date)
    writer = csv.writer(csv_file)
    # define columns to include in export
    columns = [
        "Domain name",
        "Domain type",
        "Agency",
        "Organization name",
        "City",
        "State",
        "Status",
        "Expiration date",
        "Created at",
        "First ready",
        "Deleted",
    ]
    # READY domains that first became ready inside the reporting window
    ready_sort_fields = ["domain__first_ready", "domain__name"]
    ready_filter = {
        "domain__state__in": [Domain.State.READY],
        "domain__first_ready__lte": end_date_formatted,
        "domain__first_ready__gte": start_date_formatted,
    }
    # Domains deleted between the start and end dates, sorted by deletion date
    deleted_sort_fields = ["domain__deleted", "domain__name"]
    deleted_filter = {
        "domain__state__in": [Domain.State.DELETED],
        "domain__deleted__lte": end_date_formatted,
        "domain__deleted__gte": start_date_formatted,
    }
    write_csv_for_domains(
        writer, columns, ready_sort_fields, ready_filter, should_get_domain_managers=False, should_write_header=True
    )
    # The deleted domains are appended below the ready ones, without a second header
    write_csv_for_domains(
        writer,
        columns,
        deleted_sort_fields,
        deleted_filter,
        should_get_domain_managers=False,
        should_write_header=False,
    )
def get_sliced_domains(filter_condition): def get_sliced_domains(filter_condition):
"""Get filtered domains counts sliced by org type and election office. """Get filtered domains counts sliced by org type and election office.
Pass distinct=True when filtering by permissions so we do not to count multiples Pass distinct=True when filtering by permissions so we do not to count multiples
@ -558,17 +122,278 @@ def get_sliced_requests(filter_condition):
election_board, election_board,
] ]
class DomainExport:
"""
A collection of functions which return csv files regarding the Domain model.
"""
def export_data_managed_domains_to_csv(csv_file, start_date, end_date): @classmethod
"""Get counts for domains that have domain managers for two different dates, def export_data_type_to_csv(cls, csv_file):
"""
All domain metadata:
Exports domains of all statuses plus domain managers.
"""
writer = csv.writer(csv_file)
columns = [
"Domain name",
"Status",
"First ready on",
"Expiration date",
"Domain type",
"Agency",
"Organization name",
"City",
"State",
"AO",
"AO email",
"Security contact email",
"Domain managers",
"Invited domain managers",
]
# Coalesce is used to replace federal_type of None with ZZZZZ
sort_fields = [
"organization_type",
Coalesce("federal_type", Value("ZZZZZ")),
"federal_agency",
"domain__name",
]
# Fetch all relevant PublicContact entries
public_contacts = cls.get_all_security_emails()
# Fetch all relevant Invite entries
domain_invitations = cls.get_all_domain_invitations()
# Fetch all relevant UserDomainRole entries
user_domain_roles = cls.get_all_user_domain_roles()
domain_infos = (
DomainInformation.objects.select_related("domain", "authorizing_official")
.prefetch_related("permissions")
.order_by(*sort_fields)
.distinct()
)
annotations = cls._domain_metadata_annotations()
# The .values returned from annotate_and_retrieve_fields can't go two levels deep
# (just returns the field id of say, "creator") - so we have to include this.
additional_values = [
"domain__name",
"domain__state",
"domain__first_ready",
"domain__expiration_date",
"domain__created_at",
"domain__deleted",
"authorizing_official__email",
]
# Convert the domain request queryset to a dictionary (including annotated fields)
annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, public_contacts, domain_invitations, user_domain_roles, additional_values)
requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False)
# Write the csv file
cls.write_csv_for_domains(writer, columns, requests_dict)
@classmethod
def export_data_full_to_csv(cls, csv_file):
    """Current full"""
    writer = csv.writer(csv_file)
    columns = [
        "Domain name",
        "Domain type",
        "Agency",
        "Organization name",
        "City",
        "State",
        "Security contact email",
    ]
    # Coalesce is used to replace federal_type of None with ZZZZZ
    sort_fields = [
        "organization_type",
        Coalesce("federal_type", Value("ZZZZZ")),
        "federal_agency",
        "domain__name",
    ]
    filter_condition = {
        "domain__state__in": [
            Domain.State.READY,
            Domain.State.DNS_NEEDED,
            Domain.State.ON_HOLD,
        ],
    }
    queryset = (
        DomainInformation.objects.select_related("domain")
        .filter(**filter_condition)
        .order_by(*sort_fields)
        .distinct()
    )
    # No computed (annotated) columns are needed for this report
    annotations = {}
    additional_values = ["domain__name"]
    # Convert the queryset to a dictionary (including annotated fields)
    annotated_domains = cls.annotate_and_retrieve_fields(queryset, annotations, {}, {}, {}, additional_values)
    domains_dict = convert_queryset_to_dict(annotated_domains, is_model=False)
    # Write the csv file
    cls.write_csv_for_domains(writer, columns, domains_dict)
@classmethod
def export_data_federal_to_csv(cls, csv_file):
    """Current federal"""
    writer = csv.writer(csv_file)
    columns = [
        "Domain name",
        "Domain type",
        "Agency",
        "Organization name",
        "City",
        "State",
        "Security contact email",
    ]
    # Coalesce is used to replace federal_type of None with ZZZZZ
    sort_fields = [
        "organization_type",
        Coalesce("federal_type", Value("ZZZZZ")),
        "federal_agency",
        "domain__name",
    ]
    # Federal organizations only, in live (or nearly live) states
    filter_condition = {
        "organization_type__icontains": "federal",
        "domain__state__in": [
            Domain.State.READY,
            Domain.State.DNS_NEEDED,
            Domain.State.ON_HOLD,
        ],
    }
    federal_infos = (
        DomainInformation.objects.select_related("domain")
        .filter(**filter_condition)
        .order_by(*sort_fields)
        .distinct()
    )
    # No computed (annotated) columns are needed for this report
    annotated_domains = cls.annotate_and_retrieve_fields(federal_infos, {}, {}, {}, {}, ["domain__name"])
    domains_dict = convert_queryset_to_dict(annotated_domains, is_model=False)
    # Write the csv file
    cls.write_csv_for_domains(writer, columns, domains_dict)
@classmethod
def export_data_domain_growth_to_csv(cls, csv_file, start_date, end_date):
    """
    Domain growth:
    Receive start and end dates from the view, parse them.
    Request from write_body READY domains that are created between
    the start and end dates, as well as DELETED domains that are deleted between
    the start and end dates. Specify sort params for both lists.
    """
    start_date_formatted = format_start_date(start_date)
    end_date_formatted = format_end_date(end_date)
    writer = csv.writer(csv_file)
    # define columns to include in export
    columns = [
        "Domain name",
        "Domain type",
        "Agency",
        "Organization name",
        "City",
        "State",
        "Status",
        "Expiration date",
        "Created at",
        "First ready",
        "Deleted",
    ]
    # READY domains that first became ready inside the reporting window
    ready_sort_fields = ["domain__first_ready", "domain__name"]
    ready_filter = {
        "domain__state__in": [Domain.State.READY],
        "domain__first_ready__lte": end_date_formatted,
        "domain__first_ready__gte": start_date_formatted,
    }
    # Domains deleted between the start and end dates, sorted by deletion date
    deleted_sort_fields = ["domain__deleted", "domain__name"]
    deleted_filter = {
        "domain__state__in": [Domain.State.DELETED],
        "domain__deleted__lte": end_date_formatted,
        "domain__deleted__gte": start_date_formatted,
    }
    ready_infos = (
        DomainInformation.objects.select_related("domain")
        .filter(**ready_filter)
        .order_by(*ready_sort_fields)
        .distinct()
    )
    deleted_infos = (
        DomainInformation.objects.select_related("domain")
        .filter(**deleted_filter)
        .order_by(*deleted_sort_fields)
        .distinct()
    )
    # No computed (annotated) columns are needed for this report
    annotations = {}
    additional_values = [
        "domain__name",
        "domain__state",
        "domain__first_ready",
        "domain__expiration_date",
        "domain__created_at",
        "domain__deleted",
    ]
    # Convert both querysets to dictionaries (including annotated fields)
    ready_dict = convert_queryset_to_dict(
        cls.annotate_and_retrieve_fields(ready_infos, annotations, {}, {}, {}, additional_values),
        is_model=False,
    )
    deleted_dict = convert_queryset_to_dict(
        cls.annotate_and_retrieve_fields(deleted_infos, annotations, {}, {}, {}, additional_values),
        is_model=False,
    )
    cls.write_csv_for_domains(writer, columns, ready_dict)
    # Deleted domains are appended below the ready domains without a second header
    cls.write_csv_for_domains(writer, columns, deleted_dict, should_write_header=False)
@classmethod
def export_data_managed_domains_to_csv(cls, csv_file, start_date, end_date):
"""
Managed domains:
Get counts for domains that have domain managers for two different dates,
get list of managed domains at end_date.""" get list of managed domains at end_date."""
start_date_formatted = format_start_date(start_date) start_date_formatted = format_start_date(start_date)
end_date_formatted = format_end_date(end_date) end_date_formatted = format_end_date(end_date)
writer = csv.writer(csv_file) writer = csv.writer(csv_file)
columns = [ columns = [
"Domain name", "Domain name",
"Domain type", "Domain type",
"Domain managers",
"Invited domain managers",
] ]
sort_fields = [ sort_fields = [
"domain__name", "domain__name",
@ -621,19 +446,42 @@ def export_data_managed_domains_to_csv(csv_file, start_date, end_date):
writer.writerow(managed_domains_sliced_at_end_date) writer.writerow(managed_domains_sliced_at_end_date)
writer.writerow([]) writer.writerow([])
write_csv_for_domains( domain_invitations = cls.get_all_domain_invitations()
writer,
columns, # Fetch all relevant ComainUserRole entries
sort_fields, user_domain_roles = cls.get_all_user_domain_roles()
filter_managed_domains_end_date,
should_get_domain_managers=True, annotations = {}
should_write_header=True, # The .values returned from annotate_and_retrieve_fields can't go two levels deep
# (just returns the field id of say, "creator") - so we have to include this.
additional_values = [
"domain__name",
]
domain_infos = (
DomainInformation.objects.select_related("domain")
.prefetch_related("permissions")
.filter(**filter_managed_domains_end_date)
.order_by(*sort_fields)
.distinct()
) )
# Convert the domain request queryset to a dictionary (including annotated fields)
annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, {}, domain_invitations, user_domain_roles, additional_values)
requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False)
def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date): cls.write_csv_for_domains(
"""Get counts for domains that do not have domain managers for two different dates, writer,
get list of unmanaged domains at end_date.""" columns,
requests_dict
)
@classmethod
def export_data_unmanaged_domains_to_csv(cls, csv_file, start_date, end_date):
"""
Unmanaged domains:
Get counts for domains that do not have domain managers for two different dates,
get list of unmanaged domains at end_date."""
start_date_formatted = format_start_date(start_date) start_date_formatted = format_start_date(start_date)
end_date_formatted = format_end_date(end_date) end_date_formatted = format_end_date(end_date)
@ -694,15 +542,208 @@ def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date):
writer.writerow(unmanaged_domains_sliced_at_end_date) writer.writerow(unmanaged_domains_sliced_at_end_date)
writer.writerow([]) writer.writerow([])
write_csv_for_domains( annotations = {}
# The .values returned from annotate_and_retrieve_fields can't go two levels deep
# (just returns the field id of say, "creator") - so we have to include this.
additional_values = [
"domain__name",
]
domain_infos = (
DomainInformation.objects.select_related("domain")
.filter(**filter_unmanaged_domains_end_date)
.order_by(*sort_fields)
.distinct()
)
# Convert the domain request queryset to a dictionary (including annotated fields)
annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, {}, {}, {}, additional_values)
requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False)
cls.write_csv_for_domains(
writer, writer,
columns, columns,
sort_fields, requests_dict
filter_unmanaged_domains_end_date,
should_get_domain_managers=False,
should_write_header=True,
) )
@classmethod
def _domain_metadata_annotations(cls, delimiter=", "):
    """Return the queryset annotations used by the domain metadata report.

    Currently computes "ao_name": the authorizing official's full name,
    concatenated in the database (empty names coalesce to "").

    delimiter -> unused at present; kept for interface compatibility.
    """
    return {
        "ao_name": Concat(
            Coalesce(F("authorizing_official__first_name"), Value("")),
            Value(" "),
            Coalesce(F("authorizing_official__last_name"), Value("")),
            output_field=CharField(),
        ),
    }
@classmethod
def annotate_and_retrieve_fields(
    cls,
    domains,
    annotations,
    public_contacts=None,
    domain_invitations=None,
    user_domain_roles=None,
    additional_values=None,
    include_many_to_many=False,
) -> QuerySet:
    """
    Applies annotations to a queryset and retrieves specified fields,
    including class-defined and annotation-defined.

    Parameters:
        domains (QuerySet): Initial DomainInformation queryset.
        annotations (dict): Fields to compute {field_name: expression}.
        public_contacts (dict, optional): Mapping of registry_id -> security email.
        domain_invitations (iterable, optional): (domain_name, email) pairs of invited managers.
        user_domain_roles (iterable, optional): (domain_name, email) pairs of active managers.
        additional_values (list, optional): Extra fields to retrieve; annotation keys are always included.
        include_many_to_many (bool, optional): Determines if we should include many to many fields or not

    Returns:
        QuerySet: Contains dictionaries with the specified fields for each record.
    """
    # Fix: previous version used mutable default arguments ({}) and mutated
    # the caller's additional_values list via .extend — copy/normalize instead.
    public_contacts = public_contacts or {}
    domain_invitations = domain_invitations or []
    user_domain_roles = user_domain_roles or []
    additional_values = list(additional_values) if additional_values else []

    # We can infer that if we're passing in annotations,
    # we want to grab the result of said annotation.
    if annotations:
        additional_values.extend(annotations.keys())

    # Get preexisting fields on DomainInformation
    domain_fields = set()
    for field in DomainInformation._meta.get_fields():
        # Exclude many to many fields unless we specify
        many_to_many = isinstance(field, ManyToManyField) and include_many_to_many
        if many_to_many or not isinstance(field, ManyToManyField):
            domain_fields.add(field.name)

    queryset = domains.annotate(**annotations).values(*domain_fields, *additional_values)

    # Create mapping of domain to a list of invited users and managers
    invited_users_dict = defaultdict(list)
    for domain_name, email in domain_invitations:
        invited_users_dict[domain_name].append(email)

    managers_dict = defaultdict(list)
    for domain_name, email in user_domain_roles:
        managers_dict[domain_name].append(email)

    # Attach security contact and manager info to each row dict
    annotated_domains = []
    for domain in queryset:
        domain["security_contact_email"] = public_contacts.get(domain.get("domain__registry_id"))
        domain["invited_users"] = ", ".join(invited_users_dict.get(domain.get("domain__name"), []))
        domain["managers"] = ", ".join(managers_dict.get(domain.get("domain__name"), []))
        annotated_domains.append(domain)

    # Preserve original behavior: return the raw queryset when no rows matched
    if annotated_domains:
        return annotated_domains
    return queryset
@staticmethod
def parse_row_for_domains(columns, domain):
    """
    Given a set of columns and a domain dictionary, generate a new row from cleaned column data.

    columns -> ordered column headers; only headers present in FIELDS are emitted.
    domain -> a dict produced by annotate_and_retrieve_fields / convert_queryset_to_dict.
    """
    status = domain.get("domain__state")
    human_readable_status = Domain.State.get_state_label(status)

    expiration_date = domain.get("domain__expiration_date")
    if expiration_date is None:
        expiration_date = "(blank)"

    first_ready_on = domain.get("domain__first_ready")
    if first_ready_on is None:
        first_ready_on = "(blank)"

    domain_org_type = domain.get("generic_org_type")
    human_readable_domain_org_type = DomainRequest.OrganizationChoices.get_org_label(domain_org_type)
    domain_federal_type = domain.get("federal_type")
    human_readable_domain_federal_type = BranchChoices.get_branch_label(domain_federal_type)

    domain_type = human_readable_domain_org_type
    if domain_federal_type and domain_org_type == DomainRequest.OrgChoicesElectionOffice.FEDERAL:
        domain_type = f"{human_readable_domain_federal_type} - {human_readable_domain_org_type}"

    # Fix: removed leftover debug print statements that dumped field values
    # for the hard-coded domain "18f.gov" on every export.

    # create a dictionary of fields which can be included in output.
    # "extra_fields" are precomputed fields (generated in the DB or parsed).
    FIELDS = {
        "Domain name": domain.get("domain__name"),
        "Status": human_readable_status,
        "First ready on": first_ready_on,
        "Expiration date": expiration_date,
        "Domain type": domain_type,
        "Agency": domain.get("federal_agency"),
        "Organization name": domain.get("organization_name"),
        "City": domain.get("city"),
        "State": domain.get("state_territory"),
        "AO": domain.get("ao_name"),
        "AO email": domain.get("authorizing_official__email"),
        "Security contact email": domain.get("security_contact_email"),
        "Created at": domain.get("domain__created_at"),
        "Deleted": domain.get("domain__deleted"),
        "Domain managers": domain.get("managers"),
        "Invited domain managers": domain.get("invited_users"),
    }

    row = [FIELDS.get(column, "") for column in columns]
    return row
@staticmethod
def write_csv_for_domains(
    writer,
    columns,
    domains_dict,
    should_write_header=True,
):
    """Receives params from the parent methods and outputs a CSV with filtered and sorted requests.
    Works with write_header as long as the same writer object is passed."""
    body_rows = []
    for domain in domains_dict.values():
        try:
            body_rows.append(DomainExport.parse_row_for_domains(columns, domain))
        except ValueError as err:
            # A malformed row is logged and skipped rather than aborting the export
            logger.error(f"csv_export -> Error when parsing row: {err}")
            continue

    if should_write_header:
        write_header(writer, columns)

    writer.writerows(body_rows)
# ============================================================= #
# Helper functions for django ORM queries. #
# We are using these rather than pure python for speed reasons. #
# ============================================================= #
@classmethod
def get_all_security_emails(cls):
    """
    Fetch all PublicContact entries and return a mapping of registry_id to email.
    """
    # dict() over (registry_id, email) pairs builds the same mapping as a comprehension
    return dict(PublicContact.objects.values_list("registry_id", "email"))
@classmethod
def get_all_domain_invitations(cls):
    """
    Fetch all invited DomainInvitation entries as (domain name, email) pairs.
    """
    # Fix: use the status enum rather than the hard-coded string "invited",
    # consistent with how the rest of this module filters invitations.
    domain_invitations = DomainInvitation.objects.filter(
        status=DomainInvitation.DomainInvitationStatus.INVITED
    ).values_list("domain__name", "email")
    return list(domain_invitations)
@classmethod
def get_all_user_domain_roles(cls):
    """
    Fetch all UserDomainRole entries as (domain name, user email) pairs.
    """
    role_pairs = UserDomainRole.objects.select_related("user").values_list("domain__name", "user__email")
    return list(role_pairs)
class DomainRequestExport: class DomainRequestExport:
""" """

View file

@ -142,7 +142,7 @@ class ExportDataType(View):
# match the CSV example with all the fields # match the CSV example with all the fields
response = HttpResponse(content_type="text/csv") response = HttpResponse(content_type="text/csv")
response["Content-Disposition"] = 'attachment; filename="domains-by-type.csv"' response["Content-Disposition"] = 'attachment; filename="domains-by-type.csv"'
csv_export.export_data_type_to_csv(response) csv_export.DomainExport.export_data_type_to_csv(response)
return response return response
@ -151,7 +151,7 @@ class ExportDataFull(View):
# Smaller export based on 1 # Smaller export based on 1
response = HttpResponse(content_type="text/csv") response = HttpResponse(content_type="text/csv")
response["Content-Disposition"] = 'attachment; filename="current-full.csv"' response["Content-Disposition"] = 'attachment; filename="current-full.csv"'
csv_export.export_data_full_to_csv(response) csv_export.DomainExport.export_data_full_to_csv(response)
return response return response
@ -160,7 +160,7 @@ class ExportDataFederal(View):
# Federal only # Federal only
response = HttpResponse(content_type="text/csv") response = HttpResponse(content_type="text/csv")
response["Content-Disposition"] = 'attachment; filename="current-federal.csv"' response["Content-Disposition"] = 'attachment; filename="current-federal.csv"'
csv_export.export_data_federal_to_csv(response) csv_export.DomainExport.export_data_federal_to_csv(response)
return response return response
@ -177,31 +177,23 @@ class ExportDomainRequestDataFull(View):
class ExportDataDomainsGrowth(View): class ExportDataDomainsGrowth(View):
def get(self, request, *args, **kwargs): def get(self, request, *args, **kwargs):
# Get start_date and end_date from the request's GET parameters
# #999: not needed if we switch to django forms
start_date = request.GET.get("start_date", "") start_date = request.GET.get("start_date", "")
end_date = request.GET.get("end_date", "") end_date = request.GET.get("end_date", "")
response = HttpResponse(content_type="text/csv") response = HttpResponse(content_type="text/csv")
response["Content-Disposition"] = f'attachment; filename="domain-growth-report-{start_date}-to-{end_date}.csv"' response["Content-Disposition"] = f'attachment; filename="domain-growth-report-{start_date}-to-{end_date}.csv"'
# For #999: set export_data_domain_growth_to_csv to return the resulting queryset, which we can then use csv_export.DomainExport.export_data_domain_growth_to_csv(response, start_date, end_date)
# in context to display this data in the template.
csv_export.export_data_domain_growth_to_csv(response, start_date, end_date)
return response return response
class ExportDataRequestsGrowth(View): class ExportDataRequestsGrowth(View):
def get(self, request, *args, **kwargs): def get(self, request, *args, **kwargs):
# Get start_date and end_date from the request's GET parameters
# #999: not needed if we switch to django forms
start_date = request.GET.get("start_date", "") start_date = request.GET.get("start_date", "")
end_date = request.GET.get("end_date", "") end_date = request.GET.get("end_date", "")
response = HttpResponse(content_type="text/csv") response = HttpResponse(content_type="text/csv")
response["Content-Disposition"] = f'attachment; filename="requests-{start_date}-to-{end_date}.csv"' response["Content-Disposition"] = f'attachment; filename="requests-{start_date}-to-{end_date}.csv"'
# For #999: set export_data_domain_growth_to_csv to return the resulting queryset, which we can then use
# in context to display this data in the template.
csv_export.DomainRequestExport.export_data_requests_growth_to_csv(response, start_date, end_date) csv_export.DomainRequestExport.export_data_requests_growth_to_csv(response, start_date, end_date)
return response return response
@ -209,25 +201,21 @@ class ExportDataRequestsGrowth(View):
class ExportDataManagedDomains(View): class ExportDataManagedDomains(View):
def get(self, request, *args, **kwargs): def get(self, request, *args, **kwargs):
# Get start_date and end_date from the request's GET parameters
# #999: not needed if we switch to django forms
start_date = request.GET.get("start_date", "") start_date = request.GET.get("start_date", "")
end_date = request.GET.get("end_date", "") end_date = request.GET.get("end_date", "")
response = HttpResponse(content_type="text/csv") response = HttpResponse(content_type="text/csv")
response["Content-Disposition"] = f'attachment; filename="managed-domains-{start_date}-to-{end_date}.csv"' response["Content-Disposition"] = f'attachment; filename="managed-domains-{start_date}-to-{end_date}.csv"'
csv_export.export_data_managed_domains_to_csv(response, start_date, end_date) csv_export.DomainExport.export_data_managed_domains_to_csv(response, start_date, end_date)
return response return response
class ExportDataUnmanagedDomains(View): class ExportDataUnmanagedDomains(View):
def get(self, request, *args, **kwargs): def get(self, request, *args, **kwargs):
# Get start_date and end_date from the request's GET parameters
# #999: not needed if we switch to django forms
start_date = request.GET.get("start_date", "") start_date = request.GET.get("start_date", "")
end_date = request.GET.get("end_date", "") end_date = request.GET.get("end_date", "")
response = HttpResponse(content_type="text/csv") response = HttpResponse(content_type="text/csv")
response["Content-Disposition"] = f'attachment; filename="unamanaged-domains-{start_date}-to-{end_date}.csv"' response["Content-Disposition"] = f'attachment; filename="unamanaged-domains-{start_date}-to-{end_date}.csv"'
csv_export.export_data_unmanaged_domains_to_csv(response, start_date, end_date) csv_export.DomainExport.export_data_unmanaged_domains_to_csv(response, start_date, end_date)
return response return response