Update csv_export.py

This commit is contained in:
zandercymatics 2024-06-07 12:19:49 -06:00
parent be9e3ccb1a
commit cafce7c54b
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7

View file

@ -713,20 +713,47 @@ class DomainRequestExport:
Purely organizational -- all functions are independent. Purely organizational -- all functions are independent.
""" """
# Column headers for the full domain request report, listed in output order.
# export_full_domain_request_report passes this list as `columns` to write_csv_for_requests.
all_columns = [
"Domain request",
"Submitted at",
"Status",
"Domain type",
"Federal type",
"Federal agency",
"Organization name",
"Election office",
"City",
"State/territory",
"Region",
"Creator first name",
"Creator last name",
"Creator email",
"Creator approved domains count",
"Creator active requests count",
"Alternative domains",
"AO first name",
"AO last name",
"AO email",
"AO title/role",
"Request purpose",
"Request additional details",
"Other contacts",
"CISA regional representative",
"Current websites",
"Investigator",
]
@staticmethod @staticmethod
def write_csv_for_requests( def write_csv_for_requests(
writer, writer,
columns, columns,
requests, requests,
extra_request_fields=None, requests_with_extra_fields=None,
should_write_header=True, should_write_header=True,
): ):
"""Receives params from the parent methods and outputs a CSV with filtered and sorted requests. """Receives params from the parent methods and outputs a CSV with filtered and sorted requests.
Works with write_header as long as the same writer object is passed.""" Works with write_header as long as the same writer object is passed."""
if extra_request_fields:
extra_request_fields_dict = {request["id"]: request for request in extra_request_fields}
# Reduce the memory overhead when performing the write operation # Reduce the memory overhead when performing the write operation
paginator = Paginator(requests, 1000) paginator = Paginator(requests, 1000)
total_body_rows = [] total_body_rows = []
@ -735,7 +762,7 @@ class DomainRequestExport:
rows = [] rows = []
for request in page.object_list: for request in page.object_list:
try: try:
extra = extra_request_fields_dict.get(request.id) if extra_request_fields else None extra = requests_with_extra_fields.get(request.id) if requests_with_extra_fields else {}
row = DomainRequestExport.parse_row_for_requests(columns, request, extra) row = DomainRequestExport.parse_row_for_requests(columns, request, extra)
rows.append(row) rows.append(row)
except ValueError as err: except ValueError as err:
@ -850,10 +877,11 @@ class DomainRequestExport:
all_requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct() all_requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct()
# Convert the request to a querystring. Only grab what we need. # Convert the request to a querystring. Only grab what we need.
annotations = all_requests.values("id", "generic_org_type", "federal_type", "submission_date") values_to_return = ["id", "generic_org_type", "federal_type", "submission_date"]
extra_values = DomainRequestExport.annotate_request_and_return_values(all_requests, {}, values_to_return)
# Override the default value for domain_type # Override the default value for domain_type
for request in annotations: for request in extra_values.values():
# Handle the domain_type field. Defaults to the wrong variant. # Handle the domain_type field. Defaults to the wrong variant.
org_type = request.get("generic_org_type") org_type = request.get("generic_org_type")
federal_type = request.get("federal_type") federal_type = request.get("federal_type")
@ -867,111 +895,76 @@ class DomainRequestExport:
else: else:
request["domain_type"] = readable_org_type request["domain_type"] = readable_org_type
DomainRequestExport.write_csv_for_requests(writer, columns, all_requests, annotations, should_write_header=True) DomainRequestExport.write_csv_for_requests(writer, columns, all_requests, extra_values, should_write_header=True)
@staticmethod @classmethod
def export_full_domain_request_report(csv_file): def export_full_domain_request_report(cls, csv_file):
writer = csv.writer(csv_file) writer = csv.writer(csv_file)
columns = [
"Domain request",
"Submitted at",
"Status",
"Domain type",
"Federal type",
"Federal agency",
"Organization name",
"Election office",
"City",
"State/territory",
"Region",
"Creator first name",
"Creator last name",
"Creator email",
"Creator approved domains count",
"Creator active requests count",
"Alternative domains",
"AO first name",
"AO last name",
"AO email",
"AO title/role",
"Request purpose",
"Request additional details",
"Other contacts",
"CISA regional representative",
"Current websites",
"Investigator",
]
excluded_statuses = [DomainRequest.DomainRequestStatus.STARTED]
order_by = [
"status",
"requested_domain__name",
]
requests = (
DomainRequest.objects.select_related(
"creator", "authorizing_official", "federal_agency", "investigator", "requested_domain"
)
.exclude(status__in=excluded_statuses)
.order_by(*order_by)
.distinct()
)
extra_fields = DomainRequestExport.annotate_and_prepare_domain_request_data(requests)
DomainRequestExport.write_csv_for_requests(writer, columns, requests, extra_fields, should_write_header=True) annotations = {
"additional_details": DomainRequestExport.get_additional_details_query(),
"creator_approved_domains_count": DomainRequestExport.get_creator_approved_domains_count_query(),
"creator_active_requests_count": DomainRequestExport.get_creator_active_requests_count_query(),
"all_other_contacts": DomainRequestExport.get_all_other_contacts_query(),
"all_current_websites": StringAgg("current_websites__website", delimiter=" | ", distinct=True),
"all_alternative_domains": StringAgg("alternative_domains__website", delimiter=" | ", distinct=True),
}
@staticmethod values_to_return = [
def annotate_and_prepare_domain_request_data(requests_to_convert: QuerySet[DomainRequest]) -> QuerySet: # Return out custom fields
"""
This function is designed to optimize performance by performing data manipulations directly in the database
rather than in Python code, which is especially beneficial for operations on many-to-many fields.
Args:
requests_to_convert (QuerySet[DomainRequest]): A Django QuerySet of DomainRequest objects to be annotated.
Returns:
QuerySet: An annotated queryset that includes both original and annotated fields.
Annotations (examples of python-readable equivalents):
- additional_details: `f"{cisa_rep} | {anything_else}" If anything_else or cisa_rep else None`.
- all_other_contacts: `[f"{c.first_name} {c.last_name} {c.email}" for c in request.other_contacts.all()].join(" | ")`.
- all_current_websites: `[w.website for w in request.current_websites.all()].join(" | ")`.
- all_alternative_domains: `[d.website for d in request.alternative_domains.all()].join(" | ")`.
""" # noqa
# As stated, this is equivalent to performing a bunch of if-statement like operations to
# each of these fields inside a for loop. However, we want to avoid that for the sake
# of performance - especially on many-to-many fields (which would require repeated DB calls in the loop).
# By doing these operations in the DB, we save a lot of computation time.
# We can do this for most fields, except ones that require us to grab .label (such as generic_org_type).
# For those fields, they will otherwise just return the value representation so we parse those manually.
parsed_requests = requests_to_convert.annotate(
additional_details=DomainRequestExport.get_additional_details_query(),
creator_approved_domains_count=DomainRequestExport.get_creator_approved_domains_count_query(),
creator_active_requests_count=DomainRequestExport.get_creator_active_requests_count_query(),
all_other_contacts=DomainRequestExport.get_all_other_contacts_query(),
all_current_websites=StringAgg("current_websites__website", delimiter=" | ", distinct=True),
all_alternative_domains=StringAgg("alternative_domains__website", delimiter=" | ", distinct=True),
)
requests_queryset = parsed_requests.values(
# Existing fields
"id",
# Custom fields
"all_alternative_domains", "all_alternative_domains",
"all_other_contacts", "all_other_contacts",
"all_current_websites", "all_current_websites",
"additional_details", "additional_details",
"creator_approved_domains_count", "creator_approved_domains_count",
"creator_active_requests_count", "creator_active_requests_count",
]
requests = (
DomainRequest.objects.select_related(
"creator", "authorizing_official", "federal_agency", "investigator", "requested_domain"
)
.exclude(status__in=[DomainRequest.DomainRequestStatus.STARTED])
.order_by("status","requested_domain__name",)
.distinct()
) )
return requests_queryset extra_values = DomainRequestExport.annotate_request_and_return_values(requests, annotations, values_to_return)
DomainRequestExport.write_csv_for_requests(writer, cls.all_columns, requests, extra_values, should_write_header=True)
@staticmethod
def annotate_request_and_return_values(
    requests, annotations, values_to_return, return_dict=True
) -> "QuerySet | dict":
    """
    Annotate a queryset and return only the requested fields for each record.

    Parameters:
        requests (QuerySet): The queryset to annotate.
        annotations (dict): Maps each new field name to the expression that computes it.
            Passed to `.annotate()` as keyword arguments.
        values_to_return (list): Names of the fields to include in the output. "id" is
            always included, so listing it here is optional and harmless.
        return_dict (bool): If True, return a dict keyed by record id. If False, return
            the result of `.values()` unchanged.

    Returns:
        QuerySet | dict: When `return_dict` is True, a dict of `{id: {field: value, ...}}`.
        Otherwise the `.values()` queryset, which yields one dict per record.
    """
    # "id" is always selected because it is the key of the returned mapping.
    # Callers may also list it themselves, so drop repeats (keeping first-seen order)
    # rather than asking the database for the same column twice.
    field_names = list(dict.fromkeys(["id", *values_to_return]))
    queryset = requests.annotate(**annotations).values(*field_names)
    if return_dict:
        return {row["id"]: row for row in queryset}
    return queryset
# ============================================================= # # ============================================================= #
# Helper functions for django ORM queries. # # Helper functions for django ORM queries. #
# We are using these rather than pure python for speed reasons. # # We are using these rather than pure python for speed reasons. #
# ============================================================= # # ============================================================= #
# AXE THIS
@staticmethod @staticmethod
def get_additional_details_query(default_message=None, delimiter=" | "): def get_additional_details_query(default_message=None, delimiter=" | "):
""" """