Merge pull request #1956 from cisagov/rjm/1813-metadata

Issue #1813: Add invited users to reports - (staging)
This commit is contained in:
Rachid Mrad 2024-04-04 12:35:28 -04:00 committed by GitHub
commit fd921ea09b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 281 additions and 174 deletions

View file

@ -693,6 +693,24 @@ class MockDb(TestCase):
user=meoward_user, domain=self.domain_12, role=UserDomainRole.Roles.MANAGER
)
_, created = DomainInvitation.objects.get_or_create(
email=meoward_user.email, domain=self.domain_1, status=DomainInvitation.DomainInvitationStatus.RETRIEVED
)
_, created = DomainInvitation.objects.get_or_create(
email="woofwardthethird@rocks.com",
domain=self.domain_1,
status=DomainInvitation.DomainInvitationStatus.INVITED,
)
_, created = DomainInvitation.objects.get_or_create(
email="squeaker@rocks.com", domain=self.domain_2, status=DomainInvitation.DomainInvitationStatus.INVITED
)
_, created = DomainInvitation.objects.get_or_create(
email="squeaker@rocks.com", domain=self.domain_10, status=DomainInvitation.DomainInvitationStatus.INVITED
)
with less_console_noise():
self.domain_request_1 = completed_domain_request(
status=DomainRequest.DomainRequestStatus.STARTED, name="city1.gov"
@ -722,6 +740,7 @@ class MockDb(TestCase):
DomainRequest.objects.all().delete()
User.objects.all().delete()
UserDomainRole.objects.all().delete()
DomainInvitation.objects.all().delete()
def mock_user():

View file

@ -9,10 +9,10 @@ from registrar.utility.csv_export import (
export_data_unmanaged_domains_to_csv,
get_sliced_domains,
get_sliced_requests,
write_domains_csv,
write_csv_for_domains,
get_default_start_date,
get_default_end_date,
write_requests_csv,
write_csv_for_requests,
)
from django.core.management import call_command
@ -242,8 +242,13 @@ class ExportDataTest(MockDb, MockEppLib):
}
self.maxDiff = None
# Call the export functions
write_domains_csv(
writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
write_csv_for_domains(
writer,
columns,
sort_fields,
filter_condition,
should_get_domain_managers=False,
should_write_header=True,
)
# Reset the CSV file's position to the beginning
@ -268,7 +273,7 @@ class ExportDataTest(MockDb, MockEppLib):
expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip()
self.assertEqual(csv_content, expected_content)
def test_write_domains_csv(self):
def test_write_csv_for_domains(self):
"""Test that write_body returns the
existing domain, test that sort by domain name works,
test that filter works"""
@ -304,8 +309,13 @@ class ExportDataTest(MockDb, MockEppLib):
],
}
# Call the export functions
write_domains_csv(
writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
write_csv_for_domains(
writer,
columns,
sort_fields,
filter_condition,
should_get_domain_managers=False,
should_write_header=True,
)
# Reset the CSV file's position to the beginning
csv_file.seek(0)
@ -357,8 +367,13 @@ class ExportDataTest(MockDb, MockEppLib):
],
}
# Call the export functions
write_domains_csv(
writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
write_csv_for_domains(
writer,
columns,
sort_fields,
filter_condition,
should_get_domain_managers=False,
should_write_header=True,
)
# Reset the CSV file's position to the beginning
csv_file.seek(0)
@ -433,20 +448,20 @@ class ExportDataTest(MockDb, MockEppLib):
}
# Call the export functions
write_domains_csv(
write_csv_for_domains(
writer,
columns,
sort_fields,
filter_condition,
get_domain_managers=False,
should_get_domain_managers=False,
should_write_header=True,
)
write_domains_csv(
write_csv_for_domains(
writer,
columns,
sort_fields_for_deleted_domains,
filter_conditions_for_deleted_domains,
get_domain_managers=False,
should_get_domain_managers=False,
should_write_header=False,
)
# Reset the CSV file's position to the beginning
@ -478,7 +493,12 @@ class ExportDataTest(MockDb, MockEppLib):
def test_export_domains_to_writer_domain_managers(self):
"""Test that export_domains_to_writer returns the
expected domain managers."""
expected domain managers.
An invited user, woofwardthethird, should also be pulled into this report.
squeaker@rocks.com is invited to domain2 (DNS_NEEDED) and domain10 (No managers).
She should show twice in this report but not in test_export_data_managed_domains_to_csv."""
with less_console_noise():
# Create a CSV file in memory
@ -508,8 +528,13 @@ class ExportDataTest(MockDb, MockEppLib):
}
self.maxDiff = None
# Call the export functions
write_domains_csv(
writer, columns, sort_fields, filter_condition, get_domain_managers=True, should_write_header=True
write_csv_for_domains(
writer,
columns,
sort_fields,
filter_condition,
should_get_domain_managers=True,
should_write_header=True,
)
# Reset the CSV file's position to the beginning
@ -521,14 +546,16 @@ class ExportDataTest(MockDb, MockEppLib):
expected_content = (
"Domain name,Status,Expiration date,Domain type,Agency,"
"Organization name,City,State,AO,AO email,"
"Security contact email,Domain manager email 1,Domain manager email 2,Domain manager email 3\n"
"adomain10.gov,Ready,,Federal,Armed Forces Retirement Home,,,, , ,\n"
"adomain2.gov,Dns needed,,Interstate,,,,, , , ,meoward@rocks.com\n"
"cdomain11.govReadyFederal-ExecutiveWorldWarICentennialCommissionmeoward@rocks.com\n"
"Security contact email,Domain manager 1,DM1 status,Domain manager 2,DM2 status,"
"Domain manager 3,DM3 status,Domain manager 4,DM4 status\n"
"adomain10.gov,Ready,,Federal,Armed Forces Retirement Home,,,, , ,squeaker@rocks.com, I\n"
"adomain2.gov,Dns needed,,Interstate,,,,, , , ,meoward@rocks.com, R,squeaker@rocks.com, I\n"
"cdomain11.govReadyFederal-ExecutiveWorldWarICentennialCommissionmeoward@rocks.comR\n"
"cdomain1.gov,Ready,,Federal - Executive,World War I Centennial Commission,,,"
", , , ,meoward@rocks.com,info@example.com,big_lebowski@dude.co\n"
", , , ,meoward@rocks.com,R,info@example.com,R,big_lebowski@dude.co,R,"
"woofwardthethird@rocks.com,I\n"
"ddomain3.gov,On hold,,Federal,Armed Forces Retirement Home,,,, , , ,,\n"
"zdomain12.govReadyInterstatemeoward@rocks.com\n"
"zdomain12.govReadyInterstatemeoward@rocks.comR\n"
)
# Normalize line endings and remove commas,
# spaces and leading/trailing whitespace
@ -538,7 +565,9 @@ class ExportDataTest(MockDb, MockEppLib):
def test_export_data_managed_domains_to_csv(self):
"""Test get counts for domains that have domain managers for two different dates,
get list of managed domains at end_date."""
get list of managed domains at end_date.
An invited user, woofwardthethird, should also be pulled into this report."""
with less_console_noise():
# Create a CSV file in memory
@ -564,10 +593,12 @@ class ExportDataTest(MockDb, MockEppLib):
"Special district,School district,Election office\n"
"3,2,1,0,0,0,0,0,0,2\n"
"\n"
"Domain name,Domain type,Domain manager email 1,Domain manager email 2,Domain manager email 3\n"
"cdomain11.govFederal-Executivemeoward@rocks.com\n"
"cdomain1.gov,Federal - Executive,meoward@rocks.com,info@example.com,big_lebowski@dude.co\n"
"zdomain12.govInterstatemeoward@rocks.com\n"
"Domain name,Domain type,Domain manager 1,DM1 status,Domain manager 2,DM2 status,"
"Domain manager 3,DM3 status,Domain manager 4,DM4 status\n"
"cdomain11.govFederal-Executivemeoward@rocks.com, R\n"
"cdomain1.gov,Federal - Executive,meoward@rocks.com,R,info@example.com,R,"
"big_lebowski@dude.co,R,woofwardthethird@rocks.com,I\n"
"zdomain12.govInterstatemeoward@rocks.com,R\n"
)
# Normalize line endings and remove commas,
@ -642,7 +673,7 @@ class ExportDataTest(MockDb, MockEppLib):
"submission_date__lte": self.end_date,
"submission_date__gte": self.start_date,
}
write_requests_csv(writer, columns, sort_fields, filter_condition, should_write_header=True)
write_csv_for_requests(writer, columns, sort_fields, filter_condition, should_write_header=True)
# Reset the CSV file's position to the beginning
csv_file.seek(0)
# Read the content into a variable
@ -686,13 +717,8 @@ class HelperFunctions(MockDb):
"domain__first_ready__lte": self.end_date,
}
# Test with distinct
managed_domains_sliced_at_end_date = get_sliced_domains(filter_condition, True)
expected_content = [3, 2, 1, 0, 0, 0, 0, 0, 0, 2]
self.assertEqual(managed_domains_sliced_at_end_date, expected_content)
# Test without distinct
managed_domains_sliced_at_end_date = get_sliced_domains(filter_condition)
expected_content = [3, 4, 1, 0, 0, 0, 0, 0, 0, 2]
expected_content = [3, 2, 1, 0, 0, 0, 0, 0, 0, 2]
self.assertEqual(managed_domains_sliced_at_end_date, expected_content)
def test_get_sliced_requests(self):

View file

@ -1,8 +1,8 @@
from collections import Counter
import csv
import logging
from datetime import datetime
from registrar.models.domain import Domain
from registrar.models.domain_invitation import DomainInvitation
from registrar.models.domain_request import DomainRequest
from registrar.models.domain_information import DomainInformation
from django.utils import timezone
@ -11,6 +11,7 @@ from django.db.models import F, Value, CharField
from django.db.models.functions import Concat, Coalesce
from registrar.models.public_contact import PublicContact
from registrar.models.user_domain_role import UserDomainRole
from registrar.utility.enums import DefaultEmail
logger = logging.getLogger(__name__)
@ -33,7 +34,6 @@ def get_domain_infos(filter_condition, sort_fields):
"""
domain_infos = (
DomainInformation.objects.select_related("domain", "authorizing_official")
.prefetch_related("domain__permissions")
.filter(**filter_condition)
.order_by(*sort_fields)
.distinct()
@ -53,7 +53,14 @@ def get_domain_infos(filter_condition, sort_fields):
return domain_infos_cleaned
def parse_domain_row(columns, domain_info: DomainInformation, security_emails_dict=None, get_domain_managers=False):
def parse_row_for_domain(
columns,
domain_info: DomainInformation,
dict_security_emails=None,
should_get_domain_managers=False,
dict_domain_invitations_with_invited_status=None,
dict_user_domain_roles=None,
):
"""Given a set of columns, generate a new row from cleaned column data"""
# Domain should never be none when parsing this information
@ -65,8 +72,8 @@ def parse_domain_row(columns, domain_info: DomainInformation, security_emails_di
# Grab the security email from a preset dictionary.
# If nothing exists in the dictionary, grab from .contacts.
if security_emails_dict is not None and domain.name in security_emails_dict:
_email = security_emails_dict.get(domain.name)
if dict_security_emails is not None and domain.name in dict_security_emails:
_email = dict_security_emails.get(domain.name)
security_email = _email if _email is not None else " "
else:
# If the dictionary doesn't contain that data, lets filter for it manually.
@ -103,13 +110,22 @@ def parse_domain_row(columns, domain_info: DomainInformation, security_emails_di
"Deleted": domain.deleted,
}
if get_domain_managers:
# Get each domain managers email and add to list
dm_emails = [dm.user.email for dm in domain.permissions.all()]
if should_get_domain_managers:
# Get lists of emails for active and invited domain managers
# Set up the "matching header" + row field data
for i, dm_email in enumerate(dm_emails, start=1):
FIELDS[f"Domain manager email {i}"] = dm_email
dms_active_emails = dict_user_domain_roles.get(domain_info.domain.name, [])
dms_invited_emails = dict_domain_invitations_with_invited_status.get(domain_info.domain.name, [])
# Set up the "matching headers" + row field data for email and status
i = 0 # Declare i outside of the loop to avoid a reference before assignment in the second loop
for i, dm_email in enumerate(dms_active_emails, start=1):
FIELDS[f"Domain manager {i}"] = dm_email
FIELDS[f"DM{i} status"] = "R"
# Continue enumeration from where we left off and add data for invited domain managers
for j, dm_email in enumerate(dms_invited_emails, start=i + 1):
FIELDS[f"Domain manager {j}"] = dm_email
FIELDS[f"DM{j} status"] = "I"
row = [FIELDS.get(column, "") for column in columns]
return row
@ -119,7 +135,7 @@ def _get_security_emails(sec_contact_ids):
"""
Retrieve security contact emails for the given security contact IDs.
"""
security_emails_dict = {}
dict_security_emails = {}
public_contacts = (
PublicContact.objects.only("email", "domain__name")
.select_related("domain")
@ -129,65 +145,151 @@ def _get_security_emails(sec_contact_ids):
# Populate a dictionary of domain names and their security contacts
for contact in public_contacts:
domain: Domain = contact.domain
if domain is not None and domain.name not in security_emails_dict:
security_emails_dict[domain.name] = contact.email
if domain is not None and domain.name not in dict_security_emails:
dict_security_emails[domain.name] = contact.email
else:
logger.warning("csv_export -> Domain was none for PublicContact")
return security_emails_dict
return dict_security_emails
def write_domains_csv(
def count_domain_managers(domain_name, dict_domain_invitations_with_invited_status, dict_user_domain_roles):
"""Count active and invited domain managers"""
dms_active = len(dict_user_domain_roles.get(domain_name, []))
dms_invited = len(dict_domain_invitations_with_invited_status.get(domain_name, []))
return dms_active, dms_invited
def update_columns(columns, dms_total, should_update_columns):
"""Update columns if necessary"""
if should_update_columns:
for i in range(1, dms_total + 1):
email_column_header = f"Domain manager {i}"
status_column_header = f"DM{i} status"
if email_column_header not in columns:
columns.append(email_column_header)
columns.append(status_column_header)
should_update_columns = False
return columns, should_update_columns, dms_total
def update_columns_with_domain_managers(
columns,
domain_info,
should_update_columns,
dms_total,
dict_domain_invitations_with_invited_status,
dict_user_domain_roles,
):
"""Helper function to update columns with domain manager information"""
domain_name = domain_info.domain.name
try:
dms_active, dms_invited = count_domain_managers(
domain_name, dict_domain_invitations_with_invited_status, dict_user_domain_roles
)
if dms_active + dms_invited > dms_total:
dms_total = dms_active + dms_invited
should_update_columns = True
except Exception as err:
logger.error(f"Exception while parsing domain managers for reports: {err}")
return update_columns(columns, dms_total, should_update_columns)
def build_dictionaries_for_domain_managers(dict_user_domain_roles, dict_domain_invitations_with_invited_status):
"""Helper function that builds dicts for invited users and active domain
managers. We do so to avoid filtering within loops."""
user_domain_roles = UserDomainRole.objects.all()
# Iterate through each user domain role and populate the dictionary
for user_domain_role in user_domain_roles:
domain_name = user_domain_role.domain.name
email = user_domain_role.user.email
if domain_name not in dict_user_domain_roles:
dict_user_domain_roles[domain_name] = []
dict_user_domain_roles[domain_name].append(email)
domain_invitations_with_invited_status = None
domain_invitations_with_invited_status = DomainInvitation.objects.filter(
status=DomainInvitation.DomainInvitationStatus.INVITED
).select_related("domain")
# Iterate through each domain invitation and populate the dictionary
for invite in domain_invitations_with_invited_status:
domain_name = invite.domain.name
email = invite.email
if domain_name not in dict_domain_invitations_with_invited_status:
dict_domain_invitations_with_invited_status[domain_name] = []
dict_domain_invitations_with_invited_status[domain_name].append(email)
return dict_user_domain_roles, dict_domain_invitations_with_invited_status
def write_csv_for_domains(
writer,
columns,
sort_fields,
filter_condition,
get_domain_managers=False,
should_get_domain_managers=False,
should_write_header=True,
):
"""
Receives params from the parent methods and outputs a CSV with filtered and sorted domains.
Works with write_header as long as the same writer object is passed.
get_domain_managers: Conditional bc we only use domain manager info for export_data_full_to_csv
should_get_domain_managers: Conditional bc we only use domain manager info for export_data_full_to_csv
should_write_header: Conditional bc export_data_domain_growth_to_csv calls write_body twice
"""
# Retrieve domain information and all sec emails
all_domain_infos = get_domain_infos(filter_condition, sort_fields)
# Store all security emails to avoid epp calls or excessive filters
sec_contact_ids = all_domain_infos.values_list("domain__security_contact_registry_id", flat=True)
security_emails_dict = _get_security_emails(sec_contact_ids)
# Reduce the memory overhead when performing the write operation
dict_security_emails = _get_security_emails(sec_contact_ids)
paginator = Paginator(all_domain_infos, 1000)
# The maximum amount of domain managers an account has
# We get the max so we can set the column header accurately
max_dm_count = 0
# Initialize variables
dms_total = 0
should_update_columns = False
total_body_rows = []
dict_user_domain_roles = {}
dict_domain_invitations_with_invited_status = {}
# Build dictionaries if necessary
if should_get_domain_managers:
dict_user_domain_roles, dict_domain_invitations_with_invited_status = build_dictionaries_for_domain_managers(
dict_user_domain_roles, dict_domain_invitations_with_invited_status
)
# Process domain information
for page_num in paginator.page_range:
rows = []
page = paginator.page(page_num)
for domain_info in page.object_list:
# Get count of all the domain managers for an account
if get_domain_managers:
dm_count = domain_info.domain.permissions.count()
if dm_count > max_dm_count:
max_dm_count = dm_count
for i in range(1, max_dm_count + 1):
column_name = f"Domain manager email {i}"
if column_name not in columns:
columns.append(column_name)
if should_get_domain_managers:
columns, dms_total, should_update_columns = update_columns_with_domain_managers(
columns,
domain_info,
should_update_columns,
dms_total,
dict_domain_invitations_with_invited_status,
dict_user_domain_roles,
)
try:
row = parse_domain_row(columns, domain_info, security_emails_dict, get_domain_managers)
row = parse_row_for_domain(
columns,
domain_info,
dict_security_emails,
should_get_domain_managers,
dict_domain_invitations_with_invited_status,
dict_user_domain_roles,
)
rows.append(row)
except ValueError:
# This should not happen. If it does, just skip this row.
# It indicates that DomainInformation.domain is None.
logger.error("csv_export -> Error when parsing row, domain was None")
continue
total_body_rows.extend(rows)
@ -208,7 +310,7 @@ def get_requests(filter_condition, sort_fields):
return requests
def parse_request_row(columns, request: DomainRequest):
def parse_row_for_requests(columns, request: DomainRequest):
"""Given a set of columns, generate a new row from cleaned column data"""
requested_domain_name = "No requested domain"
@ -240,7 +342,7 @@ def parse_request_row(columns, request: DomainRequest):
return row
def write_requests_csv(
def write_csv_for_requests(
writer,
columns,
sort_fields,
@ -261,7 +363,7 @@ def write_requests_csv(
rows = []
for request in page.object_list:
try:
row = parse_request_row(columns, request)
row = parse_row_for_requests(columns, request)
rows.append(row)
except ValueError:
# This should not happen. If it does, just skip this row.
@ -309,8 +411,8 @@ def export_data_type_to_csv(csv_file):
Domain.State.ON_HOLD,
],
}
write_domains_csv(
writer, columns, sort_fields, filter_condition, get_domain_managers=True, should_write_header=True
write_csv_for_domains(
writer, columns, sort_fields, filter_condition, should_get_domain_managers=True, should_write_header=True
)
@ -342,8 +444,8 @@ def export_data_full_to_csv(csv_file):
Domain.State.ON_HOLD,
],
}
write_domains_csv(
writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
write_csv_for_domains(
writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True
)
@ -376,8 +478,8 @@ def export_data_federal_to_csv(csv_file):
Domain.State.ON_HOLD,
],
}
write_domains_csv(
writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
write_csv_for_domains(
writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True
)
@ -446,77 +548,42 @@ def export_data_domain_growth_to_csv(csv_file, start_date, end_date):
"domain__deleted__gte": start_date_formatted,
}
write_domains_csv(
writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
write_csv_for_domains(
writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True
)
write_domains_csv(
write_csv_for_domains(
writer,
columns,
sort_fields_for_deleted_domains,
filter_condition_for_deleted_domains,
get_domain_managers=False,
should_get_domain_managers=False,
should_write_header=False,
)
def get_sliced_domains(filter_condition, distinct=False):
def get_sliced_domains(filter_condition):
"""Get filtered domains counts sliced by org type and election office.
Pass distinct=True when filtering by permissions so we do not to count multiples
when a domain has more that one manager.
"""
# Round trip 1: Get distinct domain names based on filter condition
domains_count = DomainInformation.objects.filter(**filter_condition).distinct().count()
# Round trip 2: Get counts for other slices
# This will require either 8 filterd and distinct DB round trips,
# or 2 DB round trips plus iteration on domain_permissions for each domain
if distinct:
generic_org_types_query = DomainInformation.objects.filter(**filter_condition).values_list(
"domain_id", "generic_org_type"
domains = DomainInformation.objects.all().filter(**filter_condition).distinct()
domains_count = domains.count()
federal = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.FEDERAL).distinct().count()
interstate = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.INTERSTATE).count()
state_or_territory = (
domains.filter(generic_org_type=DomainRequest.OrganizationChoices.STATE_OR_TERRITORY).distinct().count()
)
# Initialize Counter to store counts for each generic_org_type
generic_org_type_counts = Counter()
# Keep track of domains already counted
domains_counted = set()
# Iterate over distinct domains
for domain_id, generic_org_type in generic_org_types_query:
# Check if the domain has already been counted
if domain_id in domains_counted:
continue
# Get all permissions for the current domain
domain_permissions = DomainInformation.objects.filter(domain_id=domain_id, **filter_condition).values_list(
"domain__permissions", flat=True
tribal = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.TRIBAL).distinct().count()
county = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.COUNTY).distinct().count()
city = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.CITY).distinct().count()
special_district = (
domains.filter(generic_org_type=DomainRequest.OrganizationChoices.SPECIAL_DISTRICT).distinct().count()
)
# Check if the domain has multiple permissions
if len(domain_permissions) > 0:
# Mark the domain as counted
domains_counted.add(domain_id)
# Increment the count for the corresponding generic_org_type
generic_org_type_counts[generic_org_type] += 1
else:
generic_org_types_query = DomainInformation.objects.filter(**filter_condition).values_list(
"generic_org_type", flat=True
school_district = (
domains.filter(generic_org_type=DomainRequest.OrganizationChoices.SCHOOL_DISTRICT).distinct().count()
)
generic_org_type_counts = Counter(generic_org_types_query)
# Extract counts for each generic_org_type
federal = generic_org_type_counts.get(DomainRequest.OrganizationChoices.FEDERAL, 0)
interstate = generic_org_type_counts.get(DomainRequest.OrganizationChoices.INTERSTATE, 0)
state_or_territory = generic_org_type_counts.get(DomainRequest.OrganizationChoices.STATE_OR_TERRITORY, 0)
tribal = generic_org_type_counts.get(DomainRequest.OrganizationChoices.TRIBAL, 0)
county = generic_org_type_counts.get(DomainRequest.OrganizationChoices.COUNTY, 0)
city = generic_org_type_counts.get(DomainRequest.OrganizationChoices.CITY, 0)
special_district = generic_org_type_counts.get(DomainRequest.OrganizationChoices.SPECIAL_DISTRICT, 0)
school_district = generic_org_type_counts.get(DomainRequest.OrganizationChoices.SCHOOL_DISTRICT, 0)
# Round trip 3
election_board = DomainInformation.objects.filter(is_election_board=True, **filter_condition).distinct().count()
election_board = domains.filter(is_election_board=True).distinct().count()
return [
domains_count,
@ -535,26 +602,23 @@ def get_sliced_domains(filter_condition, distinct=False):
def get_sliced_requests(filter_condition):
"""Get filtered requests counts sliced by org type and election office."""
# Round trip 1: Get distinct requests based on filter condition
requests_count = DomainRequest.objects.filter(**filter_condition).distinct().count()
# Round trip 2: Get counts for other slices
generic_org_types_query = DomainRequest.objects.filter(**filter_condition).values_list(
"generic_org_type", flat=True
requests = DomainRequest.objects.all().filter(**filter_condition).distinct()
requests_count = requests.count()
federal = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.FEDERAL).distinct().count()
interstate = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.INTERSTATE).distinct().count()
state_or_territory = (
requests.filter(generic_org_type=DomainRequest.OrganizationChoices.STATE_OR_TERRITORY).distinct().count()
)
generic_org_type_counts = Counter(generic_org_types_query)
federal = generic_org_type_counts.get(DomainRequest.OrganizationChoices.FEDERAL, 0)
interstate = generic_org_type_counts.get(DomainRequest.OrganizationChoices.INTERSTATE, 0)
state_or_territory = generic_org_type_counts.get(DomainRequest.OrganizationChoices.STATE_OR_TERRITORY, 0)
tribal = generic_org_type_counts.get(DomainRequest.OrganizationChoices.TRIBAL, 0)
county = generic_org_type_counts.get(DomainRequest.OrganizationChoices.COUNTY, 0)
city = generic_org_type_counts.get(DomainRequest.OrganizationChoices.CITY, 0)
special_district = generic_org_type_counts.get(DomainRequest.OrganizationChoices.SPECIAL_DISTRICT, 0)
school_district = generic_org_type_counts.get(DomainRequest.OrganizationChoices.SCHOOL_DISTRICT, 0)
# Round trip 3
election_board = DomainRequest.objects.filter(is_election_board=True, **filter_condition).distinct().count()
tribal = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.TRIBAL).distinct().count()
county = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.COUNTY).distinct().count()
city = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.CITY).distinct().count()
special_district = (
requests.filter(generic_org_type=DomainRequest.OrganizationChoices.SPECIAL_DISTRICT).distinct().count()
)
school_district = (
requests.filter(generic_org_type=DomainRequest.OrganizationChoices.SCHOOL_DISTRICT).distinct().count()
)
election_board = requests.filter(is_election_board=True).distinct().count()
return [
requests_count,
@ -588,7 +652,7 @@ def export_data_managed_domains_to_csv(csv_file, start_date, end_date):
"domain__permissions__isnull": False,
"domain__first_ready__lte": start_date_formatted,
}
managed_domains_sliced_at_start_date = get_sliced_domains(filter_managed_domains_start_date, True)
managed_domains_sliced_at_start_date = get_sliced_domains(filter_managed_domains_start_date)
writer.writerow(["MANAGED DOMAINS COUNTS AT START DATE"])
writer.writerow(
@ -612,7 +676,7 @@ def export_data_managed_domains_to_csv(csv_file, start_date, end_date):
"domain__permissions__isnull": False,
"domain__first_ready__lte": end_date_formatted,
}
managed_domains_sliced_at_end_date = get_sliced_domains(filter_managed_domains_end_date, True)
managed_domains_sliced_at_end_date = get_sliced_domains(filter_managed_domains_end_date)
writer.writerow(["MANAGED DOMAINS COUNTS AT END DATE"])
writer.writerow(
@ -632,12 +696,12 @@ def export_data_managed_domains_to_csv(csv_file, start_date, end_date):
writer.writerow(managed_domains_sliced_at_end_date)
writer.writerow([])
write_domains_csv(
write_csv_for_domains(
writer,
columns,
sort_fields,
filter_managed_domains_end_date,
get_domain_managers=True,
should_get_domain_managers=True,
should_write_header=True,
)
@ -661,7 +725,7 @@ def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date):
"domain__permissions__isnull": True,
"domain__first_ready__lte": start_date_formatted,
}
unmanaged_domains_sliced_at_start_date = get_sliced_domains(filter_unmanaged_domains_start_date, True)
unmanaged_domains_sliced_at_start_date = get_sliced_domains(filter_unmanaged_domains_start_date)
writer.writerow(["UNMANAGED DOMAINS AT START DATE"])
writer.writerow(
@ -685,7 +749,7 @@ def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date):
"domain__permissions__isnull": True,
"domain__first_ready__lte": end_date_formatted,
}
unmanaged_domains_sliced_at_end_date = get_sliced_domains(filter_unmanaged_domains_end_date, True)
unmanaged_domains_sliced_at_end_date = get_sliced_domains(filter_unmanaged_domains_end_date)
writer.writerow(["UNMANAGED DOMAINS AT END DATE"])
writer.writerow(
@ -705,12 +769,12 @@ def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date):
writer.writerow(unmanaged_domains_sliced_at_end_date)
writer.writerow([])
write_domains_csv(
write_csv_for_domains(
writer,
columns,
sort_fields,
filter_unmanaged_domains_end_date,
get_domain_managers=False,
should_get_domain_managers=False,
should_write_header=True,
)
@ -741,4 +805,4 @@ def export_data_requests_growth_to_csv(csv_file, start_date, end_date):
"submission_date__gte": start_date_formatted,
}
write_requests_csv(writer, columns, sort_fields, filter_condition, should_write_header=True)
write_csv_for_requests(writer, columns, sort_fields, filter_condition, should_write_header=True)

View file

@ -49,8 +49,8 @@ class AnalyticsView(View):
"domain__permissions__isnull": False,
"domain__first_ready__lte": end_date_formatted,
}
managed_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_managed_domains_start_date, True)
managed_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_managed_domains_end_date, True)
managed_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_managed_domains_start_date)
managed_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_managed_domains_end_date)
filter_unmanaged_domains_start_date = {
"domain__permissions__isnull": True,
@ -60,10 +60,8 @@ class AnalyticsView(View):
"domain__permissions__isnull": True,
"domain__first_ready__lte": end_date_formatted,
}
unmanaged_domains_sliced_at_start_date = csv_export.get_sliced_domains(
filter_unmanaged_domains_start_date, True
)
unmanaged_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_unmanaged_domains_end_date, True)
unmanaged_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_unmanaged_domains_start_date)
unmanaged_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_unmanaged_domains_end_date)
filter_ready_domains_start_date = {
"domain__state__in": [models.Domain.State.READY],