refactored domain exports

This commit is contained in:
David Kennedy 2024-06-27 16:24:50 -04:00
parent cd279468e5
commit 10529e4094
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
2 changed files with 15 additions and 651 deletions

View file

@ -317,7 +317,7 @@ class BaseExport(ABC):
pass
class NewDomainExport(BaseExport):
class DomainExport(BaseExport):
"""
A collection of functions which return csv files regarding the Domain model.
"""
@ -490,7 +490,7 @@ class NewDomainExport(BaseExport):
]
class DomainDataType(NewDomainExport):
class DomainDataType(DomainExport):
@classmethod
def get_columns(cls):
@ -602,7 +602,7 @@ class DomainDataType(NewDomainExport):
]
class DomainDataFull(NewDomainExport):
class DomainDataFull(DomainExport):
@classmethod
def get_columns(cls):
@ -695,7 +695,7 @@ class DomainDataFull(NewDomainExport):
]
class DomainDataFederal(NewDomainExport):
class DomainDataFederal(DomainExport):
@classmethod
def get_columns(cls):
@ -789,7 +789,7 @@ class DomainDataFederal(NewDomainExport):
]
class DomainGrowth(NewDomainExport):
class DomainGrowth(DomainExport):
@classmethod
def get_columns(cls):
@ -866,7 +866,7 @@ class DomainGrowth(NewDomainExport):
]
class DomainManaged(NewDomainExport):
class DomainManaged(DomainExport):
@classmethod
def get_columns(cls):
@ -1004,7 +1004,7 @@ class DomainManaged(NewDomainExport):
csv_writer.writerow([])
class DomainUnmanaged(NewDomainExport):
class DomainUnmanaged(DomainExport):
@classmethod
def get_columns(cls):
@ -1120,642 +1120,6 @@ class DomainUnmanaged(NewDomainExport):
csv_writer.writerow([])
class DomainExport:
@classmethod
def export_data_type_to_csv(cls, csv_file):
    """
    All domain metadata:
    Exports domains of all statuses plus domain managers.
    """
    csv_writer = csv.writer(csv_file)
    header_columns = [
        "Domain name",
        "Status",
        "First ready on",
        "Expiration date",
        "Domain type",
        "Agency",
        "Organization name",
        "City",
        "State",
        "AO",
        "AO email",
        "Security contact email",
        "Domain managers",
        "Invited domain managers",
    ]
    # Sort by org type, then federal type (Coalesce maps a null
    # federal_type to "ZZZZZ" so blanks sort last), agency, and name.
    ordering = (
        "organization_type",
        Coalesce("federal_type", Value("ZZZZZ")),
        "federal_agency",
        "domain__name",
    )
    # Pre-fetch lookup tables once, so each row can be annotated below
    # without extra per-domain queries.
    security_emails = cls.get_all_security_emails()    # PublicContact: registry_id -> email
    invitations = cls.get_all_domain_invitations()     # DomainInvitation: (domain name, email)
    manager_roles = cls.get_all_user_domain_roles()    # UserDomainRole: (domain name, email)
    queryset = (
        DomainInformation.objects.select_related("domain", "authorizing_official")
        .prefetch_related("permissions")
        .order_by(*ordering)
        .distinct()
    )
    annotations = cls._domain_metadata_annotations()
    # The .values returned from annotate_and_retrieve_fields can't go two
    # levels deep (just returns the field id of say, "creator") - so we
    # have to list the related fields we need explicitly.
    related_fields = [
        "domain__name",
        "domain__state",
        "domain__first_ready",
        "domain__expiration_date",
        "domain__created_at",
        "domain__deleted",
        "domain__security_contact_registry_id",
        "authorizing_official__email",
        "federal_agency__agency",
    ]
    # Convert the queryset to per-domain dictionaries (including annotated fields).
    annotated = cls.annotate_and_retrieve_fields(
        queryset, annotations, security_emails, invitations, manager_roles, related_fields
    )
    rows_by_id = convert_queryset_to_dict(annotated, is_model=False)
    # Write the csv file.
    cls.write_csv_for_domains(csv_writer, header_columns, rows_by_id)
@classmethod
def export_data_full_to_csv(cls, csv_file):
    """Current full"""
    csv_writer = csv.writer(csv_file)
    header_columns = [
        "Domain name",
        "Domain type",
        "Agency",
        "Organization name",
        "City",
        "State",
        "Security contact email",
    ]
    # Coalesce maps a null federal_type to "ZZZZZ" so blanks sort last.
    ordering = (
        "organization_type",
        Coalesce("federal_type", Value("ZZZZZ")),
        "federal_agency",
        "domain__name",
    )
    # Only live (non-deleted) domains belong in the "current full" report.
    live_state_filter = {
        "domain__state__in": [
            Domain.State.READY,
            Domain.State.DNS_NEEDED,
            Domain.State.ON_HOLD,
        ],
    }
    # PublicContact lookup: registry_id -> security email.
    security_emails = cls.get_all_security_emails()
    queryset = (
        DomainInformation.objects.select_related("domain")
        .filter(**live_state_filter)
        .order_by(*ordering)
        .distinct()
    )
    related_fields = [
        "domain__name",
        "domain__security_contact_registry_id",
        "federal_agency__agency",
    ]
    # Convert the queryset to per-domain dictionaries (no computed annotations needed).
    annotated = cls.annotate_and_retrieve_fields(queryset, {}, security_emails, {}, {}, related_fields)
    rows_by_id = convert_queryset_to_dict(annotated, is_model=False)
    # Write the csv file.
    cls.write_csv_for_domains(csv_writer, header_columns, rows_by_id)
@classmethod
def export_data_federal_to_csv(cls, csv_file):
    """Current federal"""
    csv_writer = csv.writer(csv_file)
    header_columns = [
        "Domain name",
        "Domain type",
        "Agency",
        "Organization name",
        "City",
        "State",
        "Security contact email",
    ]
    # Coalesce maps a null federal_type to "ZZZZZ" so blanks sort last.
    ordering = (
        "organization_type",
        Coalesce("federal_type", Value("ZZZZZ")),
        "federal_agency",
        "domain__name",
    )
    # Same live-state filter as the full report, narrowed to federal orgs.
    federal_live_filter = {
        "organization_type__icontains": "federal",
        "domain__state__in": [
            Domain.State.READY,
            Domain.State.DNS_NEEDED,
            Domain.State.ON_HOLD,
        ],
    }
    # PublicContact lookup: registry_id -> security email.
    security_emails = cls.get_all_security_emails()
    queryset = (
        DomainInformation.objects.select_related("domain")
        .filter(**federal_live_filter)
        .order_by(*ordering)
        .distinct()
    )
    related_fields = [
        "domain__name",
        "domain__security_contact_registry_id",
        "federal_agency__agency",
    ]
    # Convert the queryset to per-domain dictionaries (no computed annotations needed).
    annotated = cls.annotate_and_retrieve_fields(queryset, {}, security_emails, {}, {}, related_fields)
    rows_by_id = convert_queryset_to_dict(annotated, is_model=False)
    # Write the csv file.
    cls.write_csv_for_domains(csv_writer, header_columns, rows_by_id)
@classmethod
def export_data_domain_growth_to_csv(cls, csv_file, start_date, end_date):
    """
    Domain growth:
    Receive start and end dates from the view, parse them.
    Request from write_body READY domains that are created between
    the start and end dates, as well as DELETED domains that are deleted between
    the start and end dates. Specify sort params for both lists.
    """
    start_date_formatted = format_start_date(start_date)
    end_date_formatted = format_end_date(end_date)
    writer = csv.writer(csv_file)
    # define columns to include in export
    columns = [
        "Domain name",
        "Domain type",
        "Agency",
        "Organization name",
        "City",
        "State",
        "Status",
        "Expiration date",
        "Created at",
        "First ready",
        "Deleted",
    ]
    # READY domains are sorted by the date they first became ready.
    sort_fields = [
        "domain__first_ready",
        "domain__name",
    ]
    filter_condition = {
        "domain__state__in": [Domain.State.READY],
        "domain__first_ready__lte": end_date_formatted,
        "domain__first_ready__gte": start_date_formatted,
    }
    # We also want domains deleted between start and end dates, sorted
    sort_fields_for_deleted_domains = [
        "domain__deleted",
        "domain__name",
    ]
    filter_condition_for_deleted_domains = {
        "domain__state__in": [Domain.State.DELETED],
        "domain__deleted__lte": end_date_formatted,
        "domain__deleted__gte": start_date_formatted,
    }
    domain_infos = (
        DomainInformation.objects.select_related("domain")
        .filter(**filter_condition)
        .order_by(*sort_fields)
        .distinct()
    )
    deleted_domain_infos = (
        DomainInformation.objects.select_related("domain")
        .filter(**filter_condition_for_deleted_domains)
        .order_by(*sort_fields_for_deleted_domains)
        .distinct()
    )
    # No computed columns are needed for this report.
    annotations = {}
    additional_values = [
        "domain__name",
        "domain__state",
        "domain__first_ready",
        "domain__expiration_date",
        "domain__created_at",
        "domain__deleted",
        "federal_agency__agency",
    ]
    # Convert the domain request queryset to a dictionary (including annotated fields)
    annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, {}, {}, {}, additional_values)
    requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False)
    # Convert the deleted-domain queryset to a dictionary (including annotated fields)
    deleted_annotated_domains = cls.annotate_and_retrieve_fields(deleted_domain_infos, annotations, {}, {}, {}, additional_values)
    deleted_requests_dict = convert_queryset_to_dict(deleted_annotated_domains, is_model=False)
    cls.write_csv_for_domains(
        writer, columns, requests_dict
    )
    # Append the deleted domains beneath the READY rows; skip the header
    # so both lists appear as one table.
    cls.write_csv_for_domains(
        writer,
        columns,
        deleted_requests_dict,
        should_write_header=False,
    )
@classmethod
def export_data_managed_domains_to_csv(cls, csv_file, start_date, end_date):
    """
    Managed domains:
    Get counts for domains that have domain managers for two different dates,
    get list of managed domains at end_date."""
    start_date_formatted = format_start_date(start_date)
    end_date_formatted = format_end_date(end_date)
    writer = csv.writer(csv_file)
    columns = [
        "Domain name",
        "Domain type",
        "Domain managers",
        "Invited domain managers",
    ]
    sort_fields = [
        "domain__name",
    ]
    # "Managed" = at least one permission record exists for the domain.
    filter_managed_domains_start_date = {
        "domain__permissions__isnull": False,
        "domain__first_ready__lte": start_date_formatted,
    }
    managed_domains_sliced_at_start_date = get_sliced_domains(filter_managed_domains_start_date)
    writer.writerow(["MANAGED DOMAINS COUNTS AT START DATE"])
    writer.writerow(
        [
            "Total",
            "Federal",
            "Interstate",
            "State or territory",
            "Tribal",
            "County",
            "City",
            "Special district",
            "School district",
            "Election office",
        ]
    )
    writer.writerow(managed_domains_sliced_at_start_date)
    writer.writerow([])
    filter_managed_domains_end_date = {
        "domain__permissions__isnull": False,
        "domain__first_ready__lte": end_date_formatted,
    }
    managed_domains_sliced_at_end_date = get_sliced_domains(filter_managed_domains_end_date)
    writer.writerow(["MANAGED DOMAINS COUNTS AT END DATE"])
    writer.writerow(
        [
            "Total",
            "Federal",
            "Interstate",
            "State or territory",
            "Tribal",
            "County",
            "City",
            "Special district",
            "School district",
            "Election office",
        ]
    )
    writer.writerow(managed_domains_sliced_at_end_date)
    writer.writerow([])
    # Fetch all relevant DomainInvitation entries
    domain_invitations = cls.get_all_domain_invitations()
    # Fetch all relevant UserDomainRole entries
    user_domain_roles = cls.get_all_user_domain_roles()
    annotations = {}
    # The .values returned from annotate_and_retrieve_fields can't go two levels deep
    # (just returns the field id of say, "creator") - so we have to include this.
    additional_values = [
        "domain__name",
    ]
    domain_infos = (
        DomainInformation.objects.select_related("domain")
        .prefetch_related("permissions")
        .filter(**filter_managed_domains_end_date)
        .order_by(*sort_fields)
        .distinct()
    )
    # Convert the domain request queryset to a dictionary (including annotated fields)
    annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, {}, domain_invitations, user_domain_roles, additional_values)
    requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False)
    cls.write_csv_for_domains(
        writer,
        columns,
        requests_dict
    )
@classmethod
def export_data_unmanaged_domains_to_csv(cls, csv_file, start_date, end_date):
    """
    Unmanaged domains:
    Get counts for domains that do not have domain managers for two different dates,
    get list of unmanaged domains at end_date."""
    start_date_formatted = format_start_date(start_date)
    end_date_formatted = format_end_date(end_date)
    writer = csv.writer(csv_file)
    columns = [
        "Domain name",
        "Domain type",
    ]
    sort_fields = [
        "domain__name",
    ]
    # "Unmanaged" = no permission records exist for the domain.
    filter_unmanaged_domains_start_date = {
        "domain__permissions__isnull": True,
        "domain__first_ready__lte": start_date_formatted,
    }
    unmanaged_domains_sliced_at_start_date = get_sliced_domains(filter_unmanaged_domains_start_date)
    writer.writerow(["UNMANAGED DOMAINS AT START DATE"])
    writer.writerow(
        [
            "Total",
            "Federal",
            "Interstate",
            "State or territory",
            "Tribal",
            "County",
            "City",
            "Special district",
            "School district",
            "Election office",
        ]
    )
    writer.writerow(unmanaged_domains_sliced_at_start_date)
    writer.writerow([])
    filter_unmanaged_domains_end_date = {
        "domain__permissions__isnull": True,
        "domain__first_ready__lte": end_date_formatted,
    }
    unmanaged_domains_sliced_at_end_date = get_sliced_domains(filter_unmanaged_domains_end_date)
    writer.writerow(["UNMANAGED DOMAINS AT END DATE"])
    writer.writerow(
        [
            "Total",
            "Federal",
            "Interstate",
            "State or territory",
            "Tribal",
            "County",
            "City",
            "Special district",
            "School district",
            "Election office",
        ]
    )
    writer.writerow(unmanaged_domains_sliced_at_end_date)
    writer.writerow([])
    annotations = {}
    # The .values returned from annotate_and_retrieve_fields can't go two levels deep
    # (just returns the field id of say, "creator") - so we have to include this.
    additional_values = [
        "domain__name",
    ]
    domain_infos = (
        DomainInformation.objects.select_related("domain")
        .filter(**filter_unmanaged_domains_end_date)
        .order_by(*sort_fields)
        .distinct()
    )
    # Convert the domain request queryset to a dictionary (including annotated fields)
    annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, {}, {}, {}, additional_values)
    requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False)
    cls.write_csv_for_domains(
        writer,
        columns,
        requests_dict
    )
@classmethod
def _domain_metadata_annotations(cls, delimiter=", "):
    """Return queryset annotations for the domain metadata export.

    Computes "ao_name": the authorizing official's first and last name
    joined by a single space. Coalesce substitutes "" for a missing name
    part so the Concat never produces NULL.

    NOTE(review): `delimiter` is accepted but unused in this body —
    presumably intended for joining multi-valued fields; confirm before
    removing it.
    """
    return {
        "ao_name": Concat(
            Coalesce(F("authorizing_official__first_name"), Value("")),
            Value(" "),
            Coalesce(F("authorizing_official__last_name"), Value("")),
            output_field=CharField(),
        ),
    }
