mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-08-06 09:45:23 +02:00
refactored domain request exports
This commit is contained in:
parent
10529e4094
commit
38125ea97f
2 changed files with 268 additions and 343 deletions
|
@ -33,17 +33,14 @@ def write_header(writer, columns):
|
|||
"""
|
||||
writer.writerow(columns)
|
||||
|
||||
|
||||
def get_default_start_date():
|
||||
# Default to a date that's prior to our first deployment
|
||||
return timezone.make_aware(datetime(2023, 11, 1))
|
||||
|
||||
|
||||
def get_default_end_date():
|
||||
# Default to now()
|
||||
return timezone.now()
|
||||
|
||||
|
||||
def format_start_date(start_date):
|
||||
return timezone.make_aware(datetime.strptime(start_date, "%Y-%m-%d")) if start_date else get_default_start_date()
|
||||
|
||||
|
@ -51,78 +48,6 @@ def format_start_date(start_date):
|
|||
def format_end_date(end_date):
|
||||
return timezone.make_aware(datetime.strptime(end_date, "%Y-%m-%d")) if end_date else get_default_end_date()
|
||||
|
||||
|
||||
def get_sliced_domains(filter_condition):
|
||||
"""Get filtered domains counts sliced by org type and election office.
|
||||
Pass distinct=True when filtering by permissions so we do not to count multiples
|
||||
when a domain has more that one manager.
|
||||
"""
|
||||
|
||||
domains = DomainInformation.objects.all().filter(**filter_condition).distinct()
|
||||
domains_count = domains.count()
|
||||
federal = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.FEDERAL).distinct().count()
|
||||
interstate = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.INTERSTATE).count()
|
||||
state_or_territory = (
|
||||
domains.filter(generic_org_type=DomainRequest.OrganizationChoices.STATE_OR_TERRITORY).distinct().count()
|
||||
)
|
||||
tribal = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.TRIBAL).distinct().count()
|
||||
county = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.COUNTY).distinct().count()
|
||||
city = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.CITY).distinct().count()
|
||||
special_district = (
|
||||
domains.filter(generic_org_type=DomainRequest.OrganizationChoices.SPECIAL_DISTRICT).distinct().count()
|
||||
)
|
||||
school_district = (
|
||||
domains.filter(generic_org_type=DomainRequest.OrganizationChoices.SCHOOL_DISTRICT).distinct().count()
|
||||
)
|
||||
election_board = domains.filter(is_election_board=True).distinct().count()
|
||||
|
||||
return [
|
||||
domains_count,
|
||||
federal,
|
||||
interstate,
|
||||
state_or_territory,
|
||||
tribal,
|
||||
county,
|
||||
city,
|
||||
special_district,
|
||||
school_district,
|
||||
election_board,
|
||||
]
|
||||
|
||||
|
||||
def get_sliced_requests(filter_condition):
|
||||
"""Get filtered requests counts sliced by org type and election office."""
|
||||
requests = DomainRequest.objects.all().filter(**filter_condition).distinct()
|
||||
requests_count = requests.count()
|
||||
federal = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.FEDERAL).distinct().count()
|
||||
interstate = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.INTERSTATE).distinct().count()
|
||||
state_or_territory = (
|
||||
requests.filter(generic_org_type=DomainRequest.OrganizationChoices.STATE_OR_TERRITORY).distinct().count()
|
||||
)
|
||||
tribal = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.TRIBAL).distinct().count()
|
||||
county = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.COUNTY).distinct().count()
|
||||
city = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.CITY).distinct().count()
|
||||
special_district = (
|
||||
requests.filter(generic_org_type=DomainRequest.OrganizationChoices.SPECIAL_DISTRICT).distinct().count()
|
||||
)
|
||||
school_district = (
|
||||
requests.filter(generic_org_type=DomainRequest.OrganizationChoices.SCHOOL_DISTRICT).distinct().count()
|
||||
)
|
||||
election_board = requests.filter(is_election_board=True).distinct().count()
|
||||
|
||||
return [
|
||||
requests_count,
|
||||
federal,
|
||||
interstate,
|
||||
state_or_territory,
|
||||
tribal,
|
||||
county,
|
||||
city,
|
||||
special_district,
|
||||
school_district,
|
||||
election_board,
|
||||
]
|
||||
|
||||
class BaseExport(ABC):
|
||||
"""
|
||||
A generic class for exporting data which returns a csv file for the given model.
|
||||
|
@ -173,6 +98,13 @@ class BaseExport(ABC):
|
|||
"""
|
||||
return []
|
||||
|
||||
@classmethod
|
||||
def get_exclusions(cls):
|
||||
"""
|
||||
Get a Q object of exclusion conditions to use when building queryset.
|
||||
"""
|
||||
return Q()
|
||||
|
||||
@classmethod
|
||||
def get_filter_conditions(cls, start_date=None, end_date=None):
|
||||
"""
|
||||
|
@ -260,6 +192,7 @@ class BaseExport(ABC):
|
|||
kwargs = cls.get_additional_args()
|
||||
select_related = cls.get_select_related()
|
||||
prefetch_related = cls.get_prefetch_related()
|
||||
exclusions = cls.get_exclusions()
|
||||
filter_conditions = cls.get_filter_conditions(start_date, end_date)
|
||||
computed_fields = cls.get_computed_fields()
|
||||
related_table_fields = cls.get_related_table_fields()
|
||||
|
@ -268,6 +201,7 @@ class BaseExport(ABC):
|
|||
cls.model().objects.select_related(*select_related)
|
||||
.prefetch_related(*prefetch_related)
|
||||
.filter(filter_conditions)
|
||||
.exclude(exclusions)
|
||||
.order_by(*sort_fields)
|
||||
.distinct()
|
||||
)
|
||||
|
@ -282,7 +216,6 @@ class BaseExport(ABC):
|
|||
# Write the csv file
|
||||
cls.write_csv(writer, columns, models_dict)
|
||||
|
||||
|
||||
@classmethod
|
||||
def write_csv(
|
||||
cls,
|
||||
|
@ -588,7 +521,6 @@ class DomainDataType(DomainExport):
|
|||
"""
|
||||
Get a list of fields from related tables.
|
||||
"""
|
||||
print("DomainDataType::get_related_table_fields")
|
||||
return [
|
||||
"domain__name",
|
||||
"domain__state",
|
||||
|
@ -1120,142 +1052,263 @@ class DomainUnmanaged(DomainExport):
|
|||
csv_writer.writerow([])
|
||||
|
||||
|
||||
class DomainRequestExport:
|
||||
"""
|
||||
A collection of functions which return csv files regarding the DomainRequest model.
|
||||
"""
|
||||
|
||||
# Get all columns on the full metadata report
|
||||
all_columns = [
|
||||
"Domain request",
|
||||
"Submitted at",
|
||||
"Status",
|
||||
"Domain type",
|
||||
"Federal type",
|
||||
"Federal agency",
|
||||
"Organization name",
|
||||
"Election office",
|
||||
"City",
|
||||
"State/territory",
|
||||
"Region",
|
||||
"Creator first name",
|
||||
"Creator last name",
|
||||
"Creator email",
|
||||
"Creator approved domains count",
|
||||
"Creator active requests count",
|
||||
"Alternative domains",
|
||||
"AO first name",
|
||||
"AO last name",
|
||||
"AO email",
|
||||
"AO title/role",
|
||||
"Request purpose",
|
||||
"Request additional details",
|
||||
"Other contacts",
|
||||
"CISA regional representative",
|
||||
"Current websites",
|
||||
"Investigator",
|
||||
]
|
||||
class DomainRequestExport(BaseExport):
|
||||
|
||||
@classmethod
|
||||
def export_data_requests_growth_to_csv(cls, csv_file, start_date, end_date):
|
||||
def model(cls):
|
||||
# Return the model class that this export handles
|
||||
return DomainRequest
|
||||
|
||||
@classmethod
|
||||
def get_sliced_requests(cls, filter_condition):
|
||||
"""Get filtered requests counts sliced by org type and election office."""
|
||||
requests = DomainRequest.objects.all().filter(**filter_condition).distinct()
|
||||
requests_count = requests.count()
|
||||
federal = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.FEDERAL).distinct().count()
|
||||
interstate = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.INTERSTATE).distinct().count()
|
||||
state_or_territory = (
|
||||
requests.filter(generic_org_type=DomainRequest.OrganizationChoices.STATE_OR_TERRITORY).distinct().count()
|
||||
)
|
||||
tribal = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.TRIBAL).distinct().count()
|
||||
county = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.COUNTY).distinct().count()
|
||||
city = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.CITY).distinct().count()
|
||||
special_district = (
|
||||
requests.filter(generic_org_type=DomainRequest.OrganizationChoices.SPECIAL_DISTRICT).distinct().count()
|
||||
)
|
||||
school_district = (
|
||||
requests.filter(generic_org_type=DomainRequest.OrganizationChoices.SCHOOL_DISTRICT).distinct().count()
|
||||
)
|
||||
election_board = requests.filter(is_election_board=True).distinct().count()
|
||||
|
||||
return [
|
||||
requests_count,
|
||||
federal,
|
||||
interstate,
|
||||
state_or_territory,
|
||||
tribal,
|
||||
county,
|
||||
city,
|
||||
special_district,
|
||||
school_district,
|
||||
election_board,
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def parse_row(cls, columns, model):
|
||||
"""
|
||||
Growth report:
|
||||
Receive start and end dates from the view, parse them.
|
||||
Request from write_requests_body SUBMITTED requests that are created between
|
||||
the start and end dates. Specify sort params.
|
||||
Given a set of columns and a model dictionary, generate a new row from cleaned column data.
|
||||
"""
|
||||
|
||||
start_date_formatted = format_start_date(start_date)
|
||||
end_date_formatted = format_end_date(end_date)
|
||||
writer = csv.writer(csv_file)
|
||||
# define columns to include in export
|
||||
columns = [
|
||||
# Handle the federal_type field. Defaults to the wrong format.
|
||||
federal_type = model.get("federal_type")
|
||||
human_readable_federal_type = BranchChoices.get_branch_label(federal_type) if federal_type else None
|
||||
|
||||
# Handle the org_type field
|
||||
org_type = model.get("generic_org_type") or model.get("organization_type")
|
||||
human_readable_org_type = DomainRequest.OrganizationChoices.get_org_label(org_type) if org_type else None
|
||||
|
||||
# Handle the status field. Defaults to the wrong format.
|
||||
status = model.get("status")
|
||||
status_display = DomainRequest.DomainRequestStatus.get_status_label(status) if status else None
|
||||
|
||||
# Handle the region field.
|
||||
state_territory = model.get("state_territory")
|
||||
region = get_region(state_territory) if state_territory else None
|
||||
|
||||
# Handle the requested_domain field (add a default if None)
|
||||
requested_domain = model.get("requested_domain__name")
|
||||
requested_domain_name = requested_domain if requested_domain else "No requested domain"
|
||||
|
||||
# Handle the election field. N/A if None, "Yes"/"No" if boolean
|
||||
human_readable_election_board = "N/A"
|
||||
is_election_board = model.get("is_election_board")
|
||||
if is_election_board is not None:
|
||||
human_readable_election_board = "Yes" if is_election_board else "No"
|
||||
|
||||
# Handle the additional details field. Pipe seperated.
|
||||
cisa_rep_first = model.get("cisa_representative_first_name")
|
||||
cisa_rep_last = model.get("cisa_representative_last_name")
|
||||
name = [n for n in [cisa_rep_first, cisa_rep_last] if n]
|
||||
|
||||
cisa_rep = " ".join(name) if name else None
|
||||
details = [cisa_rep, model.get("anything_else")]
|
||||
additional_details = " | ".join([field for field in details if field])
|
||||
|
||||
# create a dictionary of fields which can be included in output.
|
||||
# "extra_fields" are precomputed fields (generated in the DB or parsed).
|
||||
FIELDS = {
|
||||
# Parsed fields - defined above.
|
||||
"Domain request": requested_domain_name,
|
||||
"Region": region,
|
||||
"Status": status_display,
|
||||
"Election office": human_readable_election_board,
|
||||
"Federal type": human_readable_federal_type,
|
||||
"Domain type": human_readable_org_type,
|
||||
"Request additional details": additional_details,
|
||||
# Annotated fields - passed into the request dict.
|
||||
"Creator approved domains count": model.get("creator_approved_domains_count", 0),
|
||||
"Creator active requests count": model.get("creator_active_requests_count", 0),
|
||||
"Alternative domains": model.get("all_alternative_domains"),
|
||||
"Other contacts": model.get("all_other_contacts"),
|
||||
"Current websites": model.get("all_current_websites"),
|
||||
# Untouched FK fields - passed into the request dict.
|
||||
"Federal agency": model.get("federal_agency__agency"),
|
||||
"AO first name": model.get("authorizing_official__first_name"),
|
||||
"AO last name": model.get("authorizing_official__last_name"),
|
||||
"AO email": model.get("authorizing_official__email"),
|
||||
"AO title/role": model.get("authorizing_official__title"),
|
||||
"Creator first name": model.get("creator__first_name"),
|
||||
"Creator last name": model.get("creator__last_name"),
|
||||
"Creator email": model.get("creator__email"),
|
||||
"Investigator": model.get("investigator__email"),
|
||||
# Untouched fields
|
||||
"Organization name": model.get("organization_name"),
|
||||
"City": model.get("city"),
|
||||
"State/territory": model.get("state_territory"),
|
||||
"Request purpose": model.get("purpose"),
|
||||
"CISA regional representative": model.get("cisa_representative_email"),
|
||||
"Submitted at": model.get("submission_date"),
|
||||
}
|
||||
|
||||
row = [FIELDS.get(column, "") for column in columns]
|
||||
return row
|
||||
|
||||
|
||||
class DomainRequestGrowth(DomainRequestExport):
|
||||
|
||||
@classmethod
|
||||
def get_columns(cls):
|
||||
"""
|
||||
Overrides the columns for CSV export specific to DomainRequestGrowth.
|
||||
"""
|
||||
return [
|
||||
"Domain request",
|
||||
"Domain type",
|
||||
"Federal type",
|
||||
"Submitted at",
|
||||
]
|
||||
|
||||
sort_fields = [
|
||||
@classmethod
|
||||
def get_sort_fields(cls):
|
||||
"""
|
||||
Returns the sort fields.
|
||||
"""
|
||||
return [
|
||||
"requested_domain__name",
|
||||
]
|
||||
filter_condition = {
|
||||
"status": DomainRequest.DomainRequestStatus.SUBMITTED,
|
||||
"submission_date__lte": end_date_formatted,
|
||||
"submission_date__gte": start_date_formatted,
|
||||
}
|
||||
|
||||
# We don't want to annotate anything, but we do want to access the requested domain name
|
||||
annotations = {}
|
||||
additional_values = ["requested_domain__name"]
|
||||
|
||||
all_requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct()
|
||||
|
||||
annotated_requests = cls.annotate_and_retrieve_fields(all_requests, annotations, additional_values)
|
||||
requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False)
|
||||
|
||||
cls.write_csv_for_requests(writer, columns, requests_dict)
|
||||
|
||||
|
||||
@classmethod
|
||||
def export_full_domain_request_report(cls, csv_file):
|
||||
def get_filter_conditions(cls, start_date=None, end_date=None):
|
||||
"""
|
||||
Generates a detailed domain request report to a CSV file.
|
||||
|
||||
Retrieves and annotates DomainRequest objects, excluding 'STARTED' status,
|
||||
with related data optimizations via select/prefetch and annotation.
|
||||
|
||||
Annotated with counts and aggregates of related entities.
|
||||
Converts to dict and writes to CSV using predefined columns.
|
||||
|
||||
Parameters:
|
||||
csv_file (file-like object): Target CSV file.
|
||||
Get a Q object of filter conditions to filter when building queryset.
|
||||
"""
|
||||
writer = csv.writer(csv_file)
|
||||
|
||||
requests = (
|
||||
DomainRequest.objects.select_related(
|
||||
"creator", "authorizing_official", "federal_agency", "investigator", "requested_domain"
|
||||
)
|
||||
.prefetch_related("current_websites", "other_contacts", "alternative_domains")
|
||||
.exclude(status__in=[DomainRequest.DomainRequestStatus.STARTED])
|
||||
.order_by(
|
||||
"status",
|
||||
"requested_domain__name",
|
||||
)
|
||||
.distinct()
|
||||
start_date_formatted = format_start_date(start_date)
|
||||
end_date_formatted = format_end_date(end_date)
|
||||
return Q(
|
||||
status=DomainRequest.DomainRequestStatus.SUBMITTED,
|
||||
submission_date__lte=end_date_formatted,
|
||||
submission_date__gte=start_date_formatted,
|
||||
)
|
||||
|
||||
# Annotations are custom columns returned to the queryset (AKA: computed in the DB).
|
||||
annotations = cls._full_domain_request_annotations()
|
||||
|
||||
# The .values returned from annotate_and_retrieve_fields can't go two levels deep
|
||||
# (just returns the field id of say, "creator") - so we have to include this.
|
||||
additional_values = [
|
||||
"requested_domain__name",
|
||||
"federal_agency__agency",
|
||||
"authorizing_official__first_name",
|
||||
"authorizing_official__last_name",
|
||||
"authorizing_official__email",
|
||||
"authorizing_official__title",
|
||||
"creator__first_name",
|
||||
"creator__last_name",
|
||||
"creator__email",
|
||||
"investigator__email",
|
||||
@classmethod
|
||||
def get_related_table_fields(cls):
|
||||
"""
|
||||
Get a list of fields from related tables.
|
||||
"""
|
||||
return [
|
||||
"requested_domain__name"
|
||||
]
|
||||
|
||||
|
||||
# Convert the domain request queryset to a dictionary (including annotated fields)
|
||||
annotated_requests = cls.annotate_and_retrieve_fields(requests, annotations, additional_values)
|
||||
requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False)
|
||||
|
||||
# Write the csv file
|
||||
cls.write_csv_for_requests(writer, cls.all_columns, requests_dict)
|
||||
class DomainRequestDataFull(DomainRequestExport):
|
||||
|
||||
@classmethod
|
||||
def _full_domain_request_annotations(cls, delimiter=" | "):
|
||||
"""Returns the annotations for the full domain request report"""
|
||||
def get_columns(cls):
|
||||
"""
|
||||
Overrides the columns for CSV export specific to DomainRequestGrowth.
|
||||
"""
|
||||
return [
|
||||
"Domain request",
|
||||
"Submitted at",
|
||||
"Status",
|
||||
"Domain type",
|
||||
"Federal type",
|
||||
"Federal agency",
|
||||
"Organization name",
|
||||
"Election office",
|
||||
"City",
|
||||
"State/territory",
|
||||
"Region",
|
||||
"Creator first name",
|
||||
"Creator last name",
|
||||
"Creator email",
|
||||
"Creator approved domains count",
|
||||
"Creator active requests count",
|
||||
"Alternative domains",
|
||||
"AO first name",
|
||||
"AO last name",
|
||||
"AO email",
|
||||
"AO title/role",
|
||||
"Request purpose",
|
||||
"Request additional details",
|
||||
"Other contacts",
|
||||
"CISA regional representative",
|
||||
"Current websites",
|
||||
"Investigator",
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def get_select_related(cls):
|
||||
"""
|
||||
Get a list of tables to pass to select_related when building queryset.
|
||||
"""
|
||||
return [
|
||||
"creator",
|
||||
"authorizing_official",
|
||||
"federal_agency",
|
||||
"investigator",
|
||||
"requested_domain"
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def get_prefetch_related(cls):
|
||||
"""
|
||||
Get a list of tables to pass to prefetch_related when building queryset.
|
||||
"""
|
||||
return [
|
||||
"current_websites",
|
||||
"other_contacts",
|
||||
"alternative_domains"
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def get_exclusions(cls):
|
||||
"""
|
||||
Get a Q object of exclusion conditions to use when building queryset.
|
||||
"""
|
||||
return Q(
|
||||
status__in=[DomainRequest.DomainRequestStatus.STARTED]
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_sort_fields(cls):
|
||||
"""
|
||||
Returns the sort fields.
|
||||
"""
|
||||
return [
|
||||
"status",
|
||||
"requested_domain__name",
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def get_computed_fields(cls, delimiter=", "):
|
||||
"""
|
||||
Get a dict of computed fields.
|
||||
"""
|
||||
return {
|
||||
"creator_approved_domains_count": DomainRequestExport.get_creator_approved_domains_count_query(),
|
||||
"creator_active_requests_count": DomainRequestExport.get_creator_active_requests_count_query(),
|
||||
"creator_approved_domains_count": cls.get_creator_approved_domains_count_query(),
|
||||
"creator_active_requests_count": cls.get_creator_active_requests_count_query(),
|
||||
"all_current_websites": StringAgg("current_websites__website", delimiter=delimiter, distinct=True),
|
||||
"all_alternative_domains": StringAgg("alternative_domains__website", delimiter=delimiter, distinct=True),
|
||||
# Coerce the other contacts object to "{first_name} {last_name} {email}"
|
||||
|
@ -1271,155 +1324,33 @@ class DomainRequestExport:
|
|||
distinct=True,
|
||||
),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def write_csv_for_requests(
|
||||
writer,
|
||||
columns,
|
||||
requests_dict,
|
||||
should_write_header=True,
|
||||
):
|
||||
"""Receives params from the parent methods and outputs a CSV with filtered and sorted requests.
|
||||
Works with write_header as long as the same writer object is passed."""
|
||||
|
||||
rows = []
|
||||
for request in requests_dict.values():
|
||||
try:
|
||||
row = DomainRequestExport.parse_row_for_requests(columns, request)
|
||||
rows.append(row)
|
||||
except ValueError as err:
|
||||
logger.error(f"csv_export -> Error when parsing row: {err}")
|
||||
continue
|
||||
|
||||
if should_write_header:
|
||||
write_header(writer, columns)
|
||||
|
||||
writer.writerows(rows)
|
||||
|
||||
@staticmethod
|
||||
def parse_row_for_requests(columns, request):
|
||||
"""
|
||||
Given a set of columns and a request dictionary, generate a new row from cleaned column data.
|
||||
"""
|
||||
|
||||
# Handle the federal_type field. Defaults to the wrong format.
|
||||
federal_type = request.get("federal_type")
|
||||
human_readable_federal_type = BranchChoices.get_branch_label(federal_type) if federal_type else None
|
||||
|
||||
# Handle the org_type field
|
||||
org_type = request.get("generic_org_type") or request.get("organization_type")
|
||||
human_readable_org_type = DomainRequest.OrganizationChoices.get_org_label(org_type) if org_type else None
|
||||
|
||||
# Handle the status field. Defaults to the wrong format.
|
||||
status = request.get("status")
|
||||
status_display = DomainRequest.DomainRequestStatus.get_status_label(status) if status else None
|
||||
|
||||
# Handle the region field.
|
||||
state_territory = request.get("state_territory")
|
||||
region = get_region(state_territory) if state_territory else None
|
||||
|
||||
# Handle the requested_domain field (add a default if None)
|
||||
requested_domain = request.get("requested_domain__name")
|
||||
requested_domain_name = requested_domain if requested_domain else "No requested domain"
|
||||
|
||||
# Handle the election field. N/A if None, "Yes"/"No" if boolean
|
||||
human_readable_election_board = "N/A"
|
||||
is_election_board = request.get("is_election_board")
|
||||
if is_election_board is not None:
|
||||
human_readable_election_board = "Yes" if is_election_board else "No"
|
||||
|
||||
# Handle the additional details field. Pipe seperated.
|
||||
cisa_rep_first = request.get("cisa_representative_first_name")
|
||||
cisa_rep_last = request.get("cisa_representative_last_name")
|
||||
name = [n for n in [cisa_rep_first, cisa_rep_last] if n]
|
||||
|
||||
cisa_rep = " ".join(name) if name else None
|
||||
details = [cisa_rep, request.get("anything_else")]
|
||||
additional_details = " | ".join([field for field in details if field])
|
||||
|
||||
# create a dictionary of fields which can be included in output.
|
||||
# "extra_fields" are precomputed fields (generated in the DB or parsed).
|
||||
FIELDS = {
|
||||
# Parsed fields - defined above.
|
||||
"Domain request": requested_domain_name,
|
||||
"Region": region,
|
||||
"Status": status_display,
|
||||
"Election office": human_readable_election_board,
|
||||
"Federal type": human_readable_federal_type,
|
||||
"Domain type": human_readable_org_type,
|
||||
"Request additional details": additional_details,
|
||||
# Annotated fields - passed into the request dict.
|
||||
"Creator approved domains count": request.get("creator_approved_domains_count", 0),
|
||||
"Creator active requests count": request.get("creator_active_requests_count", 0),
|
||||
"Alternative domains": request.get("all_alternative_domains"),
|
||||
"Other contacts": request.get("all_other_contacts"),
|
||||
"Current websites": request.get("all_current_websites"),
|
||||
# Untouched FK fields - passed into the request dict.
|
||||
"Federal agency": request.get("federal_agency__agency"),
|
||||
"AO first name": request.get("authorizing_official__first_name"),
|
||||
"AO last name": request.get("authorizing_official__last_name"),
|
||||
"AO email": request.get("authorizing_official__email"),
|
||||
"AO title/role": request.get("authorizing_official__title"),
|
||||
"Creator first name": request.get("creator__first_name"),
|
||||
"Creator last name": request.get("creator__last_name"),
|
||||
"Creator email": request.get("creator__email"),
|
||||
"Investigator": request.get("investigator__email"),
|
||||
# Untouched fields
|
||||
"Organization name": request.get("organization_name"),
|
||||
"City": request.get("city"),
|
||||
"State/territory": request.get("state_territory"),
|
||||
"Request purpose": request.get("purpose"),
|
||||
"CISA regional representative": request.get("cisa_representative_email"),
|
||||
"Submitted at": request.get("submission_date"),
|
||||
}
|
||||
|
||||
row = [FIELDS.get(column, "") for column in columns]
|
||||
return row
|
||||
|
||||
|
||||
@classmethod
|
||||
def annotate_and_retrieve_fields(
|
||||
cls, requests, annotations, additional_values=None, include_many_to_many=False
|
||||
) -> QuerySet:
|
||||
def get_related_table_fields(cls):
|
||||
"""
|
||||
Applies annotations to a queryset and retrieves specified fields,
|
||||
including class-defined and annotation-defined.
|
||||
|
||||
Parameters:
|
||||
requests (QuerySet): Initial queryset.
|
||||
annotations (dict, optional): Fields to compute {field_name: expression}.
|
||||
additional_values (list, optional): Extra fields to retrieve; defaults to annotation keys if None.
|
||||
include_many_to_many (bool, optional): Determines if we should include many to many fields or not
|
||||
|
||||
Returns:
|
||||
QuerySet: Contains dictionaries with the specified fields for each record.
|
||||
Get a list of fields from related tables.
|
||||
"""
|
||||
|
||||
if additional_values is None:
|
||||
additional_values = []
|
||||
|
||||
# We can infer that if we're passing in annotations,
|
||||
# we want to grab the result of said annotation.
|
||||
if annotations:
|
||||
additional_values.extend(annotations.keys())
|
||||
|
||||
# Get prexisting fields on DomainRequest
|
||||
domain_request_fields = set()
|
||||
for field in DomainRequest._meta.get_fields():
|
||||
# Exclude many to many fields unless we specify
|
||||
many_to_many = isinstance(field, ManyToManyField) and include_many_to_many
|
||||
if many_to_many or not isinstance(field, ManyToManyField):
|
||||
domain_request_fields.add(field.name)
|
||||
|
||||
queryset = requests.annotate(**annotations).values(*domain_request_fields, *additional_values)
|
||||
return queryset
|
||||
return [
|
||||
"requested_domain__name",
|
||||
"federal_agency__agency",
|
||||
"authorizing_official__first_name",
|
||||
"authorizing_official__last_name",
|
||||
"authorizing_official__email",
|
||||
"authorizing_official__title",
|
||||
"creator__first_name",
|
||||
"creator__last_name",
|
||||
"creator__email",
|
||||
"investigator__email",
|
||||
]
|
||||
|
||||
|
||||
# ============================================================= #
|
||||
# Helper functions for django ORM queries. #
|
||||
# We are using these rather than pure python for speed reasons. #
|
||||
# ============================================================= #
|
||||
|
||||
@staticmethod
|
||||
def get_creator_approved_domains_count_query():
|
||||
@classmethod
|
||||
def get_creator_approved_domains_count_query(cls):
|
||||
"""
|
||||
Generates a Count query for distinct approved domain requests per creator.
|
||||
|
||||
|
@ -1434,8 +1365,8 @@ class DomainRequestExport:
|
|||
)
|
||||
return query
|
||||
|
||||
@staticmethod
|
||||
def get_creator_active_requests_count_query():
|
||||
@classmethod
|
||||
def get_creator_active_requests_count_query(cls):
|
||||
"""
|
||||
Generates a Count query for distinct approved domain requests per creator.
|
||||
|
||||
|
|
|
@ -91,8 +91,8 @@ class AnalyticsView(View):
|
|||
filter_requests_end_date = {
|
||||
"created_at__lte": end_date_formatted,
|
||||
}
|
||||
requests_sliced_at_start_date = csv_export.get_sliced_requests(filter_requests_start_date)
|
||||
requests_sliced_at_end_date = csv_export.get_sliced_requests(filter_requests_end_date)
|
||||
requests_sliced_at_start_date = csv_export.DomainRequestExport.get_sliced_requests(filter_requests_start_date)
|
||||
requests_sliced_at_end_date = csv_export.DomainRequestExport.get_sliced_requests(filter_requests_end_date)
|
||||
|
||||
filter_submitted_requests_start_date = {
|
||||
"status": models.DomainRequest.DomainRequestStatus.SUBMITTED,
|
||||
|
@ -102,8 +102,8 @@ class AnalyticsView(View):
|
|||
"status": models.DomainRequest.DomainRequestStatus.SUBMITTED,
|
||||
"submission_date__lte": end_date_formatted,
|
||||
}
|
||||
submitted_requests_sliced_at_start_date = csv_export.get_sliced_requests(filter_submitted_requests_start_date)
|
||||
submitted_requests_sliced_at_end_date = csv_export.get_sliced_requests(filter_submitted_requests_end_date)
|
||||
submitted_requests_sliced_at_start_date = csv_export.DomainRequestExport.get_sliced_requests(filter_submitted_requests_start_date)
|
||||
submitted_requests_sliced_at_end_date = csv_export.DomainRequestExport.get_sliced_requests(filter_submitted_requests_end_date)
|
||||
|
||||
context = dict(
|
||||
# Generate a dictionary of context variables that are common across all admin templates
|
||||
|
@ -142,7 +142,6 @@ class ExportDataType(View):
|
|||
# match the CSV example with all the fields
|
||||
response = HttpResponse(content_type="text/csv")
|
||||
response["Content-Disposition"] = 'attachment; filename="domains-by-type.csv"'
|
||||
#csv_export.DomainExport.export_data_type_to_csv(response)
|
||||
csv_export.DomainDataType.export_data_to_csv(response)
|
||||
return response
|
||||
|
||||
|
@ -152,7 +151,6 @@ class ExportDataFull(View):
|
|||
# Smaller export based on 1
|
||||
response = HttpResponse(content_type="text/csv")
|
||||
response["Content-Disposition"] = 'attachment; filename="current-full.csv"'
|
||||
#csv_export.DomainExport.export_data_full_to_csv(response)
|
||||
csv_export.DomainDataFull.export_data_to_csv(response)
|
||||
return response
|
||||
|
||||
|
@ -162,7 +160,6 @@ class ExportDataFederal(View):
|
|||
# Federal only
|
||||
response = HttpResponse(content_type="text/csv")
|
||||
response["Content-Disposition"] = 'attachment; filename="current-federal.csv"'
|
||||
#csv_export.DomainExport.export_data_federal_to_csv(response)
|
||||
csv_export.DomainDataFederal.export_data_to_csv(response)
|
||||
return response
|
||||
|
||||
|
@ -174,7 +171,7 @@ class ExportDomainRequestDataFull(View):
|
|||
"""Returns a content disposition response for current-full-domain-request.csv"""
|
||||
response = HttpResponse(content_type="text/csv")
|
||||
response["Content-Disposition"] = 'attachment; filename="current-full-domain-request.csv"'
|
||||
csv_export.DomainRequestExport.export_full_domain_request_report(response)
|
||||
csv_export.DomainRequestDataFull.export_data_to_csv(response)
|
||||
return response
|
||||
|
||||
|
||||
|
@ -185,7 +182,6 @@ class ExportDataDomainsGrowth(View):
|
|||
|
||||
response = HttpResponse(content_type="text/csv")
|
||||
response["Content-Disposition"] = f'attachment; filename="domain-growth-report-{start_date}-to-{end_date}.csv"'
|
||||
#csv_export.DomainExport.export_data_domain_growth_to_csv(response, start_date, end_date)
|
||||
csv_export.DomainGrowth.export_data_to_csv(response, start_date, end_date)
|
||||
|
||||
return response
|
||||
|
@ -198,7 +194,7 @@ class ExportDataRequestsGrowth(View):
|
|||
|
||||
response = HttpResponse(content_type="text/csv")
|
||||
response["Content-Disposition"] = f'attachment; filename="requests-{start_date}-to-{end_date}.csv"'
|
||||
csv_export.DomainRequestExport.export_data_requests_growth_to_csv(response, start_date, end_date)
|
||||
csv_export.DomainRequestGrowth.export_data_to_csv(response, start_date, end_date)
|
||||
|
||||
return response
|
||||
|
||||
|
@ -209,7 +205,6 @@ class ExportDataManagedDomains(View):
|
|||
end_date = request.GET.get("end_date", "")
|
||||
response = HttpResponse(content_type="text/csv")
|
||||
response["Content-Disposition"] = f'attachment; filename="managed-domains-{start_date}-to-{end_date}.csv"'
|
||||
#csv_export.DomainExport.export_data_managed_domains_to_csv(response, start_date, end_date)
|
||||
csv_export.DomainManaged.export_data_to_csv(response, start_date, end_date)
|
||||
|
||||
return response
|
||||
|
@ -221,7 +216,6 @@ class ExportDataUnmanagedDomains(View):
|
|||
end_date = request.GET.get("end_date", "")
|
||||
response = HttpResponse(content_type="text/csv")
|
||||
response["Content-Disposition"] = f'attachment; filename="unmanaged-domains-{start_date}-to-{end_date}.csv"'
|
||||
#csv_export.DomainExport.export_data_unmanaged_domains_to_csv(response, start_date, end_date)
|
||||
csv_export.DomainUnmanaged.export_data_to_csv(response, start_date, end_date)
|
||||
|
||||
return response
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue