mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-08-05 17:28:31 +02:00
Merge branch 'main' into meoward/2228-assign-to-me
This commit is contained in:
commit
05db1fd178
10 changed files with 540 additions and 124 deletions
|
@ -774,6 +774,15 @@ div.dja__model-description{
|
|||
text-transform: capitalize;
|
||||
}
|
||||
|
||||
.wrapped-button-group {
|
||||
// This button group has too many items
|
||||
flex-wrap: wrap;
|
||||
// Fix a weird spacing issue with USWDS a buttons in DJA
|
||||
a.button {
|
||||
padding: 6px 8px 10px 8px;
|
||||
}
|
||||
}
|
||||
|
||||
.usa-button--dja-link-color {
|
||||
color: var(--link-fg);
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ from registrar.views.admin_views import (
|
|||
ExportDataType,
|
||||
ExportDataUnmanagedDomains,
|
||||
AnalyticsView,
|
||||
ExportDomainRequestDataFull,
|
||||
)
|
||||
|
||||
from registrar.views.domain_request import Step
|
||||
|
@ -66,6 +67,11 @@ urlpatterns = [
|
|||
ExportDataType.as_view(),
|
||||
name="export_data_type",
|
||||
),
|
||||
path(
|
||||
"admin/analytics/export_data_domain_requests_full/",
|
||||
ExportDomainRequestDataFull.as_view(),
|
||||
name="export_data_domain_requests_full",
|
||||
),
|
||||
path(
|
||||
"admin/analytics/export_data_full/",
|
||||
ExportDataFull.as_view(),
|
||||
|
|
|
@ -52,6 +52,11 @@ class DomainRequest(TimeStampedModel):
|
|||
WITHDRAWN = "withdrawn", "Withdrawn"
|
||||
STARTED = "started", "Started"
|
||||
|
||||
@classmethod
|
||||
def get_status_label(cls, status_name: str):
|
||||
"""Returns the associated label for a given status name"""
|
||||
return cls(status_name).label if status_name else None
|
||||
|
||||
class StateTerritoryChoices(models.TextChoices):
|
||||
ALABAMA = "AL", "Alabama (AL)"
|
||||
ALASKA = "AK", "Alaska (AK)"
|
||||
|
@ -133,6 +138,14 @@ class DomainRequest(TimeStampedModel):
|
|||
SPECIAL_DISTRICT = "special_district", "Special district"
|
||||
SCHOOL_DISTRICT = "school_district", "School district"
|
||||
|
||||
@classmethod
|
||||
def get_org_label(cls, org_name: str):
|
||||
"""Returns the associated label for a given org name"""
|
||||
org_names = org_name.split("_election")
|
||||
if len(org_names) > 0:
|
||||
org_name = org_names[0]
|
||||
return cls(org_name).label if org_name else None
|
||||
|
||||
class OrgChoicesElectionOffice(models.TextChoices):
|
||||
"""
|
||||
Primary organization choices for Django admin:
|
||||
|
|
|
@ -298,3 +298,26 @@ def replace_url_queryparams(url_to_modify: str, query_params, convert_list_to_cs
|
|||
new_url = urlunparse(url_parts)
|
||||
|
||||
return new_url
|
||||
|
||||
|
||||
def convert_queryset_to_dict(queryset, is_model=True, key="id"):
|
||||
"""
|
||||
Transforms a queryset into a dictionary keyed by a specified key (like "id").
|
||||
|
||||
Parameters:
|
||||
requests (QuerySet or list of dicts): Input data.
|
||||
is_model (bool): Indicates if each item in 'queryset' are model instances (True) or dictionaries (False).
|
||||
key (str): Key or attribute to use for the resulting dictionary's keys.
|
||||
|
||||
Returns:
|
||||
dict: Dictionary with keys derived from 'key' and values corresponding to items in 'queryset'.
|
||||
"""
|
||||
|
||||
if is_model:
|
||||
request_dict = {getattr(value, key): value for value in queryset}
|
||||
else:
|
||||
# Querysets sometimes contain sets of dictionaries.
|
||||
# Calling .values is an example of this.
|
||||
request_dict = {value[key]: value for value in queryset}
|
||||
|
||||
return request_dict
|
||||
|
|
|
@ -27,28 +27,35 @@
|
|||
<div class="module height-full">
|
||||
<h2>Current domains</h2>
|
||||
<div class="padding-top-2 padding-x-2">
|
||||
<ul class="usa-button-group">
|
||||
<ul class="usa-button-group wrapped-button-group">
|
||||
<li class="usa-button-group__item">
|
||||
<a href="{% url 'export_data_type' %}" class="button" role="button">
|
||||
<a href="{% url 'export_data_type' %}" class="button text-no-wrap" role="button">
|
||||
<svg class="usa-icon" aria-hidden="true" focusable="false" role="img" width="24" height="24">
|
||||
<use xlink:href="{%static 'img/sprite.svg'%}#file_download"></use>
|
||||
</svg><span class="margin-left-05">All domain metadata</span>
|
||||
</a>
|
||||
</li>
|
||||
<li class="usa-button-group__item">
|
||||
<a href="{% url 'export_data_full' %}" class="button" role="button">
|
||||
<a href="{% url 'export_data_full' %}" class="button text-no-wrap" role="button">
|
||||
<svg class="usa-icon" aria-hidden="true" focusable="false" role="img" width="24" height="24">
|
||||
<use xlink:href="{%static 'img/sprite.svg'%}#file_download"></use>
|
||||
</svg><span class="margin-left-05">Current full</span>
|
||||
</a>
|
||||
</li>
|
||||
<li class="usa-button-group__item">
|
||||
<a href="{% url 'export_data_federal' %}" class="button" role="button">
|
||||
<a href="{% url 'export_data_federal' %}" class="button text-no-wrap" role="button">
|
||||
<svg class="usa-icon" aria-hidden="true" focusable="false" role="img" width="24" height="24">
|
||||
<use xlink:href="{%static 'img/sprite.svg'%}#file_download"></use>
|
||||
</svg><span class="margin-left-05">Current federal</span>
|
||||
</a>
|
||||
</li>
|
||||
<li class="usa-button-group__item">
|
||||
<a href="{% url 'export_data_domain_requests_full' %}" class="button text-no-wrap" role="button">
|
||||
<svg class="usa-icon" aria-hidden="true" focusable="false" role="img" width="24" height="24">
|
||||
<use xlink:href="{%static 'img/sprite.svg'%}#file_download"></use>
|
||||
</svg><span class="margin-left-05">All domain requests metadata</span>
|
||||
</a>
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>
|
||||
|
|
|
@ -735,19 +735,53 @@ class MockDb(TestCase):
|
|||
self.domain_request_4 = completed_domain_request(
|
||||
status=DomainRequest.DomainRequestStatus.STARTED,
|
||||
name="city4.gov",
|
||||
is_election_board=True,
|
||||
generic_org_type="city",
|
||||
)
|
||||
self.domain_request_5 = completed_domain_request(
|
||||
status=DomainRequest.DomainRequestStatus.APPROVED,
|
||||
name="city5.gov",
|
||||
)
|
||||
self.domain_request_6 = completed_domain_request(
|
||||
status=DomainRequest.DomainRequestStatus.STARTED,
|
||||
name="city6.gov",
|
||||
)
|
||||
self.domain_request_3.submit()
|
||||
self.domain_request_4.submit()
|
||||
self.domain_request_6.submit()
|
||||
|
||||
other, _ = Contact.objects.get_or_create(
|
||||
first_name="Testy1232",
|
||||
last_name="Tester24",
|
||||
title="Another Tester",
|
||||
email="te2@town.com",
|
||||
phone="(555) 555 5557",
|
||||
)
|
||||
other_2, _ = Contact.objects.get_or_create(
|
||||
first_name="Meow",
|
||||
last_name="Tester24",
|
||||
title="Another Tester",
|
||||
email="te2@town.com",
|
||||
phone="(555) 555 5557",
|
||||
)
|
||||
website, _ = Website.objects.get_or_create(website="igorville.gov")
|
||||
website_2, _ = Website.objects.get_or_create(website="cheeseville.gov")
|
||||
website_3, _ = Website.objects.get_or_create(website="https://www.example.com")
|
||||
website_4, _ = Website.objects.get_or_create(website="https://www.example2.com")
|
||||
|
||||
self.domain_request_3.other_contacts.add(other, other_2)
|
||||
self.domain_request_3.alternative_domains.add(website, website_2)
|
||||
self.domain_request_3.current_websites.add(website_3, website_4)
|
||||
self.domain_request_3.cisa_representative_email = "test@igorville.com"
|
||||
self.domain_request_3.submission_date = get_time_aware_date(datetime(2024, 4, 2))
|
||||
self.domain_request_4.submission_date = get_time_aware_date(datetime(2024, 4, 2))
|
||||
self.domain_request_3.save()
|
||||
|
||||
self.domain_request_4.submission_date = get_time_aware_date(datetime(2024, 4, 2))
|
||||
self.domain_request_4.save()
|
||||
|
||||
self.domain_request_6.submission_date = get_time_aware_date(datetime(2024, 4, 2))
|
||||
self.domain_request_6.save()
|
||||
|
||||
def tearDown(self):
|
||||
super().tearDown()
|
||||
PublicContact.objects.all().delete()
|
||||
|
|
|
@ -4,6 +4,7 @@ from django.test import Client, RequestFactory
|
|||
from io import StringIO
|
||||
from registrar.models.domain_request import DomainRequest
|
||||
from registrar.models.domain import Domain
|
||||
from registrar.models.utility.generic_helper import convert_queryset_to_dict
|
||||
from registrar.utility.csv_export import (
|
||||
export_data_managed_domains_to_csv,
|
||||
export_data_unmanaged_domains_to_csv,
|
||||
|
@ -12,7 +13,7 @@ from registrar.utility.csv_export import (
|
|||
write_csv_for_domains,
|
||||
get_default_start_date,
|
||||
get_default_end_date,
|
||||
write_csv_for_requests,
|
||||
DomainRequestExport,
|
||||
)
|
||||
|
||||
from django.core.management import call_command
|
||||
|
@ -23,6 +24,7 @@ from botocore.exceptions import ClientError
|
|||
import boto3_mocking
|
||||
from registrar.utility.s3_bucket import S3ClientError, S3ClientErrorCodes # type: ignore
|
||||
from django.utils import timezone
|
||||
from api.tests.common import less_console_noise_decorator
|
||||
from .common import MockDb, MockEppLib, less_console_noise, get_time_aware_date
|
||||
|
||||
|
||||
|
@ -667,10 +669,7 @@ class ExportDataTest(MockDb, MockEppLib):
|
|||
# Define columns, sort fields, and filter condition
|
||||
# We'll skip submission date because it's dynamic and therefore
|
||||
# impossible to set in expected_content
|
||||
columns = [
|
||||
"Requested domain",
|
||||
"Organization type",
|
||||
]
|
||||
columns = ["Domain request", "Domain type", "Federal type"]
|
||||
sort_fields = [
|
||||
"requested_domain__name",
|
||||
]
|
||||
|
@ -679,7 +678,12 @@ class ExportDataTest(MockDb, MockEppLib):
|
|||
"submission_date__lte": self.end_date,
|
||||
"submission_date__gte": self.start_date,
|
||||
}
|
||||
write_csv_for_requests(writer, columns, sort_fields, filter_condition, should_write_header=True)
|
||||
|
||||
additional_values = ["requested_domain__name"]
|
||||
all_requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct()
|
||||
annotated_requests = DomainRequestExport.annotate_and_retrieve_fields(all_requests, {}, additional_values)
|
||||
requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False)
|
||||
DomainRequestExport.write_csv_for_requests(writer, columns, requests_dict)
|
||||
# Reset the CSV file's position to the beginning
|
||||
csv_file.seek(0)
|
||||
# Read the content into a variable
|
||||
|
@ -687,9 +691,10 @@ class ExportDataTest(MockDb, MockEppLib):
|
|||
# We expect READY domains first, created between today-2 and today+2, sorted by created_at then name
|
||||
# and DELETED domains deleted between today-2 and today+2, sorted by deleted then name
|
||||
expected_content = (
|
||||
"Requested domain,Organization type\n"
|
||||
"city3.gov,Federal - Executive\n"
|
||||
"city4.gov,Federal - Executive\n"
|
||||
"Domain request,Domain type,Federal type\n"
|
||||
"city3.gov,Federal,Executive\n"
|
||||
"city4.gov,City,Executive\n"
|
||||
"city6.gov,Federal,Executive\n"
|
||||
)
|
||||
|
||||
# Normalize line endings and remove commas,
|
||||
|
@ -699,6 +704,72 @@ class ExportDataTest(MockDb, MockEppLib):
|
|||
|
||||
self.assertEqual(csv_content, expected_content)
|
||||
|
||||
@less_console_noise_decorator
|
||||
def test_full_domain_request_report(self):
|
||||
"""Tests the full domain request report."""
|
||||
|
||||
# Create a CSV file in memory
|
||||
csv_file = StringIO()
|
||||
writer = csv.writer(csv_file)
|
||||
|
||||
# Call the report. Get existing fields from the report itself.
|
||||
annotations = DomainRequestExport._full_domain_request_annotations()
|
||||
additional_values = [
|
||||
"requested_domain__name",
|
||||
"federal_agency__agency",
|
||||
"authorizing_official__first_name",
|
||||
"authorizing_official__last_name",
|
||||
"authorizing_official__email",
|
||||
"authorizing_official__title",
|
||||
"creator__first_name",
|
||||
"creator__last_name",
|
||||
"creator__email",
|
||||
"investigator__email",
|
||||
]
|
||||
requests = DomainRequest.objects.exclude(status=DomainRequest.DomainRequestStatus.STARTED)
|
||||
annotated_requests = DomainRequestExport.annotate_and_retrieve_fields(requests, annotations, additional_values)
|
||||
requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False)
|
||||
DomainRequestExport.write_csv_for_requests(writer, DomainRequestExport.all_columns, requests_dict)
|
||||
|
||||
# Reset the CSV file's position to the beginning
|
||||
csv_file.seek(0)
|
||||
# Read the content into a variable
|
||||
csv_content = csv_file.read()
|
||||
print(csv_content)
|
||||
self.maxDiff = None
|
||||
expected_content = (
|
||||
# Header
|
||||
"Domain request,Submitted at,Status,Domain type,Federal type,"
|
||||
"Federal agency,Organization name,Election office,City,State/territory,"
|
||||
"Region,Creator first name,Creator last name,Creator email,Creator approved domains count,"
|
||||
"Creator active requests count,Alternative domains,AO first name,AO last name,AO email,"
|
||||
"AO title/role,Request purpose,Request additional details,Other contacts,"
|
||||
"CISA regional representative,Current websites,Investigator\n"
|
||||
# Content
|
||||
"city2.gov,,In review,Federal,Executive,,Testorg,N/A,,NY,2,,,,0,1,city1.gov,Testy,Tester,testy@town.com,"
|
||||
"Chief Tester,Purpose of the site,There is more,Testy Tester testy2@town.com,,city.com,\n"
|
||||
"city3.gov,2024-04-02,Submitted,Federal,Executive,,Testorg,N/A,,NY,2,,,,0,1,"
|
||||
"cheeseville.gov | city1.gov | igorville.gov,Testy,Tester,testy@town.com,Chief Tester,"
|
||||
"Purpose of the site,CISA-first-name CISA-last-name | There is more,Meow Tester24 te2@town.com | "
|
||||
"Testy1232 Tester24 te2@town.com | Testy Tester testy2@town.com,test@igorville.com,"
|
||||
"city.com | https://www.example2.com | https://www.example.com,\n"
|
||||
"city4.gov,2024-04-02,Submitted,City,Executive,,Testorg,Yes,,NY,2,,,,0,1,city1.gov,Testy,Tester,"
|
||||
"testy@town.com,Chief Tester,Purpose of the site,CISA-first-name CISA-last-name | There is more,"
|
||||
"Testy Tester testy2@town.com,cisaRep@igorville.gov,city.com,\n"
|
||||
"city5.gov,,Approved,Federal,Executive,,Testorg,N/A,,NY,2,,,,1,0,city1.gov,Testy,Tester,testy@town.com,"
|
||||
"Chief Tester,Purpose of the site,There is more,Testy Tester testy2@town.com,,city.com,\n"
|
||||
"city6.gov,2024-04-02,Submitted,Federal,Executive,,Testorg,N/A,,NY,2,,,,0,1,city1.gov,Testy,Tester,"
|
||||
"testy@town.com,Chief Tester,Purpose of the site,CISA-first-name CISA-last-name | There is more,"
|
||||
"Testy Tester testy2@town.com,cisaRep@igorville.gov,city.com,"
|
||||
)
|
||||
|
||||
# Normalize line endings and remove commas,
|
||||
# spaces and leading/trailing whitespace
|
||||
csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip()
|
||||
expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip()
|
||||
|
||||
self.assertEqual(csv_content, expected_content)
|
||||
|
||||
|
||||
class HelperFunctions(MockDb):
|
||||
"""This asserts that 1=1. Its limited usefulness lies in making sure the helper methods stay healthy."""
|
||||
|
@ -741,5 +812,5 @@ class HelperFunctions(MockDb):
|
|||
"submission_date__lte": self.end_date,
|
||||
}
|
||||
submitted_requests_sliced_at_end_date = get_sliced_requests(filter_condition)
|
||||
expected_content = [2, 2, 0, 0, 0, 0, 0, 0, 0, 0]
|
||||
expected_content = [3, 2, 0, 0, 0, 0, 1, 0, 0, 1]
|
||||
self.assertEqual(submitted_requests_sliced_at_end_date, expected_content)
|
||||
|
|
|
@ -5,3 +5,8 @@ class BranchChoices(models.TextChoices):
|
|||
EXECUTIVE = "executive", "Executive"
|
||||
JUDICIAL = "judicial", "Judicial"
|
||||
LEGISLATIVE = "legislative", "Legislative"
|
||||
|
||||
@classmethod
|
||||
def get_branch_label(cls, branch_name: str):
|
||||
"""Returns the associated label for a given org name"""
|
||||
return cls(branch_name).label if branch_name else None
|
||||
|
|
|
@ -1,18 +1,25 @@
|
|||
import csv
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from registrar.models.domain import Domain
|
||||
from registrar.models.domain_invitation import DomainInvitation
|
||||
from registrar.models.domain_request import DomainRequest
|
||||
from registrar.models.domain_information import DomainInformation
|
||||
from registrar.models import (
|
||||
Domain,
|
||||
DomainInvitation,
|
||||
DomainRequest,
|
||||
DomainInformation,
|
||||
PublicContact,
|
||||
UserDomainRole,
|
||||
)
|
||||
from django.db.models import QuerySet, Value, CharField, Count, Q, F
|
||||
from django.db.models import ManyToManyField
|
||||
from django.utils import timezone
|
||||
from django.core.paginator import Paginator
|
||||
from django.db.models import F, Value, CharField
|
||||
from django.db.models.functions import Concat, Coalesce
|
||||
|
||||
from registrar.models.public_contact import PublicContact
|
||||
from registrar.models.user_domain_role import UserDomainRole
|
||||
from django.contrib.postgres.aggregates import StringAgg
|
||||
from registrar.models.utility.generic_helper import convert_queryset_to_dict
|
||||
from registrar.templatetags.custom_filters import get_region
|
||||
from registrar.utility.enums import DefaultEmail
|
||||
from registrar.utility.constants import BranchChoices
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -299,84 +306,6 @@ def write_csv_for_domains(
|
|||
writer.writerows(total_body_rows)
|
||||
|
||||
|
||||
def get_requests(filter_condition, sort_fields):
|
||||
"""
|
||||
Returns DomainRequest objects filtered and sorted based on the provided conditions.
|
||||
filter_condition -> A dictionary of conditions to filter the objects.
|
||||
sort_fields -> A list of fields to sort the resulting query set.
|
||||
returns: A queryset of DomainRequest objects
|
||||
"""
|
||||
requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct()
|
||||
return requests
|
||||
|
||||
|
||||
def parse_row_for_requests(columns, request: DomainRequest):
|
||||
"""Given a set of columns, generate a new row from cleaned column data"""
|
||||
|
||||
requested_domain_name = "No requested domain"
|
||||
|
||||
if request.requested_domain is not None:
|
||||
requested_domain_name = request.requested_domain.name
|
||||
|
||||
if request.federal_type:
|
||||
request_type = f"{request.get_organization_type_display()} - {request.get_federal_type_display()}"
|
||||
else:
|
||||
request_type = request.get_organization_type_display()
|
||||
|
||||
# create a dictionary of fields which can be included in output
|
||||
FIELDS = {
|
||||
"Requested domain": requested_domain_name,
|
||||
"Status": request.get_status_display(),
|
||||
"Organization type": request_type,
|
||||
"Agency": request.federal_agency,
|
||||
"Organization name": request.organization_name,
|
||||
"City": request.city,
|
||||
"State": request.state_territory,
|
||||
"AO email": request.authorizing_official.email if request.authorizing_official else " ",
|
||||
"Security contact email": request,
|
||||
"Created at": request.created_at,
|
||||
"Submission date": request.submission_date,
|
||||
}
|
||||
|
||||
row = [FIELDS.get(column, "") for column in columns]
|
||||
return row
|
||||
|
||||
|
||||
def write_csv_for_requests(
|
||||
writer,
|
||||
columns,
|
||||
sort_fields,
|
||||
filter_condition,
|
||||
should_write_header=True,
|
||||
):
|
||||
"""Receives params from the parent methods and outputs a CSV with filtered and sorted requests.
|
||||
Works with write_header as long as the same writer object is passed."""
|
||||
|
||||
all_requests = get_requests(filter_condition, sort_fields)
|
||||
|
||||
# Reduce the memory overhead when performing the write operation
|
||||
paginator = Paginator(all_requests, 1000)
|
||||
total_body_rows = []
|
||||
|
||||
for page_num in paginator.page_range:
|
||||
page = paginator.page(page_num)
|
||||
rows = []
|
||||
for request in page.object_list:
|
||||
try:
|
||||
row = parse_row_for_requests(columns, request)
|
||||
rows.append(row)
|
||||
except ValueError:
|
||||
# This should not happen. If it does, just skip this row.
|
||||
# It indicates that DomainInformation.domain is None.
|
||||
logger.error("csv_export -> Error when parsing row, domain was None")
|
||||
continue
|
||||
total_body_rows.extend(rows)
|
||||
|
||||
if should_write_header:
|
||||
write_header(writer, columns)
|
||||
writer.writerows(total_body_rows)
|
||||
|
||||
|
||||
def export_data_type_to_csv(csv_file):
|
||||
"""
|
||||
All domains report with extra columns.
|
||||
|
@ -775,30 +704,338 @@ def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date):
|
|||
)
|
||||
|
||||
|
||||
def export_data_requests_growth_to_csv(csv_file, start_date, end_date):
|
||||
class DomainRequestExport:
|
||||
"""
|
||||
Growth report:
|
||||
Receive start and end dates from the view, parse them.
|
||||
Request from write_requests_body SUBMITTED requests that are created between
|
||||
the start and end dates. Specify sort params.
|
||||
A collection of functions which return csv files regarding the DomainRequest model.
|
||||
"""
|
||||
|
||||
start_date_formatted = format_start_date(start_date)
|
||||
end_date_formatted = format_end_date(end_date)
|
||||
writer = csv.writer(csv_file)
|
||||
# define columns to include in export
|
||||
columns = [
|
||||
"Requested domain",
|
||||
"Organization type",
|
||||
"Submission date",
|
||||
# Get all columns on the full metadata report
|
||||
all_columns = [
|
||||
"Domain request",
|
||||
"Submitted at",
|
||||
"Status",
|
||||
"Domain type",
|
||||
"Federal type",
|
||||
"Federal agency",
|
||||
"Organization name",
|
||||
"Election office",
|
||||
"City",
|
||||
"State/territory",
|
||||
"Region",
|
||||
"Creator first name",
|
||||
"Creator last name",
|
||||
"Creator email",
|
||||
"Creator approved domains count",
|
||||
"Creator active requests count",
|
||||
"Alternative domains",
|
||||
"AO first name",
|
||||
"AO last name",
|
||||
"AO email",
|
||||
"AO title/role",
|
||||
"Request purpose",
|
||||
"Request additional details",
|
||||
"Other contacts",
|
||||
"CISA regional representative",
|
||||
"Current websites",
|
||||
"Investigator",
|
||||
]
|
||||
sort_fields = [
|
||||
"requested_domain__name",
|
||||
]
|
||||
filter_condition = {
|
||||
"status": DomainRequest.DomainRequestStatus.SUBMITTED,
|
||||
"submission_date__lte": end_date_formatted,
|
||||
"submission_date__gte": start_date_formatted,
|
||||
}
|
||||
|
||||
write_csv_for_requests(writer, columns, sort_fields, filter_condition, should_write_header=True)
|
||||
@classmethod
|
||||
def export_data_requests_growth_to_csv(cls, csv_file, start_date, end_date):
|
||||
"""
|
||||
Growth report:
|
||||
Receive start and end dates from the view, parse them.
|
||||
Request from write_requests_body SUBMITTED requests that are created between
|
||||
the start and end dates. Specify sort params.
|
||||
"""
|
||||
|
||||
start_date_formatted = format_start_date(start_date)
|
||||
end_date_formatted = format_end_date(end_date)
|
||||
writer = csv.writer(csv_file)
|
||||
# define columns to include in export
|
||||
columns = [
|
||||
"Domain request",
|
||||
"Domain type",
|
||||
"Federal type",
|
||||
"Submitted at",
|
||||
]
|
||||
|
||||
sort_fields = [
|
||||
"requested_domain__name",
|
||||
]
|
||||
filter_condition = {
|
||||
"status": DomainRequest.DomainRequestStatus.SUBMITTED,
|
||||
"submission_date__lte": end_date_formatted,
|
||||
"submission_date__gte": start_date_formatted,
|
||||
}
|
||||
|
||||
# We don't want to annotate anything, but we do want to access the requested domain name
|
||||
annotations = {}
|
||||
additional_values = ["requested_domain__name"]
|
||||
|
||||
all_requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct()
|
||||
|
||||
annotated_requests = cls.annotate_and_retrieve_fields(all_requests, annotations, additional_values)
|
||||
requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False)
|
||||
|
||||
cls.write_csv_for_requests(writer, columns, requests_dict)
|
||||
|
||||
@classmethod
|
||||
def export_full_domain_request_report(cls, csv_file):
|
||||
"""
|
||||
Generates a detailed domain request report to a CSV file.
|
||||
|
||||
Retrieves and annotates DomainRequest objects, excluding 'STARTED' status,
|
||||
with related data optimizations via select/prefetch and annotation.
|
||||
|
||||
Annotated with counts and aggregates of related entities.
|
||||
Converts to dict and writes to CSV using predefined columns.
|
||||
|
||||
Parameters:
|
||||
csv_file (file-like object): Target CSV file.
|
||||
"""
|
||||
writer = csv.writer(csv_file)
|
||||
|
||||
requests = (
|
||||
DomainRequest.objects.select_related(
|
||||
"creator", "authorizing_official", "federal_agency", "investigator", "requested_domain"
|
||||
)
|
||||
.prefetch_related("current_websites", "other_contacts", "alternative_domains")
|
||||
.exclude(status__in=[DomainRequest.DomainRequestStatus.STARTED])
|
||||
.order_by(
|
||||
"status",
|
||||
"requested_domain__name",
|
||||
)
|
||||
.distinct()
|
||||
)
|
||||
|
||||
# Annotations are custom columns returned to the queryset (AKA: computed in the DB).
|
||||
annotations = cls._full_domain_request_annotations()
|
||||
|
||||
# The .values returned from annotate_and_retrieve_fields can't go two levels deep
|
||||
# (just returns the field id of say, "creator") - so we have to include this.
|
||||
additional_values = [
|
||||
"requested_domain__name",
|
||||
"federal_agency__agency",
|
||||
"authorizing_official__first_name",
|
||||
"authorizing_official__last_name",
|
||||
"authorizing_official__email",
|
||||
"authorizing_official__title",
|
||||
"creator__first_name",
|
||||
"creator__last_name",
|
||||
"creator__email",
|
||||
"investigator__email",
|
||||
]
|
||||
|
||||
# Convert the domain request queryset to a dictionary (including annotated fields)
|
||||
annotated_requests = cls.annotate_and_retrieve_fields(requests, annotations, additional_values)
|
||||
requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False)
|
||||
|
||||
# Write the csv file
|
||||
cls.write_csv_for_requests(writer, cls.all_columns, requests_dict)
|
||||
|
||||
@classmethod
|
||||
def _full_domain_request_annotations(cls, delimiter=" | "):
|
||||
"""Returns the annotations for the full domain request report"""
|
||||
return {
|
||||
"creator_approved_domains_count": DomainRequestExport.get_creator_approved_domains_count_query(),
|
||||
"creator_active_requests_count": DomainRequestExport.get_creator_active_requests_count_query(),
|
||||
"all_current_websites": StringAgg("current_websites__website", delimiter=delimiter, distinct=True),
|
||||
"all_alternative_domains": StringAgg("alternative_domains__website", delimiter=delimiter, distinct=True),
|
||||
# Coerce the other contacts object to "{first_name} {last_name} {email}"
|
||||
"all_other_contacts": StringAgg(
|
||||
Concat(
|
||||
"other_contacts__first_name",
|
||||
Value(" "),
|
||||
"other_contacts__last_name",
|
||||
Value(" "),
|
||||
"other_contacts__email",
|
||||
),
|
||||
delimiter=delimiter,
|
||||
distinct=True,
|
||||
),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def write_csv_for_requests(
|
||||
writer,
|
||||
columns,
|
||||
requests_dict,
|
||||
should_write_header=True,
|
||||
):
|
||||
"""Receives params from the parent methods and outputs a CSV with filtered and sorted requests.
|
||||
Works with write_header as long as the same writer object is passed."""
|
||||
|
||||
rows = []
|
||||
for request in requests_dict.values():
|
||||
try:
|
||||
row = DomainRequestExport.parse_row_for_requests(columns, request)
|
||||
rows.append(row)
|
||||
except ValueError as err:
|
||||
logger.error(f"csv_export -> Error when parsing row: {err}")
|
||||
continue
|
||||
|
||||
if should_write_header:
|
||||
write_header(writer, columns)
|
||||
|
||||
writer.writerows(rows)
|
||||
|
||||
@staticmethod
|
||||
def parse_row_for_requests(columns, request):
|
||||
"""
|
||||
Given a set of columns and a request dictionary, generate a new row from cleaned column data.
|
||||
"""
|
||||
|
||||
# Handle the federal_type field. Defaults to the wrong format.
|
||||
federal_type = request.get("federal_type")
|
||||
human_readable_federal_type = BranchChoices.get_branch_label(federal_type) if federal_type else None
|
||||
|
||||
# Handle the org_type field
|
||||
org_type = request.get("generic_org_type") or request.get("organization_type")
|
||||
human_readable_org_type = DomainRequest.OrganizationChoices.get_org_label(org_type) if org_type else None
|
||||
|
||||
# Handle the status field. Defaults to the wrong format.
|
||||
status = request.get("status")
|
||||
status_display = DomainRequest.DomainRequestStatus.get_status_label(status) if status else None
|
||||
|
||||
# Handle the region field.
|
||||
state_territory = request.get("state_territory")
|
||||
region = get_region(state_territory) if state_territory else None
|
||||
|
||||
# Handle the requested_domain field (add a default if None)
|
||||
requested_domain = request.get("requested_domain__name")
|
||||
requested_domain_name = requested_domain if requested_domain else "No requested domain"
|
||||
|
||||
# Handle the election field. N/A if None, "Yes"/"No" if boolean
|
||||
human_readable_election_board = "N/A"
|
||||
is_election_board = request.get("is_election_board")
|
||||
if is_election_board is not None:
|
||||
human_readable_election_board = "Yes" if is_election_board else "No"
|
||||
|
||||
# Handle the additional details field. Pipe seperated.
|
||||
cisa_rep_first = request.get("cisa_representative_first_name")
|
||||
cisa_rep_last = request.get("cisa_representative_last_name")
|
||||
name = [n for n in [cisa_rep_first, cisa_rep_last] if n]
|
||||
|
||||
cisa_rep = " ".join(name) if name else None
|
||||
details = [cisa_rep, request.get("anything_else")]
|
||||
additional_details = " | ".join([field for field in details if field])
|
||||
|
||||
# create a dictionary of fields which can be included in output.
|
||||
# "extra_fields" are precomputed fields (generated in the DB or parsed).
|
||||
FIELDS = {
|
||||
# Parsed fields - defined above.
|
||||
"Domain request": requested_domain_name,
|
||||
"Region": region,
|
||||
"Status": status_display,
|
||||
"Election office": human_readable_election_board,
|
||||
"Federal type": human_readable_federal_type,
|
||||
"Domain type": human_readable_org_type,
|
||||
"Request additional details": additional_details,
|
||||
# Annotated fields - passed into the request dict.
|
||||
"Creator approved domains count": request.get("creator_approved_domains_count", 0),
|
||||
"Creator active requests count": request.get("creator_active_requests_count", 0),
|
||||
"Alternative domains": request.get("all_alternative_domains"),
|
||||
"Other contacts": request.get("all_other_contacts"),
|
||||
"Current websites": request.get("all_current_websites"),
|
||||
# Untouched FK fields - passed into the request dict.
|
||||
"Federal agency": request.get("federal_agency__agency"),
|
||||
"AO first name": request.get("authorizing_official__first_name"),
|
||||
"AO last name": request.get("authorizing_official__last_name"),
|
||||
"AO email": request.get("authorizing_official__email"),
|
||||
"AO title/role": request.get("authorizing_official__title"),
|
||||
"Creator first name": request.get("creator__first_name"),
|
||||
"Creator last name": request.get("creator__last_name"),
|
||||
"Creator email": request.get("creator__email"),
|
||||
"Investigator": request.get("investigator__email"),
|
||||
# Untouched fields
|
||||
"Organization name": request.get("organization_name"),
|
||||
"City": request.get("city"),
|
||||
"State/territory": request.get("state_territory"),
|
||||
"Request purpose": request.get("purpose"),
|
||||
"CISA regional representative": request.get("cisa_representative_email"),
|
||||
"Submitted at": request.get("submission_date"),
|
||||
}
|
||||
|
||||
row = [FIELDS.get(column, "") for column in columns]
|
||||
return row
|
||||
|
||||
@classmethod
|
||||
def annotate_and_retrieve_fields(
|
||||
cls, requests, annotations, additional_values=None, include_many_to_many=False
|
||||
) -> QuerySet:
|
||||
"""
|
||||
Applies annotations to a queryset and retrieves specified fields,
|
||||
including class-defined and annotation-defined.
|
||||
|
||||
Parameters:
|
||||
requests (QuerySet): Initial queryset.
|
||||
annotations (dict, optional): Fields to compute {field_name: expression}.
|
||||
additional_values (list, optional): Extra fields to retrieve; defaults to annotation keys if None.
|
||||
include_many_to_many (bool, optional): Determines if we should include many to many fields or not
|
||||
|
||||
Returns:
|
||||
QuerySet: Contains dictionaries with the specified fields for each record.
|
||||
"""
|
||||
|
||||
if additional_values is None:
|
||||
additional_values = []
|
||||
|
||||
# We can infer that if we're passing in annotations,
|
||||
# we want to grab the result of said annotation.
|
||||
if annotations:
|
||||
additional_values.extend(annotations.keys())
|
||||
|
||||
# Get prexisting fields on DomainRequest
|
||||
domain_request_fields = set()
|
||||
for field in DomainRequest._meta.get_fields():
|
||||
# Exclude many to many fields unless we specify
|
||||
many_to_many = isinstance(field, ManyToManyField) and include_many_to_many
|
||||
if many_to_many or not isinstance(field, ManyToManyField):
|
||||
domain_request_fields.add(field.name)
|
||||
|
||||
queryset = requests.annotate(**annotations).values(*domain_request_fields, *additional_values)
|
||||
return queryset
|
||||
|
||||
# ============================================================= #
|
||||
# Helper functions for django ORM queries. #
|
||||
# We are using these rather than pure python for speed reasons. #
|
||||
# ============================================================= #
|
||||
|
||||
@staticmethod
|
||||
def get_creator_approved_domains_count_query():
|
||||
"""
|
||||
Generates a Count query for distinct approved domain requests per creator.
|
||||
|
||||
Returns:
|
||||
Count: Aggregates distinct 'APPROVED' domain requests by creator.
|
||||
"""
|
||||
|
||||
query = Count(
|
||||
"creator__domain_requests_created__id",
|
||||
filter=Q(creator__domain_requests_created__status=DomainRequest.DomainRequestStatus.APPROVED),
|
||||
distinct=True,
|
||||
)
|
||||
return query
|
||||
|
||||
@staticmethod
|
||||
def get_creator_active_requests_count_query():
|
||||
"""
|
||||
Generates a Count query for distinct approved domain requests per creator.
|
||||
|
||||
Returns:
|
||||
Count: Aggregates distinct 'SUBMITTED', 'IN_REVIEW', and 'ACTION_NEEDED' domain requests by creator.
|
||||
"""
|
||||
|
||||
query = Count(
|
||||
"creator__domain_requests_created__id",
|
||||
filter=Q(
|
||||
creator__domain_requests_created__status__in=[
|
||||
DomainRequest.DomainRequestStatus.SUBMITTED,
|
||||
DomainRequest.DomainRequestStatus.IN_REVIEW,
|
||||
DomainRequest.DomainRequestStatus.ACTION_NEEDED,
|
||||
]
|
||||
),
|
||||
distinct=True,
|
||||
)
|
||||
return query
|
||||
|
|
|
@ -164,6 +164,17 @@ class ExportDataFederal(View):
|
|||
return response
|
||||
|
||||
|
||||
class ExportDomainRequestDataFull(View):
|
||||
"""Generates a downloaded report containing all Domain Requests (except started)"""
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
"""Returns a content disposition response for current-full-domain-request.csv"""
|
||||
response = HttpResponse(content_type="text/csv")
|
||||
response["Content-Disposition"] = 'attachment; filename="current-full-domain-request.csv"'
|
||||
csv_export.DomainRequestExport.export_full_domain_request_report(response)
|
||||
return response
|
||||
|
||||
|
||||
class ExportDataDomainsGrowth(View):
|
||||
def get(self, request, *args, **kwargs):
|
||||
# Get start_date and end_date from the request's GET parameters
|
||||
|
@ -191,7 +202,7 @@ class ExportDataRequestsGrowth(View):
|
|||
response["Content-Disposition"] = f'attachment; filename="requests-{start_date}-to-{end_date}.csv"'
|
||||
# For #999: set export_data_domain_growth_to_csv to return the resulting queryset, which we can then use
|
||||
# in context to display this data in the template.
|
||||
csv_export.export_data_requests_growth_to_csv(response, start_date, end_date)
|
||||
csv_export.DomainRequestExport.export_data_requests_growth_to_csv(response, start_date, end_date)
|
||||
|
||||
return response
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue