Change org type

This commit is contained in:
zandercymatics 2024-04-09 15:47:04 -06:00
parent 7e630761cb
commit fc4ccd72ae
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7

View file

@ -3,6 +3,7 @@ import logging
from datetime import datetime from datetime import datetime
from registrar.models.domain import Domain from registrar.models.domain import Domain
from registrar.models.domain_invitation import DomainInvitation from registrar.models.domain_invitation import DomainInvitation
from django.db.models import Case, When, Count, Value
from registrar.models.domain_request import DomainRequest from registrar.models.domain_request import DomainRequest
from registrar.models.domain_information import DomainInformation from registrar.models.domain_information import DomainInformation
from django.utils import timezone from django.utils import timezone
@ -87,10 +88,10 @@ def parse_row_for_domain(
if security_email.lower() in invalid_emails: if security_email.lower() in invalid_emails:
security_email = "(blank)" security_email = "(blank)"
if domain_info.federal_type and domain_info.generic_org_type == DomainRequest.OrganizationChoices.FEDERAL: if domain_info.federal_type and domain_info.organization_type == DomainRequest.OrgChoicesElectionOffice.FEDERAL:
domain_type = f"{domain_info.get_generic_org_type_display()} - {domain_info.get_federal_type_display()}" domain_type = f"{domain_info.organization_type} - {domain_info.get_federal_type_display()}"
else: else:
domain_type = domain_info.get_generic_org_type_display() domain_type = domain_info.organization_type
# create a dictionary of fields which can be included in output # create a dictionary of fields which can be included in output
FIELDS = { FIELDS = {
@ -319,9 +320,9 @@ def parse_row_for_requests(columns, request: DomainRequest):
requested_domain_name = request.requested_domain.name requested_domain_name = request.requested_domain.name
if request.federal_type: if request.federal_type:
request_type = f"{request.get_generic_org_type_display()} - {request.get_federal_type_display()}" request_type = f"{request.organization_type} - {request.get_federal_type_display()}"
else: else:
request_type = request.get_generic_org_type_display() request_type = request.organization_type
# create a dictionary of fields which can be included in output # create a dictionary of fields which can be included in output
FIELDS = { FIELDS = {
@ -399,7 +400,7 @@ def export_data_type_to_csv(csv_file):
# Coalesce is used to replace federal_type of None with ZZZZZ # Coalesce is used to replace federal_type of None with ZZZZZ
sort_fields = [ sort_fields = [
"generic_org_type", "organization_type",
Coalesce("federal_type", Value("ZZZZZ")), Coalesce("federal_type", Value("ZZZZZ")),
"federal_agency", "federal_agency",
"domain__name", "domain__name",
@ -432,7 +433,7 @@ def export_data_full_to_csv(csv_file):
] ]
# Coalesce is used to replace federal_type of None with ZZZZZ # Coalesce is used to replace federal_type of None with ZZZZZ
sort_fields = [ sort_fields = [
"generic_org_type", "organization_type",
Coalesce("federal_type", Value("ZZZZZ")), Coalesce("federal_type", Value("ZZZZZ")),
"federal_agency", "federal_agency",
"domain__name", "domain__name",
@ -465,13 +466,13 @@ def export_data_federal_to_csv(csv_file):
] ]
# Coalesce is used to replace federal_type of None with ZZZZZ # Coalesce is used to replace federal_type of None with ZZZZZ
sort_fields = [ sort_fields = [
"generic_org_type", "organization_type",
Coalesce("federal_type", Value("ZZZZZ")), Coalesce("federal_type", Value("ZZZZZ")),
"federal_agency", "federal_agency",
"domain__name", "domain__name",
] ]
filter_condition = { filter_condition = {
"generic_org_type__icontains": "federal", "organization_type__icontains": "federal",
"domain__state__in": [ "domain__state__in": [
Domain.State.READY, Domain.State.READY,
Domain.State.DNS_NEEDED, Domain.State.DNS_NEEDED,
@ -566,74 +567,52 @@ def get_sliced_domains(filter_condition):
Pass distinct=True when filtering by permissions so we do not to count multiples Pass distinct=True when filtering by permissions so we do not to count multiples
when a domain has more that one manager. when a domain has more that one manager.
""" """
return get_org_type_counts(DomainInformation, filter_condition)
domains = DomainInformation.objects.all().filter(**filter_condition).distinct()
domains_count = domains.count()
federal = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.FEDERAL).distinct().count()
interstate = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.INTERSTATE).count()
state_or_territory = (
domains.filter(generic_org_type=DomainRequest.OrganizationChoices.STATE_OR_TERRITORY).distinct().count()
)
tribal = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.TRIBAL).distinct().count()
county = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.COUNTY).distinct().count()
city = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.CITY).distinct().count()
special_district = (
domains.filter(generic_org_type=DomainRequest.OrganizationChoices.SPECIAL_DISTRICT).distinct().count()
)
school_district = (
domains.filter(generic_org_type=DomainRequest.OrganizationChoices.SCHOOL_DISTRICT).distinct().count()
)
election_board = domains.filter(is_election_board=True).distinct().count()
return [
domains_count,
federal,
interstate,
state_or_territory,
tribal,
county,
city,
special_district,
school_district,
election_board,
]
def get_sliced_requests(filter_condition): def get_sliced_requests(filter_condition):
"""Get filtered requests counts sliced by org type and election office.""" """Get filtered requests counts sliced by org type and election office."""
return get_org_type_counts(DomainRequest, filter_condition)
requests = DomainRequest.objects.all().filter(**filter_condition).distinct()
requests_count = requests.count()
federal = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.FEDERAL).distinct().count()
interstate = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.INTERSTATE).distinct().count()
state_or_territory = (
requests.filter(generic_org_type=DomainRequest.OrganizationChoices.STATE_OR_TERRITORY).distinct().count()
)
tribal = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.TRIBAL).distinct().count()
county = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.COUNTY).distinct().count()
city = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.CITY).distinct().count()
special_district = (
requests.filter(generic_org_type=DomainRequest.OrganizationChoices.SPECIAL_DISTRICT).distinct().count()
)
school_district = (
requests.filter(generic_org_type=DomainRequest.OrganizationChoices.SCHOOL_DISTRICT).distinct().count()
)
election_board = requests.filter(is_election_board=True).distinct().count()
def _org_type_count_query_builder(generic_org_type):
return Count(Case(When(generic_org_type=generic_org_type, then=1)))
def get_org_type_counts(model_class, filter_condition):
"""Returns a list of counts for each org type"""
dynamic_count_dict = {}
for choice in DomainRequest.OrganizationChoices:
choice_name = f"{choice}_count"
dynamic_count_dict[choice_name] = _org_type_count_query_builder(choice)
# Static counts
static_count_dict = {
# Count all distinct records
"total_count": Count('id'),
"election_board_count": Count(Case(When(is_election_board=True, then=1))),
}
# Merge static aggregates with dynamic organization type counts
merged_count_dict = {**static_count_dict, **dynamic_count_dict}
# Perform a single query with conditional aggregation
model_queryset = model_class.objects.filter(**filter_condition).distinct()
aggregates = model_queryset.aggregate(**merged_count_dict)
# TODO - automate this
return [ return [
requests_count, aggregates['total_count'], # total count
federal, aggregates['federal_count'], # federal count
interstate, aggregates['interstate_count'], # interstate count
state_or_territory, aggregates['state_or_territory_count'], # state or territory count
tribal, aggregates['tribal_count'], # tribal count
county, aggregates['county_count'], # county count
city, aggregates['city_count'], # city count
special_district, aggregates['special_district_count'], # special district count
school_district, aggregates['school_district_count'], # school district count
election_board, aggregates['election_board_count'], # election board count
] ]
def export_data_managed_domains_to_csv(csv_file, start_date, end_date): def export_data_managed_domains_to_csv(csv_file, start_date, end_date):
"""Get counts for domains that have domain managers for two different dates, """Get counts for domains that have domain managers for two different dates,
get list of managed domains at end_date.""" get list of managed domains at end_date."""