@classmethod
def annotate_and_retrieve_fields(
    cls,
    domains,
    annotations,
    public_contacts=None,
    domain_invitations=None,
    user_domain_roles=None,
    additional_values=None,
    include_many_to_many=False,
) -> QuerySet:
    """
    Applies annotations to a queryset and retrieves specified fields,
    including class-defined and annotation-defined.

    Parameters:
        domains (QuerySet): Initial DomainInformation queryset.
        annotations (dict): Fields to compute {field_name: expression}.
        public_contacts (dict, optional): registry_id -> security email lookup.
        domain_invitations (iterable, optional): (domain name, email) pairs
            for invited domain managers.
        user_domain_roles (iterable, optional): (domain name, email) pairs
            for active domain managers.
        additional_values (list, optional): Extra fields to retrieve;
            annotation keys are always retrieved as well.
        include_many_to_many (bool, optional): Determines if we should include
            many to many fields or not.

    Returns:
        QuerySet: Contains dictionaries with the specified fields for each record.
    """
    # Fix: use None sentinels instead of mutable default arguments, and
    # copy additional_values so the extend() below never mutates a list
    # the caller passed in.
    public_contacts = public_contacts or {}
    domain_invitations = domain_invitations or []
    user_domain_roles = user_domain_roles or []
    additional_values = list(additional_values) if additional_values else []
    # We can infer that if we're passing in annotations,
    # we want to grab the result of said annotation.
    if annotations:
        additional_values.extend(annotations.keys())
    # Get preexisting fields on DomainInformation.
    domain_fields = set()
    for field in DomainInformation._meta.get_fields():
        # Exclude many to many fields unless we specify otherwise.
        if include_many_to_many or not isinstance(field, ManyToManyField):
            domain_fields.add(field.name)
    queryset = domains.annotate(**annotations).values(*domain_fields, *additional_values)
    # Create mapping of domain name to its invited users and managers.
    invited_users_dict = defaultdict(list)
    for domain, email in domain_invitations:
        invited_users_dict[domain].append(email)
    managers_dict = defaultdict(list)
    for domain, email in user_domain_roles:
        managers_dict[domain].append(email)
    # Annotate each row with its security contact, invited users, and managers.
    annotated_domains = []
    for domain in queryset:
        domain['security_contact_email'] = public_contacts.get(domain.get('domain__security_contact_registry_id'))
        domain['invited_users'] = ', '.join(invited_users_dict.get(domain.get('domain__name'), []))
        domain['managers'] = ', '.join(managers_dict.get(domain.get('domain__name'), []))
        annotated_domains.append(domain)
    # Preserve historical behavior: fall back to the raw queryset when it
    # contained no rows at all.
    return annotated_domains if annotated_domains else queryset
@staticmethod
def parse_row_for_domains(columns, domain):
    """
    Given a set of columns and a request dictionary, generate a new row
    from cleaned column data.

    `domain` is one of the dictionaries produced by
    annotate_and_retrieve_fields; columns with no matching field come
    back as "".
    """
    status = domain.get("domain__state")
    human_readable_status = Domain.State.get_state_label(status)
    # Show "(blank)" rather than an empty cell for missing dates.
    expiration_date = domain.get("domain__expiration_date")
    if expiration_date is None:
        expiration_date = "(blank)"
    first_ready_on = domain.get("domain__first_ready")
    if first_ready_on is None:
        first_ready_on = "(blank)"
    domain_org_type = domain.get("generic_org_type")
    human_readable_domain_org_type = DomainRequest.OrganizationChoices.get_org_label(domain_org_type)
    domain_federal_type = domain.get("federal_type")
    human_readable_domain_federal_type = BranchChoices.get_branch_label(domain_federal_type)
    # Federal domains additionally display their branch, e.g. "Federal - Executive".
    domain_type = human_readable_domain_org_type
    if domain_federal_type and domain_org_type == DomainRequest.OrgChoicesElectionOffice.FEDERAL:
        domain_type = f"{human_readable_domain_org_type} - {human_readable_domain_federal_type}"
    # Fix: removed leftover debug print statements hardcoded to "18f.gov".
    # create a dictionary of fields which can be included in output.
    # "extra_fields" are precomputed fields (generated in the DB or parsed).
    FIELDS = {
        "Domain name": domain.get("domain__name"),
        "Status": human_readable_status,
        "First ready on": first_ready_on,
        "Expiration date": expiration_date,
        "Domain type": domain_type,
        "Agency": domain.get("federal_agency__agency"),
        "Organization name": domain.get("organization_name"),
        "City": domain.get("city"),
        "State": domain.get("state_territory"),
        "AO": domain.get("ao_name"),
        "AO email": domain.get("authorizing_official__email"),
        "Security contact email": domain.get("security_contact_email"),
        "Created at": domain.get("domain__created_at"),
        "Deleted": domain.get("domain__deleted"),
        "Domain managers": domain.get("managers"),
        "Invited domain managers": domain.get("invited_users"),
    }
    row = [FIELDS.get(column, "") for column in columns]
    return row
@staticmethod
def write_csv_for_domains(
    writer,
    columns,
    domains_dict,
    should_write_header=True,
):
    """Receives params from the parent methods and outputs a CSV with filtered and sorted requests.
    Works with write_header as long as the same writer object is passed."""
    parsed_rows = []
    for domain_data in domains_dict.values():
        try:
            parsed_rows.append(DomainExport.parse_row_for_domains(columns, domain_data))
        except ValueError as err:
            # Skip rows that fail to parse rather than aborting the whole export.
            logger.error(f"csv_export -> Error when parsing row: {err}")
    if should_write_header:
        write_header(writer, columns)
    writer.writerows(parsed_rows)
