Basic setup

This commit is contained in:
zandercymatics 2024-06-05 12:53:40 -06:00
parent eb706b3e83
commit 6db7f138f4
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
4 changed files with 348 additions and 103 deletions

View file

@ -18,6 +18,7 @@ from registrar.views.admin_views import (
ExportDataType,
ExportDataUnmanagedDomains,
AnalyticsView,
ExportDomainRequestDataFull,
)
from registrar.views.domain_request import Step
@ -66,6 +67,11 @@ urlpatterns = [
ExportDataType.as_view(),
name="export_data_type",
),
path(
"admin/analytics/export_data_domain_requests_full/",
ExportDomainRequestDataFull.as_view(),
name="export_data_domain_requests_full",
),
path(
"admin/analytics/export_data_full/",
ExportDataFull.as_view(),

View file

@ -49,6 +49,14 @@
</svg><span class="margin-left-05">Current federal</span>
</a>
</li>
{# TODO: move this and rename it #}
<li class="usa-button-group__item">
<a href="{% url 'export_data_domain_requests_full' %}" class="button" role="button">
<svg class="usa-icon" aria-hidden="true" focusable="false" role="img" width="24" height="24">
<use xlink:href="{%static 'img/sprite.svg'%}#file_download"></use>
</svg><span class="margin-left-05">Current domain request full</span>
</a>
</li>
</ul>
</div>
</div>

View file

@ -1,14 +1,17 @@
import csv
import logging
from datetime import datetime
from django.db.models import QuerySet
from registrar.models.domain import Domain
from registrar.models.domain_invitation import DomainInvitation
from registrar.models.domain_request import DomainRequest
from registrar.models.domain_information import DomainInformation
from django.db.models import Value, Case, When, CharField, Count, Q
from django.utils import timezone
from django.core.paginator import Paginator
from django.db.models import F, Value, CharField
from django.db.models.functions import Concat, Coalesce
from django.contrib.postgres.aggregates import StringAgg
from registrar.models.public_contact import PublicContact
from registrar.models.user_domain_role import UserDomainRole
@ -298,85 +301,6 @@ def write_csv_for_domains(
write_header(writer, columns)
writer.writerows(total_body_rows)
def get_requests(filter_condition, sort_fields):
"""
Returns DomainRequest objects filtered and sorted based on the provided conditions.
filter_condition -> A dictionary of conditions to filter the objects.
sort_fields -> A list of fields to sort the resulting query set.
returns: A queryset of DomainRequest objects
"""
requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct()
return requests
def parse_row_for_requests(columns, request: DomainRequest):
"""Given a set of columns, generate a new row from cleaned column data"""
requested_domain_name = "No requested domain"
if request.requested_domain is not None:
requested_domain_name = request.requested_domain.name
if request.federal_type:
request_type = f"{request.get_organization_type_display()} - {request.get_federal_type_display()}"
else:
request_type = request.get_organization_type_display()
# create a dictionary of fields which can be included in output
FIELDS = {
"Requested domain": requested_domain_name,
"Status": request.get_status_display(),
"Organization type": request_type,
"Agency": request.federal_agency,
"Organization name": request.organization_name,
"City": request.city,
"State": request.state_territory,
"AO email": request.authorizing_official.email if request.authorizing_official else " ",
"Security contact email": request,
"Created at": request.created_at,
"Submission date": request.submission_date,
}
row = [FIELDS.get(column, "") for column in columns]
return row
def write_csv_for_requests(
writer,
columns,
sort_fields,
filter_condition,
should_write_header=True,
):
"""Receives params from the parent methods and outputs a CSV with filtered and sorted requests.
Works with write_header as long as the same writer object is passed."""
all_requests = get_requests(filter_condition, sort_fields)
# Reduce the memory overhead when performing the write operation
paginator = Paginator(all_requests, 1000)
total_body_rows = []
for page_num in paginator.page_range:
page = paginator.page(page_num)
rows = []
for request in page.object_list:
try:
row = parse_row_for_requests(columns, request)
rows.append(row)
except ValueError:
# This should not happen. If it does, just skip this row.
# It indicates that DomainInformation.domain is None.
logger.error("csv_export -> Error when parsing row, domain was None")
continue
total_body_rows.extend(rows)
if should_write_header:
write_header(writer, columns)
writer.writerows(total_body_rows)
def export_data_type_to_csv(csv_file):
"""
All domains report with extra columns.
@ -782,30 +706,328 @@ def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date):
)
def export_data_requests_growth_to_csv(csv_file, start_date, end_date):
class DomainRequestExport:
"""
Growth report:
Receive start and end dates from the view, parse them.
Request from write_requests_body SUBMITTED requests that are created between
the start and end dates. Specify sort params.
A collection of functions which return csv files regarding the DomainRequest model.
Purely organizational -- all functions are independent.
"""
start_date_formatted = format_start_date(start_date)
end_date_formatted = format_end_date(end_date)
writer = csv.writer(csv_file)
# define columns to include in export
columns = [
"Requested domain",
"Organization type",
"Submission date",
]
sort_fields = [
"requested_domain__name",
]
filter_condition = {
"status": DomainRequest.DomainRequestStatus.SUBMITTED,
"submission_date__lte": end_date_formatted,
"submission_date__gte": start_date_formatted,
}
@staticmethod
def write_csv_for_requests(
writer,
columns,
requests,
should_write_header=True,
):
"""Receives params from the parent methods and outputs a CSV with filtered and sorted requests.
Works with write_header as long as the same writer object is passed."""
write_csv_for_requests(writer, columns, sort_fields, filter_condition, should_write_header=True)
# Reduce the memory overhead when performing the write operation
paginator = Paginator(requests, 1000)
total_body_rows = []
for page_num in paginator.page_range:
page = paginator.page(page_num)
rows = []
for request in page.object_list:
try:
row = parse_row_for_requests(columns, request)
rows.append(row)
except ValueError as err:
logger.error(f"csv_export -> Error when parsing row: {err}")
continue
total_body_rows.extend(rows)
if should_write_header:
write_header(writer, columns)
writer.writerows(total_body_rows)
@staticmethod
def parse_row_for_requests(columns, request):
"""Given a set of columns, generate a new row from cleaned column data"""
# create a dictionary of fields which can be included in output
FIELDS = {
"Domain request": request.get('requested_domain_name'),
"Submitted at": request.get('submission_date'), # TODO - different format?
"Status": request.get('status_display'),
"Domain type": request.get('request_type'), # todo - revisit this, redundant fields
"Federal type": request.get('federal_type'),
"Federal agency": request.get('federal_agency'),
"Organization name": request.get('organization_name'),
"Election office": request.get('is_election_board'),
"City": request.get('city'),
"State/territory": request.get('state_territory'),
"Region": None, # TODO - what is this field?
# Creator
"Creator first name": request.get('creator__first_name', ""),
"Creator last name": request.get('creator__last_name', ""),
"Creator email": request.get('creator__email', ""),
"Creator approved domains count": request.get("creator_approved_domains_count", 0),
"Creator active requests count": request.get("creator_active_requests_count", 0),
# End of creator
"Alternative domains": request.get('all_alternative_domains'),
# AO
"AO first name": request.get('authorizing_official__first_name', ""),
"AO last name": request.get('authorizing_official__last_name', ""),
"AO email": request.get('authorizing_official__email', ""),
"AO title/role": request.get('authorizing_official__title', ""),
# End pf AO
"Request purpose": request.get('purpose'),
"Request additional details": request.get('additional_details'),
"Other contacts": request.get('all_other_contacts'),
"CISA regional representative": request.get('cisa_representative_email'), # TODO - same problem as before
"Current websites": request.get('all_current_websites'),
"Investigator": request.get('investigator'),
}
row = [FIELDS.get(column, "") for column in columns]
return row
@staticmethod
def export_data_requests_growth_to_csv(csv_file, start_date, end_date):
"""
Growth report:
Receive start and end dates from the view, parse them.
Request from write_requests_body SUBMITTED requests that are created between
the start and end dates. Specify sort params.
"""
start_date_formatted = format_start_date(start_date)
end_date_formatted = format_end_date(end_date)
writer = csv.writer(csv_file)
# define columns to include in export
columns = [
"Domain request",
"Domain type",
"Submitted at",
]
sort_fields = [
"requested_domain__name",
]
filter_condition = {
"status": DomainRequest.DomainRequestStatus.SUBMITTED,
"submission_date__lte": end_date_formatted,
"submission_date__gte": start_date_formatted,
}
all_requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct()
# Convert the request to a querystring for faster processing. Only grab what we need.
parsed_requests = all_requests.annotate(
requested_domain_name=DomainRequestExport.get_requested_domain_name_query(),
approved_domain_name=F('approved_domain__name'),
request_type=DomainRequestExport.get_request_type_query(),
).values(
"requested_domain_name",
"request_type",
"submission_date"
)
DomainRequestExport.write_csv_for_requests(writer, columns, parsed_requests, should_write_header=True)
@staticmethod
def export_full_domain_request_report(csv_file):
writer = csv.writer(csv_file)
columns = [
"Domain request",
"Submitted at",
"Status",
"Domain type",
"Federal type",
"Federal agency",
"Organization name",
"Election office",
"City",
"State/territory",
"Region",
"Creator first name",
"Creator last name",
"Creator email",
"Creator approved domains count",
"Creator active requests count",
"Alternative domains",
"AO first name",
"AO last name",
"AO email",
"AO title/role",
"Request purpose",
"Request additional details",
"Other contacts",
"CISA regional representative",
"Current websites",
"Investigator",
]
excluded_statuses = [DomainRequest.DomainRequestStatus.STARTED]
order_by = [
"status",
"requested_domain__name",
]
requests = DomainRequest.objects.exclude(status__in=excluded_statuses).order_by(*order_by).distinct()
parsed_requests = DomainRequestExport.annotate_and_prepare_domain_request_data(requests)
DomainRequestExport.write_csv_for_requests(writer, columns, parsed_requests, should_write_header=True)
@staticmethod
def annotate_and_prepare_domain_request_data(requests_to_convert: QuerySet[DomainRequest]) -> QuerySet:
"""
This function is designed to optimize performance by performing data manipulations directly in the database
rather than in Python code, which is especially beneficial for operations on many-to-many fields.
Args:
requests_to_convert (QuerySet[DomainRequest]): A Django QuerySet of DomainRequest objects to be annotated.
Returns:
QuerySet: An annotated queryset that includes both original and annotated fields.
Annotations (python-readable equivalents):
- requested_domain_name: `requested_domain.name If requested_domain.name is not None else default_message`.
- request_type: `f"{organization_type} | {federal_type}" If request.federal_type is not None else organization_type`.
- additional_details: `f"{cisa_rep} | {anything_else}" If anything_else or cisa_rep else None`.
- all_other_contacts: `[f"{c.first_name} {c.last_name} {c.email}" for c in request.other_contacts.all()].join(" | ")`.
- all_current_websites: `# [w.website for w in request.current_websites.all()].join(" | ")`.
- all_alternative_domains: `# [d.website for d in request.alternative_domains.all()].join(" | ")`.
""" # noqa
# As stated, this is equivalent to performing a bunch of if-statement like operations to
# each of these fields inside a for loop. However, we want to avoid that for the sake
# of performance - especially on many-to-many fields (which would require repeated DB calls in the loop).
# By doing these operations in the DB, we save a lot of computation time.
parsed_requests = requests_to_convert.annotate(
requested_domain_name=DomainRequestExport.get_requested_domain_name_query(),
request_type=DomainRequestExport.get_request_type_query(),
additional_details=DomainRequestExport.get_additional_details_query(),
all_other_contacts=StringAgg(
Concat('other_contacts__first_name', Value(' '), 'other_contacts__last_name', Value(' '), 'other_contacts__email'),
delimiter=' | ',
distinct=True
),
all_current_websites=StringAgg('current_websites__website', delimiter=' | ', distinct=True),
all_alternative_domains=StringAgg('alternative_domains__website', delimiter=' | ', distinct=True),
creator_approved_domains_count=Count(
'creator__domainrequest_set',
filter=Q(status=DomainRequest.DomainRequestStatus.APPROVED), distinct=True
),
creator_active_requests_count=Count(
'creator__domainrequest_set',
filter=Q(status__in=[
DomainRequest.DomainRequestStatus.SUBMITTED,
DomainRequest.DomainRequestStatus.IN_REVIEW,
DomainRequest.DomainRequestStatus.ACTION_NEEDED
]),
distinct=True
)
)
requests_dict = parsed_requests.values(
# Custom fields
"all_alternative_domains",
'all_other_contacts',
'all_current_websites',
'additional_details',
'request_type',
'requested_domain_name',
# Creator
'creator__first_name',
'creator__last_name',
'creator__email',
"creator_approved_domains_count",
"creator_active_requests_count",
# AO
'authorizing_official__first_name',
'authorizing_official__last_name',
'authorizing_official__email',
'authorizing_official__title',
# Existing fields
'submission_date',
'status',
'federal_type',
'federal_agency',
'organization_name',
'is_election_board',
'city',
'state_territory',
'purpose',
'cisa_representative_email',
'investigator',
)
return requests_dict
# ============================================================= #
# Helper functions for django ORM queries. #
# We are using these rather than pure python for speed reasons. #
# ============================================================= #
@staticmethod
def get_requested_domain_name_query(default_message="No requested domain"):
"""
A SQL case statement for DomainRequest.requested_domain.name.
When ran, returns requested_domain.name if not null. Otherwise, returns default_message.
Equivalent to:
`requested_domain.name If requested_domain.name is not None else default_message`
"""
requested_domain_name_query = Case(
When(
requested_domain__isnull=False,
then=F('requested_domain__name')
),
default=Value(default_message),
output_field=CharField(),
)
return requested_domain_name_query
@staticmethod
def get_request_type_query():
"""
A SQL case statement for DomainRequest.organization_type and DomainRequest.federal_type.
When ran, returns a concatenation of organization_type and federal_type - seperated by " - ".
If federal_type is null, then we just return the organization_type.
Equivalent to:
`f"{organization_type} | {federal_type}" If request.federal_type is not None else organization_type`
"""
request_type_query = Case(
When(
federal_type__isnull=False,
then=Concat('organization_type', Value(' - '), 'federal_type')
),
default=F('organization_type'),
output_field=CharField(),
)
return request_type_query
@staticmethod
def get_additional_details_query(default_message=None):
"""
A SQL case statement for DomainRequest.anything_else and DomainRequest.cisa_representative_email.
When ran, returns a concatenation of anything_else and cisa_representative_email - seperated by " | ".
Equivalent to:
`f"{cisa_rep} | {anything_else}" If anything_else or cisa_rep else None`
"""
additional_details_query = Case(
# Check if either cisa_representative_email or anything_else is not null
# TODO - touch up on this
When(
cisa_representative_email__isnull=False,
then=Concat(F('cisa_representative_email'), Value(' | '), F('anything_else'))
),
When(
anything_else__isnull=False,
then=Concat(F('cisa_representative_email'), Value(' | '), F('anything_else'))
),
default=Value(default_message),
output_field=CharField(),
)
return additional_details_query

View file

@ -164,6 +164,15 @@ class ExportDataFederal(View):
return response
class ExportDomainRequestDataFull(View):
def get(self, request, *args, **kwargs):
# Smaller export based on 1
response = HttpResponse(content_type="text/csv")
response["Content-Disposition"] = 'attachment; filename="current-full-domain-request.csv"'
csv_export.DomainRequestExport.export_full_domain_request_report(response)
return response
class ExportDataDomainsGrowth(View):
def get(self, request, *args, **kwargs):
# Get start_date and end_date from the request's GET parameters
@ -191,7 +200,7 @@ class ExportDataRequestsGrowth(View):
response["Content-Disposition"] = f'attachment; filename="requests-{start_date}-to-{end_date}.csv"'
# For #999: set export_data_domain_growth_to_csv to return the resulting queryset, which we can then use
# in context to display this data in the template.
csv_export.export_data_requests_growth_to_csv(response, start_date, end_date)
csv_export.DomainRequestExport.export_data_requests_growth_to_csv(response, start_date, end_date)
return response