Update csv_export.py

This commit is contained in:
zandercymatics 2024-06-05 15:39:22 -06:00
parent 9898fe16c0
commit d870127eb3
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7

View file

@ -1,7 +1,15 @@
import csv
import logging
from datetime import datetime
from registrar.models import Domain, DomainInvitation, DomainRequest, DomainInformation, User, PublicContact, UserDomainRole
from registrar.models import (
Domain,
DomainInvitation,
DomainRequest,
DomainInformation,
User,
PublicContact,
UserDomainRole,
)
from django.db.models import QuerySet, Value, Case, When, CharField, Count, Q, F, Value, CharField
from django.utils import timezone
from django.core.paginator import Paginator
@ -293,6 +301,7 @@ def write_csv_for_domains(
write_header(writer, columns)
writer.writerows(total_body_rows)
def export_data_type_to_csv(csv_file):
"""
All domains report with extra columns.
@ -733,44 +742,44 @@ class DomainRequestExport:
if should_write_header:
write_header(writer, columns)
writer.writerows(total_body_rows)
@staticmethod
def parse_row_for_requests(columns, request):
"""Given a set of columns, generate a new row from cleaned column data"""
# create a dictionary of fields which can be included in output
FIELDS = {
"Domain request": request.get('requested_domain_name'),
"Submitted at": request.get('submission_date'), # TODO - different format?
"Status": request.get('status_display'),
"Domain type": request.get('domain_type'),
"Domain request": request.get("requested_domain_name"),
"Submitted at": request.get("submission_date"), # TODO - different format?
"Status": request.get("status_display"),
"Domain type": request.get("domain_type"),
"Federal type": request.get("human_readable_federal_type"),
"Federal agency": request.get('federal_agency_name'),
"Organization name": request.get('organization_name'),
"Federal agency": request.get("federal_agency_name"),
"Organization name": request.get("organization_name"),
"Election office": request.get("human_readable_election_board"),
"City": request.get('city'),
"State/territory": request.get('state_territory'),
"City": request.get("city"),
"State/territory": request.get("state_territory"),
"Region": None, # TODO - what is this field?
# Creator
"Creator first name": request.get('creator__first_name', ""),
"Creator last name": request.get('creator__last_name', ""),
"Creator email": request.get('creator__email', ""),
"Creator first name": request.get("creator__first_name", ""),
"Creator last name": request.get("creator__last_name", ""),
"Creator email": request.get("creator__email", ""),
"Creator approved domains count": request.get("creator_approved_domains_count", 0),
"Creator active requests count": request.get("creator_active_requests_count", 0),
# End of creator
"Alternative domains": request.get('all_alternative_domains'),
"Alternative domains": request.get("all_alternative_domains"),
# AO
"AO first name": request.get('authorizing_official__first_name', ""),
"AO last name": request.get('authorizing_official__last_name', ""),
"AO email": request.get('authorizing_official__email', ""),
"AO title/role": request.get('authorizing_official__title', ""),
"AO first name": request.get("authorizing_official__first_name", ""),
"AO last name": request.get("authorizing_official__last_name", ""),
"AO email": request.get("authorizing_official__email", ""),
"AO title/role": request.get("authorizing_official__title", ""),
# End of AO
"Request purpose": request.get('purpose'),
"Request additional details": request.get('additional_details'),
"Other contacts": request.get('all_other_contacts'),
"CISA regional representative": request.get('cisa_representative_email'), # TODO - same problem as before
"Current websites": request.get('all_current_websites'),
"Investigator": request.get('investigator_email'),
"Request purpose": request.get("purpose"),
"Request additional details": request.get("additional_details"),
"Other contacts": request.get("all_other_contacts"),
"CISA regional representative": request.get("cisa_representative_email"), # TODO - same problem as before
"Current websites": request.get("all_current_websites"),
"Investigator": request.get("investigator_email"),
}
row = [FIELDS.get(column, "") for column in columns]
@ -809,13 +818,8 @@ class DomainRequestExport:
# Convert the request to a querystring for faster processing. Only grab what we need.
parsed_requests = all_requests.annotate(
requested_domain_name=DomainRequestExport.get_requested_domain_name_query(),
approved_domain_name=F('approved_domain__name'),
).values(
"requested_domain_name",
"generic_org_type",
"federal_type",
"submission_date"
)
approved_domain_name=F("approved_domain__name"),
).values("requested_domain_name", "generic_org_type", "federal_type", "submission_date")
# Set the domain_type field.
# Relies on a python function, get_organization_type_display, so we have to
@ -911,74 +915,50 @@ class DomainRequestExport:
parsed_requests = requests_to_convert.annotate(
requested_domain_name=DomainRequestExport.get_requested_domain_name_query(),
additional_details=DomainRequestExport.get_additional_details_query(),
all_other_contacts=StringAgg(
Concat('other_contacts__first_name', Value(' '), 'other_contacts__last_name', Value(' '), 'other_contacts__email'),
delimiter=' | ',
distinct=True
),
all_current_websites=StringAgg('current_websites__website', delimiter=' | ', distinct=True),
all_alternative_domains=StringAgg('alternative_domains__website', delimiter=' | ', distinct=True),
# TODO - this is returning the wrong count
creator_approved_domains_count=Count(
'creator__domain_requests_created__id',
filter=Q(creator__domain_requests_created__status=DomainRequest.DomainRequestStatus.APPROVED),
distinct=True
),
# TODO - this is returning the wrong count
creator_active_requests_count=Count(
'creator__domain_requests_created__id',
filter=Q(creator__domain_requests_created__status__in=[
DomainRequest.DomainRequestStatus.SUBMITTED,
DomainRequest.DomainRequestStatus.IN_REVIEW,
DomainRequest.DomainRequestStatus.ACTION_NEEDED
]),
distinct=True
),
federal_agency_name = F("federal_agency__agency"),
investigator_email = F("investigator__email"),
human_readable_election_board=Case(
When(is_election_board=True, then=Value("Yes")),
When(is_election_board=False, then=Value("No")),
default=Value("N/A"),
output_field=CharField()
)
creator_approved_domains_count=DomainRequestExport.get_creator_approved_domains_count_query(),
creator_active_requests_count=DomainRequestExport.get_creator_active_requests_count_query(),
human_readable_election_board=DomainRequestExport.get_human_readable_election_board_query(),
all_other_contacts=DomainRequestExport.get_all_other_contacts_query(),
all_current_websites=StringAgg("current_websites__website", delimiter=" | ", distinct=True),
all_alternative_domains=StringAgg("alternative_domains__website", delimiter=" | ", distinct=True),
federal_agency_name=F("federal_agency__agency"),
investigator_email=F("investigator__email"),
)
requests_dict = parsed_requests.values(
# Custom fields
"all_alternative_domains",
'all_other_contacts',
'all_current_websites',
'additional_details',
'requested_domain_name',
"all_other_contacts",
"all_current_websites",
"additional_details",
"requested_domain_name",
"federal_agency_name",
"human_readable_election_board",
# Creator
'creator__first_name',
'creator__last_name',
'creator__email',
"creator__first_name",
"creator__last_name",
"creator__email",
"creator_approved_domains_count",
"creator_active_requests_count",
# AO
'authorizing_official__first_name',
'authorizing_official__last_name',
'authorizing_official__email',
'authorizing_official__title',
"authorizing_official__first_name",
"authorizing_official__last_name",
"authorizing_official__email",
"authorizing_official__title",
# Investigator
"investigator_email",
# Existing fields
"id",
'submission_date',
'status',
'federal_type',
'organization_name',
'is_election_board',
'city',
'state_territory',
'purpose',
'cisa_representative_email',
'investigator',
"submission_date",
"status",
"federal_type",
"organization_name",
"is_election_board",
"city",
"state_territory",
"purpose",
"cisa_representative_email",
"investigator",
"generic_org_type",
"federal_type",
)
@ -989,12 +969,12 @@ class DomainRequestExport:
if org_type:
readable_org_type = DomainRequest.OrganizationChoices.get_org_label(org_type)
request["domain_type"] = readable_org_type
# Handle the federal_type field. Defaults to the wrong format.
federal_type = request.get("federal_type")
if federal_type:
request["human_readable_federal_type"] = DomainRequest.BranchChoices.get_branch_label(federal_type)
# Handle the status field. Defaults to the wrong format.
status = request.get("status")
if status:
@ -1013,30 +993,26 @@ class DomainRequestExport:
When ran, returns requested_domain.name if not null. Otherwise, returns default_message.
Equivalent to:
Equivalent to:
`requested_domain.name If requested_domain.name is not None else default_message`
"""
requested_domain_name_query = Case(
When(
requested_domain__isnull=False,
then=F('requested_domain__name')
),
When(requested_domain__isnull=False, then=F("requested_domain__name")),
default=Value(default_message),
output_field=CharField(),
)
return requested_domain_name_query
@staticmethod
def get_additional_details_query(default_message=None):
def get_additional_details_query(default_message=None, delimiter=" | "):
"""
A SQL case statement for DomainRequest.anything_else and DomainRequest.cisa_representative_email.
When ran, returns a concatenation of anything_else and cisa_representative_email - seperated by " | ".
Equivalent to:
Equivalent to:
`f"{cisa_rep} | {anything_else}" If anything_else or cisa_rep else None`
"""
@ -1048,14 +1024,72 @@ class DomainRequestExport:
# TODO - touch up on this
When(
cisa_representative_email__isnull=False,
then=Concat(F('cisa_representative_email'), Value(' | '), F('anything_else'))
then=Concat(F("cisa_representative_email"), Value(delimiter), F("anything_else")),
),
When(
anything_else__isnull=False,
then=Concat(F('cisa_representative_email'), Value(' | '), F('anything_else'))
then=Concat(F("cisa_representative_email"), Value(delimiter), F("anything_else")),
),
default=Value(default_message),
output_field=CharField(),
)
return additional_details_query
@staticmethod
def get_all_other_contacts_query(delimiter=" | "):
""" """
all_other_contacts_query = StringAgg(
Concat(
"other_contacts__first_name",
Value(" "),
"other_contacts__last_name",
Value(" "),
"other_contacts__email",
),
delimiter=delimiter,
distinct=True,
)
return all_other_contacts_query
@staticmethod
def get_creator_approved_domains_count_query():
""" """
query = Count(
"creator__domain_requests_created__id",
filter=Q(creator__domain_requests_created__status=DomainRequest.DomainRequestStatus.APPROVED),
distinct=True,
)
return query
@staticmethod
def get_creator_active_requests_count_query():
""" """
query = Count(
"creator__domain_requests_created__id",
filter=Q(
creator__domain_requests_created__status__in=[
DomainRequest.DomainRequestStatus.SUBMITTED,
DomainRequest.DomainRequestStatus.IN_REVIEW,
DomainRequest.DomainRequestStatus.ACTION_NEEDED,
]
),
distinct=True,
)
return query
@staticmethod
def get_human_readable_election_board_query():
""" """
query = Case(
When(is_election_board=True, then=Value("Yes")),
When(is_election_board=False, then=Value("No")),
default=Value("N/A"),
output_field=CharField(),
)
return query