# ============================================================= #
# Helper functions for django ORM queries. #
# We are using these rather than pure python for speed reasons. #
# ============================================================= #
@classmethod
def get_all_security_emails(cls):
    """
    Fetch all PublicContact entries and return a mapping of registry_id to email.
    """
    return dict(PublicContact.objects.values_list('registry_id', 'email'))
@classmethod
def get_all_domain_invitations(cls):
    """
    Fetch all pending DomainInvitation entries and return them as a list
    of (domain name, email) pairs.
    """
    pending_invitations = DomainInvitation.objects.filter(status="invited")
    return list(pending_invitations.values_list('domain__name', 'email'))
@classmethod
def get_all_user_domain_roles(cls):
    """
    Fetch all UserDomainRole entries and return them as a list of
    (domain name, user email) pairs.
    """
    roles = UserDomainRole.objects.select_related('user')
    return list(roles.values_list('domain__name', 'user__email'))
class DomainRequestExport:
"""
A collection of functions which return csv files regarding the DomainRequest model.

View file

@ -49,8 +49,8 @@ class AnalyticsView(View):
"domain__permissions__isnull": False,
"domain__first_ready__lte": end_date_formatted,
}
managed_domains_sliced_at_start_date = csv_export.NewDomainExport.get_sliced_domains(filter_managed_domains_start_date)
managed_domains_sliced_at_end_date = csv_export.NewDomainExport.get_sliced_domains(filter_managed_domains_end_date)
managed_domains_sliced_at_start_date = csv_export.DomainExport.get_sliced_domains(filter_managed_domains_start_date)
managed_domains_sliced_at_end_date = csv_export.DomainExport.get_sliced_domains(filter_managed_domains_end_date)
filter_unmanaged_domains_start_date = {
"domain__permissions__isnull": True,
@ -60,8 +60,8 @@ class AnalyticsView(View):
"domain__permissions__isnull": True,
"domain__first_ready__lte": end_date_formatted,
}
unmanaged_domains_sliced_at_start_date = csv_export.NewDomainExport.get_sliced_domains(filter_unmanaged_domains_start_date)
unmanaged_domains_sliced_at_end_date = csv_export.NewDomainExport.get_sliced_domains(filter_unmanaged_domains_end_date)
unmanaged_domains_sliced_at_start_date = csv_export.DomainExport.get_sliced_domains(filter_unmanaged_domains_start_date)
unmanaged_domains_sliced_at_end_date = csv_export.DomainExport.get_sliced_domains(filter_unmanaged_domains_end_date)
filter_ready_domains_start_date = {
"domain__state__in": [models.Domain.State.READY],
@ -71,8 +71,8 @@ class AnalyticsView(View):
"domain__state__in": [models.Domain.State.READY],
"domain__first_ready__lte": end_date_formatted,
}
ready_domains_sliced_at_start_date = csv_export.NewDomainExport.get_sliced_domains(filter_ready_domains_start_date)
ready_domains_sliced_at_end_date = csv_export.NewDomainExport.get_sliced_domains(filter_ready_domains_end_date)
ready_domains_sliced_at_start_date = csv_export.DomainExport.get_sliced_domains(filter_ready_domains_start_date)
ready_domains_sliced_at_end_date = csv_export.DomainExport.get_sliced_domains(filter_ready_domains_end_date)
filter_deleted_domains_start_date = {
"domain__state__in": [models.Domain.State.DELETED],
@ -82,8 +82,8 @@ class AnalyticsView(View):
"domain__state__in": [models.Domain.State.DELETED],
"domain__deleted__lte": end_date_formatted,
}
deleted_domains_sliced_at_start_date = csv_export.NewDomainExport.get_sliced_domains(filter_deleted_domains_start_date)
deleted_domains_sliced_at_end_date = csv_export.NewDomainExport.get_sliced_domains(filter_deleted_domains_end_date)
deleted_domains_sliced_at_start_date = csv_export.DomainExport.get_sliced_domains(filter_deleted_domains_start_date)
deleted_domains_sliced_at_end_date = csv_export.DomainExport.get_sliced_domains(filter_deleted_domains_end_date)
filter_requests_start_date = {
"created_at__lte": start_date_formatted,