This commit is contained in:
zandercymatics 2024-06-07 14:26:04 -06:00
parent cafce7c54b
commit 34a9497372
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
2 changed files with 187 additions and 197 deletions

View file

@ -298,3 +298,27 @@ def replace_url_queryparams(url_to_modify: str, query_params, convert_list_to_cs
new_url = urlunparse(url_parts)
return new_url
@staticmethod
def convert_queryset_to_dict(queryset, is_model=True, key="id"):
"""
Transforms a queryset into a dictionary keyed by a specified key (like "id").
Parameters:
requests (QuerySet or list of dicts): Input data.
is_model (bool): Indicates if each item in 'queryset' are model instances (True) or dictionaries (False).
key (str): Key or attribute to use for the resulting dictionary's keys.
Returns:
dict: Dictionary with keys derived from 'key' and values corresponding to items in 'queryset'.
"""
if is_model:
request_dict = {getattr(value, key): value for value in queryset}
else:
# Querysets sometimes contain sets of dictionaries.
# Calling .values is an example of this.
request_dict = {value[key]: value for value in queryset}
return request_dict

View file

@ -14,6 +14,7 @@ from django.utils import timezone
from django.core.paginator import Paginator
from django.db.models.functions import Concat, Coalesce
from django.contrib.postgres.aggregates import StringAgg
from registrar.models.utility.generic_helper import convert_queryset_to_dict
from registrar.templatetags.custom_filters import get_region
from registrar.utility.enums import DefaultEmail
@ -713,6 +714,9 @@ class DomainRequestExport:
Purely organizational -- all functions are independent.
"""
# Get all the field names of the DomainRequest object
domain_request_fields = set(field.name for field in DomainRequest._meta.get_fields())
all_columns = [
"Domain request",
"Submitted at",
@ -743,111 +747,8 @@ class DomainRequestExport:
"Investigator",
]
@staticmethod
def write_csv_for_requests(
writer,
columns,
requests,
requests_with_extra_fields=None,
should_write_header=True,
):
"""Receives params from the parent methods and outputs a CSV with filtered and sorted requests.
Works with write_header as long as the same writer object is passed."""
# Reduce the memory overhead when performing the write operation
paginator = Paginator(requests, 1000)
total_body_rows = []
for page_num in paginator.page_range:
page = paginator.page(page_num)
rows = []
for request in page.object_list:
try:
extra = requests_with_extra_fields.get(request.id) if requests_with_extra_fields else {}
row = DomainRequestExport.parse_row_for_requests(columns, request, extra)
rows.append(row)
except ValueError as err:
logger.error(f"csv_export -> Error when parsing row: {err}")
continue
total_body_rows.extend(rows)
if should_write_header:
write_header(writer, columns)
writer.writerows(total_body_rows)
@staticmethod
def parse_row_for_requests(columns, request: DomainRequest, extra_fields: dict):
"""
Given a set of columns and a request dictionary,
generate a new row from cleaned column data
"""
# Handle the domain_type field. Defaults to the wrong format.
org_type = request.organization_type
if org_type and extra_fields.get("domain_type") is None:
readable_org_type = DomainRequest.OrganizationChoices.get_org_label(org_type)
extra_fields["domain_type"] = readable_org_type
# Handle the federal_type field. Defaults to the wrong format.
federal_type = request.federal_type
if federal_type and extra_fields.get("human_readable_federal_type") is None:
extra_fields["human_readable_federal_type"] = DomainRequest.BranchChoices.get_branch_label(federal_type)
# Handle the status field. Defaults to the wrong format.
status = request.status
if status and extra_fields.get("status_display") is None:
extra_fields["status_display"] = DomainRequest.DomainRequestStatus.get_status_label(status)
# Handle the region field.
state_territory = request.state_territory
if state_territory and extra_fields.get("region") is None:
extra_fields["region"] = get_region(state_territory)
human_readable_election_board = "N/A"
if request.is_election_board is not None:
human_readable_election_board = "Yes" if request.is_election_board else "No"
requested_domain = request.requested_domain
if extra_fields.get("requested_domain_name") is None:
extra_fields["requested_domain_name"] = getattr(requested_domain, "name", "No requested domain")
# create a dictionary of fields which can be included in output.
# "extra_fields" are precomputed fields (generated in the DB or parsed).
FIELDS = {
"Domain request": extra_fields.get("requested_domain_name"),
"Status": extra_fields.get("status_display"),
"Domain type": extra_fields.get("domain_type"),
"Federal type": extra_fields.get("human_readable_federal_type"),
"Region": extra_fields.get("region"),
"Creator approved domains count": extra_fields.get("creator_approved_domains_count", 0),
"Creator active requests count": extra_fields.get("creator_active_requests_count", 0),
"Alternative domains": extra_fields.get("all_alternative_domains"),
"Request additional details": extra_fields.get("additional_details"),
"Other contacts": extra_fields.get("all_other_contacts"),
"Current websites": extra_fields.get("all_current_websites"),
# Normal fields
"Federal agency": request.federal_agency.agency if request.federal_agency else None,
"AO first name": request.authorizing_official.first_name,
"AO last name": request.authorizing_official.last_name,
"AO email": request.authorizing_official.email,
"AO title/role": request.authorizing_official.title,
"Creator first name": request.creator.first_name,
"Creator last name": request.creator.last_name,
"Creator email": request.creator.email,
"Organization name": request.organization_name,
"Election office": human_readable_election_board,
"City": request.city,
"State/territory": request.state_territory,
"Request purpose": request.purpose,
"CISA regional representative": request.cisa_representative_email,
"Investigator": request.investigator.email if request.investigator else None,
"Submitted at": request.submission_date,
}
row = [FIELDS.get(column, "") for column in columns]
return row
@staticmethod
def export_data_requests_growth_to_csv(csv_file, start_date, end_date):
@classmethod
def export_data_requests_growth_to_csv(cls, csv_file, start_date, end_date):
"""
Growth report:
Receive start and end dates from the view, parse them.
@ -876,33 +777,27 @@ class DomainRequestExport:
all_requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct()
# Convert the request to a querystring. Only grab what we need.
values_to_return = ["id", "generic_org_type", "federal_type", "submission_date"]
extra_values = DomainRequestExport.annotate_request_and_return_values(all_requests, {}, values_to_return)
# Override the default value for domain_type
for request in extra_values.values():
# Handle the domain_type field. Defaults to the wrong variant.
org_type = request.get("generic_org_type")
federal_type = request.get("federal_type")
request["domain_type"] = None
if org_type:
readable_org_type = DomainRequest.OrganizationChoices.get_org_label(org_type)
if federal_type:
readable_federal_type = DomainRequest.BranchChoices.get_branch_label(federal_type)
request["domain_type"] = f"{readable_org_type} - {readable_federal_type}"
else:
request["domain_type"] = readable_org_type
DomainRequestExport.write_csv_for_requests(writer, columns, all_requests, extra_values, should_write_header=True)
annotated_requests = cls.annotate_and_retrieve_fields(all_requests, {"requested_domain__name"})
requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False)
cls.write_csv_for_requests(writer, columns, requests_dict)
@classmethod
def export_full_domain_request_report(cls, csv_file):
writer = csv.writer(csv_file)
requests = (
DomainRequest.objects.select_related(
"creator", "authorizing_official", "federal_agency", "investigator", "requested_domain"
)
.prefetch_related("current_websites", "other_contacts", "alternative_domains")
.exclude(status__in=[DomainRequest.DomainRequestStatus.STARTED])
.order_by("status","requested_domain__name",)
.distinct()
)
# Annotations are custom columns returned to the queryset (AKA: computed in the DB)
annotations = {
"additional_details": DomainRequestExport.get_additional_details_query(),
"creator_approved_domains_count": DomainRequestExport.get_creator_approved_domains_count_query(),
"creator_active_requests_count": DomainRequestExport.get_creator_active_requests_count_query(),
"all_other_contacts": DomainRequestExport.get_all_other_contacts_query(),
@ -910,95 +805,166 @@ class DomainRequestExport:
"all_alternative_domains": StringAgg("alternative_domains__website", delimiter=" | ", distinct=True),
}
values_to_return = [
# Return out custom fields
"all_alternative_domains",
"all_other_contacts",
"all_current_websites",
"additional_details",
"creator_approved_domains_count",
"creator_active_requests_count",
# .values can't go two levels deep (just returns the id) - so we have to include this.
additional_values = [
"requested_domain__name",
"federal_agency__agency",
"authorizing_official__first_name",
"authorizing_official__last_name",
"authorizing_official__email",
"authorizing_official__title",
"creator__first_name",
"creator__last_name",
"creator__email",
"investigator__email"
]
requests = (
DomainRequest.objects.select_related(
"creator", "authorizing_official", "federal_agency", "investigator", "requested_domain"
)
.exclude(status__in=[DomainRequest.DomainRequestStatus.STARTED])
.order_by("status","requested_domain__name",)
.distinct()
)
# Convert the domain request queryset to a dictionary (including annotated fields)
annotated_requests = cls.annotate_and_retrieve_fields(requests, annotations, additional_values)
requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False)
extra_values = DomainRequestExport.annotate_request_and_return_values(requests, annotations, values_to_return)
DomainRequestExport.write_csv_for_requests(writer, cls.all_columns, requests, extra_values, should_write_header=True)
# Write the csv file
cls.write_csv_for_requests(writer, cls.all_columns, requests_dict)
@staticmethod
def annotate_request_and_return_values(requests, annotations, values_to_return, return_dict=True) -> QuerySet | dict:
def write_csv_for_requests(
writer,
columns,
requests_dict,
should_write_header=True,
):
"""Receives params from the parent methods and outputs a CSV with filtered and sorted requests.
Works with write_header as long as the same writer object is passed."""
rows = []
for request in requests_dict.values():
try:
row = DomainRequestExport.parse_row_for_requests(columns, request)
rows.append(row)
except ValueError as err:
logger.error(f"csv_export -> Error when parsing row: {err}")
continue
if should_write_header:
write_header(writer, columns)
writer.writerows(rows)
@staticmethod
def parse_row_for_requests(columns, request, show_expanded_org_type=False):
"""
Annotates a queryset with specified annotations and retrieves specified fields.
Given a set of columns and a request dictionary,
generate a new row from cleaned column data
"""
# FK fields
federal_agency = "federal_agency"
ao = "authorizing_official"
creator = "creator"
investigator = "investigator"
# Handle the federal_type field. Defaults to the wrong format.
federal_type = request.get("federal_type")
human_readable_federal_type = DomainRequest.BranchChoices.get_branch_label(federal_type) if federal_type else None
# Handle the org_type field. Either "Tribal" or "Federal - Executive".
org_type = request.get("organization_type")
human_readable_org_type = DomainRequest.OrganizationChoices.get_org_label(org_type) if org_type else None
if human_readable_org_type and human_readable_federal_type and show_expanded_org_type:
human_readable_org_type = f"{human_readable_org_type} - {human_readable_federal_type}"
# Handle the status field. Defaults to the wrong format.
status = request.get("status")
status_display = DomainRequest.DomainRequestStatus.get_status_label(status) if status else None
# Handle the region field.
state_territory = request.get("state_territory")
region = get_region(state_territory) if state_territory else None
requested_domain = request.get("requested_domain__name")
requested_domain_name = requested_domain if requested_domain else "No requested domain"
# Handle the election field. N/A if None, "Yes"/"No" if boolean
human_readable_election_board = "N/A"
if request.get("is_election_board") is not None:
human_readable_election_board = "Yes" if request.is_election_board else "No"
# Handle the additional details field. Pipe seperated
details = [request.get("cisa_representative_email"), request.get("anything_else")]
additional_details = " | ".join([field for field in details if field is not None])
# create a dictionary of fields which can be included in output.
# "extra_fields" are precomputed fields (generated in the DB or parsed).
FIELDS = {
# Parsed fields - defined above.
"Domain request": requested_domain_name,
"Region": region,
"Status": status_display,
"Election office": human_readable_election_board,
"Federal type": human_readable_federal_type,
"Domain type": human_readable_org_type,
"Request additional details": additional_details,
# Annotated fields - passed into the request dict.
"Creator approved domains count": request.get("creator_approved_domains_count", 0),
"Creator active requests count": request.get("creator_active_requests_count", 0),
"Alternative domains": request.get("all_alternative_domains"),
"Other contacts": request.get("all_other_contacts"),
"Current websites": request.get("all_current_websites"),
# Untouched FK fields - passed into the request dict.
"Federal agency": request.get(f"{federal_agency}__agency"),
"AO first name": request.get(f"{ao}__first_name"),
"AO last name": request.get(f"{ao}__last_name"),
"AO email": request.get(f"{ao}__email"),
"AO title/role": request.get(f"{ao}__title"),
"Creator first name": request.get(f"{creator}__first_name"),
"Creator last name": request.get(f"{creator}__last_name"),
"Creator email": request.get(f"{creator}__email"),
"Investigator": request.get(f"{investigator}__email"),
# Untouched fields
"Organization name": request.get("organization_name"),
"City": request.get("city"),
"State/territory": request.get("state_territory"),
"Request purpose": request.get("purpose"),
"CISA regional representative": request.get("cisa_representative_email"),
"Submitted at": request.get("submission_date"),
}
row = [FIELDS.get(column, "") for column in columns]
return row
@classmethod
def annotate_and_retrieve_fields(cls, requests, annotations, additional_values=None) -> QuerySet:
"""
Applies annotations to a queryset and retrieves specified fields,
including class-defined and annotation-defined.
Parameters:
requests (QuerySet): The initial queryset to which annotations will be applied.
annotations (dict): A dictionary where keys are names of the new fields to create, and values are
expressions that describe how to calculate these fields.
values_to_return (list): A list of strings that specify which fields should be included in the final
output, in addition to the 'id' field which is included by default.
return_dict (bool): If True, the method returns a dictionary where each key is an 'id' of an item
and each value is a dictionary of the fields specified in `values_to_return`.
If False, the method returns a QuerySet containing dictionaries of the requested fields.
requests (QuerySet): Initial queryset.
annotations (dict, optional): Fields to compute {field_name: expression}.
additional_values (list, optional): Extra fields to retrieve; defaults to annotation keys if None.
Returns:
QuerySet | dict: Depending on the value of `return_dict`, returns either a QuerySet or a dictionary.
The QuerySet contains dictionaries for each record with the specified fields.
The dictionary contains keys as record ids and values as dictionaries of the specified fields.
QuerySet: Contains dictionaries with the specified fields for each record.
"""
queryset = requests.annotate(**annotations).values("id", *values_to_return)
if return_dict:
requests_dict = {value["id"]: value for value in queryset}
return requests_dict
if annotations is None:
annotations = {}
if additional_values is None:
additional_values = []
# We can infer that if we're passing in annotations,
# we want to grab the result of said annotation.
if annotations:
additional_values.extend(annotations.keys())
queryset = requests.annotate(**annotations).values(*cls.domain_request_fields, *additional_values)
return queryset
# ============================================================= #
# Helper functions for django ORM queries. #
# We are using these rather than pure python for speed reasons. #
# ============================================================= #
# AXE THIS
@staticmethod
def get_additional_details_query(default_message=None, delimiter=" | "):
"""
A SQL case statement for DomainRequest.anything_else and DomainRequest.cisa_representative_email.
When ran, returns a concatenation of anything_else and cisa_representative_email - seperated by " | ".
Equivalent to:
`f"{cisa_rep} | {anything_else}" If anything_else or cisa_rep else None`
"""
# TODO this should be replaced with cisa_representative_first_name and cisa_representative_last_name
additional_details_query = Case(
When(
cisa_representative_email__isnull=False,
anything_else__isnull=False,
then=Concat(F("cisa_representative_email"), Value(delimiter), F("anything_else")),
),
When(
cisa_representative_email__isnull=True,
anything_else__isnull=False,
then=F("anything_else"),
),
When(
cisa_representative_email__isnull=False,
anything_else__isnull=True,
then=F("cisa_representative_email"),
),
default=Value(default_message),
output_field=CharField(),
)
return additional_details_query
@staticmethod
def get_all_other_contacts_query(delimiter=" | "):