diff --git a/src/registrar/management/commands/generate_current_federal_report.py b/src/registrar/management/commands/generate_current_federal_report.py index 6516bf99b..97d4fd7e4 100644 --- a/src/registrar/management/commands/generate_current_federal_report.py +++ b/src/registrar/management/commands/generate_current_federal_report.py @@ -50,7 +50,7 @@ class Command(BaseCommand): # Generate a file locally for upload with open(file_path, "w") as file: - csv_export.export_data_federal_to_csv(file) + csv_export.DomainDataFederal.export_data_to_csv(file) if check_path and not os.path.exists(file_path): raise FileNotFoundError(f"Could not find newly created file at '{file_path}'") diff --git a/src/registrar/management/commands/generate_current_full_report.py b/src/registrar/management/commands/generate_current_full_report.py index be810ee10..4bcb9f502 100644 --- a/src/registrar/management/commands/generate_current_full_report.py +++ b/src/registrar/management/commands/generate_current_full_report.py @@ -49,7 +49,7 @@ class Command(BaseCommand): # Generate a file locally for upload with open(file_path, "w") as file: - csv_export.export_data_full_to_csv(file) + csv_export.DomainDataFull.export_data_to_csv(file) if check_path and not os.path.exists(file_path): raise FileNotFoundError(f"Could not find newly created file at '{file_path}'") diff --git a/src/registrar/models/domain.py b/src/registrar/models/domain.py index 767227499..7fdc56971 100644 --- a/src/registrar/models/domain.py +++ b/src/registrar/models/domain.py @@ -151,6 +151,11 @@ class Domain(TimeStampedModel, DomainHelper): # previously existed but has been deleted from the registry DELETED = "deleted", "Deleted" + @classmethod + def get_state_label(cls, state: str): + """Returns the associated label for a given state value""" + return cls(state).label if state else None + @classmethod def get_help_text(cls, state) -> str: """Returns a help message for a desired state. 
If none is found, an empty string is returned""" diff --git a/src/registrar/tests/test_reports.py b/src/registrar/tests/test_reports.py index 45600cb01..ded04e31b 100644 --- a/src/registrar/tests/test_reports.py +++ b/src/registrar/tests/test_reports.py @@ -1,21 +1,23 @@ -import csv import io from django.test import Client, RequestFactory from io import StringIO from registrar.models.domain_request import DomainRequest from registrar.models.domain import Domain -from registrar.models.utility.generic_helper import convert_queryset_to_dict from registrar.utility.csv_export import ( - export_data_managed_domains_to_csv, - export_data_unmanaged_domains_to_csv, - get_sliced_domains, - get_sliced_requests, - write_csv_for_domains, + DomainDataFull, + DomainDataType, + DomainDataFederal, + DomainGrowth, + DomainManaged, + DomainUnmanaged, + DomainExport, + DomainRequestExport, + DomainRequestGrowth, + DomainRequestDataFull, get_default_start_date, get_default_end_date, - DomainRequestExport, ) - +from django.db.models import Case, When from django.core.management import call_command from unittest.mock import MagicMock, call, mock_open, patch from api.views import get_current_federal, get_current_full @@ -45,10 +47,10 @@ class CsvReportsTest(MockDb): fake_open = mock_open() expected_file_content = [ call("Domain name,Domain type,Agency,Organization name,City,State,Security contact email\r\n"), - call("cdomain11.gov,Federal - Executive,World War I Centennial Commission,,,, \r\n"), - call("cdomain1.gov,Federal - Executive,World War I Centennial Commission,,,, \r\n"), - call("adomain10.gov,Federal,Armed Forces Retirement Home,,,, \r\n"), - call("ddomain3.gov,Federal,Armed Forces Retirement Home,,,, \r\n"), + call("cdomain11.gov,Federal - Executive,World War I Centennial Commission,,,,\r\n"), + call("cdomain1.gov,Federal - Executive,World War I Centennial Commission,,,,\r\n"), + call("adomain10.gov,Federal,Armed Forces Retirement Home,,,,\r\n"), + 
call("ddomain3.gov,Federal,Armed Forces Retirement Home,,,,\r\n"), ] # We don't actually want to write anything for a test case, # we just want to verify what is being written. @@ -67,11 +69,12 @@ class CsvReportsTest(MockDb): fake_open = mock_open() expected_file_content = [ call("Domain name,Domain type,Agency,Organization name,City,State,Security contact email\r\n"), - call("cdomain11.gov,Federal - Executive,World War I Centennial Commission,,,, \r\n"), - call("cdomain1.gov,Federal - Executive,World War I Centennial Commission,,,, \r\n"), - call("adomain10.gov,Federal,Armed Forces Retirement Home,,,, \r\n"), - call("ddomain3.gov,Federal,Armed Forces Retirement Home,,,, \r\n"), - call("adomain2.gov,Interstate,,,,, \r\n"), + call("cdomain11.gov,Federal - Executive,World War I Centennial Commission,,,,\r\n"), + call("cdomain1.gov,Federal - Executive,World War I Centennial Commission,,,,\r\n"), + call("adomain10.gov,Federal,Armed Forces Retirement Home,,,,\r\n"), + call("ddomain3.gov,Federal,Armed Forces Retirement Home,,,,\r\n"), + call("adomain2.gov,Interstate,,,,,\r\n"), + call("zdomain12.gov,Interstate,,,,,\r\n"), ] # We don't actually want to write anything for a test case, # we just want to verify what is being written. @@ -202,494 +205,299 @@ class ExportDataTest(MockDb, MockEppLib): def tearDown(self): super().tearDown() - def test_export_domains_to_writer_security_emails_and_first_ready(self): - """Test that export_domains_to_writer returns the - expected security email and first_ready value""" + @less_console_noise_decorator + def test_domain_data_type(self): + """Shows security contacts, domain managers, so""" + # Add security email information + self.domain_1.name = "defaultsecurity.gov" + self.domain_1.save() + # Invoke setter + self.domain_1.security_contact + # Invoke setter + self.domain_2.security_contact + # Invoke setter + self.domain_3.security_contact + # Add a first ready date on the first domain. Leaving the others blank. 
+ self.domain_1.first_ready = get_default_start_date() + self.domain_1.save() + # Create a CSV file in memory + csv_file = StringIO() + # Call the export functions + DomainDataType.export_data_to_csv(csv_file) + # Reset the CSV file's position to the beginning + csv_file.seek(0) + # Read the content into a variable + csv_content = csv_file.read() + # We expect READY domains, + # sorted alphabetially by domain name + expected_content = ( + "Domain name,Status,First ready on,Expiration date,Domain type,Agency,Organization name,City,State,SO," + "SO email,Security contact email,Domain managers,Invited domain managers\n" + "cdomain11.gov,Ready,2024-04-02,(blank),Federal - Executive,World War I Centennial Commission,,,, ,,," + "meoward@rocks.com,\n" + "defaultsecurity.gov,Ready,2023-11-01,(blank),Federal - Executive,World War I Centennial Commission,,," + ', ,,dotgov@cisa.dhs.gov,"meoward@rocks.com, info@example.com, big_lebowski@dude.co",' + "woofwardthethird@rocks.com\n" + "adomain10.gov,Ready,2024-04-03,(blank),Federal,Armed Forces Retirement Home,,,, ,,,," + "squeaker@rocks.com\n" + "bdomain4.gov,Unknown,(blank),(blank),Federal,Armed Forces Retirement Home,,,, ,,,,\n" + "bdomain5.gov,Deleted,(blank),(blank),Federal,Armed Forces Retirement Home,,,, ,,,,\n" + "bdomain6.gov,Deleted,(blank),(blank),Federal,Armed Forces Retirement Home,,,, ,,,,\n" + "ddomain3.gov,On hold,(blank),2023-11-15,Federal,Armed Forces Retirement Home,,,, ,," + "security@mail.gov,,\n" + "sdomain8.gov,Deleted,(blank),(blank),Federal,Armed Forces Retirement Home,,,, ,,,,\n" + "xdomain7.gov,Deleted,(blank),(blank),Federal,Armed Forces Retirement Home,,,, ,,,,\n" + "zdomain9.gov,Deleted,(blank),(blank),Federal,Armed Forces Retirement Home,,,, ,,,,\n" + "adomain2.gov,Dns needed,(blank),(blank),Interstate,,,,, ,,registrar@dotgov.gov," + "meoward@rocks.com,squeaker@rocks.com\n" + "zdomain12.gov,Ready,2024-04-02,(blank),Interstate,,,,, ,,,meoward@rocks.com,\n" + ) + # Normalize line endings and remove 
commas, + # spaces and leading/trailing whitespace + csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() + expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() + self.assertEqual(csv_content, expected_content) - with less_console_noise(): - # Add security email information - self.domain_1.name = "defaultsecurity.gov" - self.domain_1.save() - # Invoke setter - self.domain_1.security_contact - # Invoke setter - self.domain_2.security_contact - # Invoke setter - self.domain_3.security_contact + @less_console_noise_decorator + def test_domain_data_full(self): + """Shows security contacts, filtered by state""" + # Add security email information + self.domain_1.name = "defaultsecurity.gov" + self.domain_1.save() + # Invoke setter + self.domain_1.security_contact + # Invoke setter + self.domain_2.security_contact + # Invoke setter + self.domain_3.security_contact + # Add a first ready date on the first domain. Leaving the others blank. 
+ self.domain_1.first_ready = get_default_start_date() + self.domain_1.save() + # Create a CSV file in memory + csv_file = StringIO() + # Call the export functions + DomainDataFull.export_data_to_csv(csv_file) + # Reset the CSV file's position to the beginning + csv_file.seek(0) + # Read the content into a variable + csv_content = csv_file.read() + # We expect READY domains, + # sorted alphabetially by domain name + expected_content = ( + "Domain name,Domain type,Agency,Organization name,City,State,Security contact email\n" + "cdomain11.gov,Federal - Executive,World War I Centennial Commission,,,,\n" + "defaultsecurity.gov,Federal - Executive,World War I Centennial Commission,,,,dotgov@cisa.dhs.gov\n" + "adomain10.gov,Federal,Armed Forces Retirement Home,,,,\n" + "ddomain3.gov,Federal,Armed Forces Retirement Home,,,,security@mail.gov\n" + "adomain2.gov,Interstate,,,,,registrar@dotgov.gov\n" + "zdomain12.gov,Interstate,,,,,\n" + ) + # Normalize line endings and remove commas, + # spaces and leading/trailing whitespace + csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() + expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() + self.assertEqual(csv_content, expected_content) - # Add a first ready date on the first domain. Leaving the others blank. - self.domain_1.first_ready = get_default_start_date() - self.domain_1.save() + @less_console_noise_decorator + def test_domain_data_federal(self): + """Shows security contacts, filtered by state and org type""" + # Add security email information + self.domain_1.name = "defaultsecurity.gov" + self.domain_1.save() + # Invoke setter + self.domain_1.security_contact + # Invoke setter + self.domain_2.security_contact + # Invoke setter + self.domain_3.security_contact + # Add a first ready date on the first domain. Leaving the others blank. 
+ self.domain_1.first_ready = get_default_start_date() + self.domain_1.save() + # Create a CSV file in memory + csv_file = StringIO() + # Call the export functions + DomainDataFederal.export_data_to_csv(csv_file) + # Reset the CSV file's position to the beginning + csv_file.seek(0) + # Read the content into a variable + csv_content = csv_file.read() + # We expect READY domains, + # sorted alphabetially by domain name + expected_content = ( + "Domain name,Domain type,Agency,Organization name,City,State,Security contact email\n" + "cdomain11.gov,Federal - Executive,World War I Centennial Commission,,,,\n" + "defaultsecurity.gov,Federal - Executive,World War I Centennial Commission,,,,dotgov@cisa.dhs.gov\n" + "adomain10.gov,Federal,Armed Forces Retirement Home,,,,\n" + "ddomain3.gov,Federal,Armed Forces Retirement Home,,,,security@mail.gov\n" + ) + # Normalize line endings and remove commas, + # spaces and leading/trailing whitespace + csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() + expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() + self.assertEqual(csv_content, expected_content) - # Create a CSV file in memory - csv_file = StringIO() - writer = csv.writer(csv_file) - # Define columns, sort fields, and filter condition - columns = [ - "Domain name", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "SO", - "SO email", - "Security contact email", - "Status", - "Expiration date", - "First ready on", - ] - sort_fields = ["domain__name"] - filter_condition = { - "domain__state__in": [ - Domain.State.READY, - Domain.State.DNS_NEEDED, - Domain.State.ON_HOLD, - ], - } - - # Call the export functions - write_csv_for_domains( - writer, - columns, - sort_fields, - filter_condition, - should_get_domain_managers=False, - should_write_header=True, + @less_console_noise_decorator + def test_domain_growth(self): + """Shows ready and deleted domains 
within a date range, sorted""" + # Remove "Created at" and "First ready" because we can't guess this immutable, dynamically generated test data + columns = [ + "Domain name", + "Domain type", + "Agency", + "Organization name", + "City", + "State", + "Status", + "Expiration date", + # "Created at", + # "First ready", + "Deleted", + ] + sort = { + "custom_sort": Case( + When(domain__state=Domain.State.READY, then="domain__created_at"), + When(domain__state=Domain.State.DELETED, then="domain__deleted"), ) + } + with patch("registrar.utility.csv_export.DomainGrowth.get_columns", return_value=columns): + with patch("registrar.utility.csv_export.DomainGrowth.get_annotations_for_sort", return_value=sort): + # Create a CSV file in memory + csv_file = StringIO() + # Call the export functions + DomainGrowth.export_data_to_csv( + csv_file, + self.start_date.strftime("%Y-%m-%d"), + self.end_date.strftime("%Y-%m-%d"), + ) + # Reset the CSV file's position to the beginning + csv_file.seek(0) + # Read the content into a variable + csv_content = csv_file.read() + # We expect READY domains first, created between day-2 and day+2, sorted by created_at then name + # and DELETED domains deleted between day-2 and day+2, sorted by deleted then name + expected_content = ( + "Domain name,Domain type,Agency,Organization name,City," + "State,Status,Expiration date, Deleted\n" + "cdomain1.gov,Federal-Executive,World War I Centennial Commission,,,,Ready,(blank)\n" + "adomain10.gov,Federal,Armed Forces Retirement Home,,,,Ready,(blank)\n" + "cdomain11.govFederal-ExecutiveWorldWarICentennialCommissionReady(blank)\n" + "zdomain12.govInterstateReady(blank)\n" + "zdomain9.gov,Federal,ArmedForcesRetirementHome,Deleted,(blank),2024-04-01\n" + "sdomain8.gov,Federal,Armed Forces Retirement Home,,,,Deleted,(blank),2024-04-02\n" + "xdomain7.gov,FederalArmedForcesRetirementHome,Deleted,(blank),2024-04-02\n" + ) + # Normalize line endings and remove commas, + # spaces and leading/trailing whitespace + 
csv_content = ( + csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() + ) + expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() + self.assertEqual(csv_content, expected_content) - # Reset the CSV file's position to the beginning - csv_file.seek(0) - # Read the content into a variable - csv_content = csv_file.read() - # We expect READY domains, - # sorted alphabetially by domain name - expected_content = ( - "Domain name,Domain type,Agency,Organization name,City,State,SO," - "SO email,Security contact email,Status,Expiration date, First ready on\n" - "adomain10.gov,Federal,Armed Forces Retirement Home,Ready,(blank),2024-04-03\n" - "adomain2.gov,Interstate,(blank),Dns needed,(blank),(blank)\n" - "cdomain11.gov,Federal-Executive,WorldWarICentennialCommission,Ready,(blank),2024-04-02\n" - "ddomain3.gov,Federal,Armed Forces Retirement Home,security@mail.gov,On hold,2023-11-15,(blank)\n" - "defaultsecurity.gov,Federal - Executive,World War I Centennial Commission," - "(blank),Ready,(blank),2023-11-01\n" - "zdomain12.govInterstateReady,(blank),2024-04-02\n" - ) - # Normalize line endings and remove commas, - # spaces and leading/trailing whitespace - csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() - expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() - self.assertEqual(csv_content, expected_content) - - def test_write_csv_for_domains(self): - """Test that write_body returns the - existing domain, test that sort by domain name works, - test that filter works""" - - with less_console_noise(): - # Create a CSV file in memory - csv_file = StringIO() - writer = csv.writer(csv_file) - - # Define columns, sort fields, and filter condition - columns = [ - "Domain name", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "SO", - "SO email", - "Submitter", - "Submitter 
title", - "Submitter email", - "Submitter phone", - "Security contact email", - "Status", - ] - sort_fields = ["domain__name"] - filter_condition = { - "domain__state__in": [ - Domain.State.READY, - Domain.State.DNS_NEEDED, - Domain.State.ON_HOLD, - ], - } - # Call the export functions - write_csv_for_domains( - writer, - columns, - sort_fields, - filter_condition, - should_get_domain_managers=False, - should_write_header=True, - ) - # Reset the CSV file's position to the beginning - csv_file.seek(0) - # Read the content into a variable - csv_content = csv_file.read() - # We expect READY domains, - # sorted alphabetially by domain name - expected_content = ( - "Domain name,Domain type,Agency,Organization name,City,State,SO," - "SO email,Submitter,Submitter title,Submitter email,Submitter phone," - "Security contact email,Status\n" - "adomain10.gov,Federal,Armed Forces Retirement Home,Ready\n" - "adomain2.gov,Interstate,Dns needed\n" - "cdomain11.govFederal-ExecutiveWorldWarICentennialCommissionReady\n" - "cdomain1.gov,Federal - Executive,World War I Centennial Commission,Ready\n" - "ddomain3.gov,Federal,Armed Forces Retirement Home,On hold\n" - "zdomain12.govInterstateReady\n" - ) - # Normalize line endings and remove commas, - # spaces and leading/trailing whitespace - csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() - expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() - self.assertEqual(csv_content, expected_content) - - def test_write_domains_body_additional(self): - """An additional test for filters and multi-column sort""" - - with less_console_noise(): - # Create a CSV file in memory - csv_file = StringIO() - writer = csv.writer(csv_file) - # Define columns, sort fields, and filter condition - columns = [ - "Domain name", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "Security contact email", - ] - sort_fields = ["domain__name", 
"federal_agency", "generic_org_type"] - filter_condition = { - "generic_org_type__icontains": "federal", - "domain__state__in": [ - Domain.State.READY, - Domain.State.DNS_NEEDED, - Domain.State.ON_HOLD, - ], - } - # Call the export functions - write_csv_for_domains( - writer, - columns, - sort_fields, - filter_condition, - should_get_domain_managers=False, - should_write_header=True, - ) - # Reset the CSV file's position to the beginning - csv_file.seek(0) - # Read the content into a variable - csv_content = csv_file.read() - # We expect READY domains, - # federal only - # sorted alphabetially by domain name - expected_content = ( - "Domain name,Domain type,Agency,Organization name,City," - "State,Security contact email\n" - "adomain10.gov,Federal,Armed Forces Retirement Home\n" - "cdomain11.govFederal-ExecutiveWorldWarICentennialCommission\n" - "cdomain1.gov,Federal - Executive,World War I Centennial Commission\n" - "ddomain3.gov,Federal,Armed Forces Retirement Home\n" - ) - # Normalize line endings and remove commas, - # spaces and leading/trailing whitespace - csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() - expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() - self.assertEqual(csv_content, expected_content) - - def test_write_domains_body_with_date_filter_pulls_domains_in_range(self): - """Test that domains that are - 1. READY and their first_ready dates are in range - 2. DELETED and their deleted dates are in range - are pulled when the growth report conditions are applied to export_domains_to_writed. - Test that ready domains are sorted by first_ready/deleted dates first, names second. - - We considered testing export_data_domain_growth_to_csv which calls write_body - and would have been easy to set up, but expected_content would contain created_at dates - which are hard to mock. 
- - TODO: Simplify if created_at is not needed for the report.""" - - with less_console_noise(): - # Create a CSV file in memory - csv_file = StringIO() - writer = csv.writer(csv_file) - # Define columns, sort fields, and filter condition - columns = [ - "Domain name", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "Status", - "Expiration date", - ] - sort_fields = [ - "created_at", - "domain__name", - ] - sort_fields_for_deleted_domains = [ - "domain__deleted", - "domain__name", - ] - filter_condition = { - "domain__state__in": [ - Domain.State.READY, - ], - "domain__first_ready__lte": self.end_date, - "domain__first_ready__gte": self.start_date, - } - filter_conditions_for_deleted_domains = { - "domain__state__in": [ - Domain.State.DELETED, - ], - "domain__deleted__lte": self.end_date, - "domain__deleted__gte": self.start_date, - } - - # Call the export functions - write_csv_for_domains( - writer, - columns, - sort_fields, - filter_condition, - should_get_domain_managers=False, - should_write_header=True, - ) - write_csv_for_domains( - writer, - columns, - sort_fields_for_deleted_domains, - filter_conditions_for_deleted_domains, - should_get_domain_managers=False, - should_write_header=False, - ) - # Reset the CSV file's position to the beginning - csv_file.seek(0) - - # Read the content into a variable - csv_content = csv_file.read() - - # We expect READY domains first, created between day-2 and day+2, sorted by created_at then name - # and DELETED domains deleted between day-2 and day+2, sorted by deleted then name - expected_content = ( - "Domain name,Domain type,Agency,Organization name,City," - "State,Status,Expiration date\n" - "cdomain1.gov,Federal-Executive,World War I Centennial Commission,,,,Ready,(blank)\n" - "adomain10.gov,Federal,Armed Forces Retirement Home,,,,Ready,(blank)\n" - "cdomain11.govFederal-ExecutiveWorldWarICentennialCommissionReady(blank)\n" - "zdomain12.govInterstateReady(blank)\n" - 
"zdomain9.gov,Federal,ArmedForcesRetirementHome,Deleted,(blank)\n" - "sdomain8.gov,Federal,Armed Forces Retirement Home,,,,Deleted,(blank)\n" - "xdomain7.gov,FederalArmedForcesRetirementHome,Deleted,(blank)\n" - ) - - # Normalize line endings and remove commas, - # spaces and leading/trailing whitespace - csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() - expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() - - self.assertEqual(csv_content, expected_content) - - def test_export_domains_to_writer_domain_managers(self): - """Test that export_domains_to_writer returns the - expected domain managers. + @less_console_noise_decorator + def test_domain_managed(self): + """Shows ready and deleted domains by an end date, sorted An invited user, woofwardthethird, should also be pulled into this report. squeaker@rocks.com is invited to domain2 (DNS_NEEDED) and domain10 (No managers). - She should show twice in this report but not in test_export_data_managed_domains_to_csv.""" + She should show twice in this report but not in test_DomainManaged.""" + # Create a CSV file in memory + csv_file = StringIO() + # Call the export functions + DomainManaged.export_data_to_csv( + csv_file, + self.start_date.strftime("%Y-%m-%d"), + self.end_date.strftime("%Y-%m-%d"), + ) + # Reset the CSV file's position to the beginning + csv_file.seek(0) + # Read the content into a variable + csv_content = csv_file.read() + # We expect the READY domain names with the domain managers: Their counts, and listing at end_date. 
+ expected_content = ( + "MANAGED DOMAINS COUNTS AT START DATE\n" + "Total,Federal,Interstate,State or territory,Tribal,County,City,Special district," + "School district,Election office\n" + "0,0,0,0,0,0,0,0,0,0\n" + "\n" + "MANAGED DOMAINS COUNTS AT END DATE\n" + "Total,Federal,Interstate,State or territory,Tribal,County,City," + "Special district,School district,Election office\n" + "3,2,1,0,0,0,0,0,0,0\n" + "\n" + "Domain name,Domain type,Domain managers,Invited domain managers\n" + "cdomain11.gov,Federal - Executive,meoward@rocks.com,\n" + 'cdomain1.gov,Federal - Executive,"meoward@rocks.com, info@example.com, big_lebowski@dude.co",' + "woofwardthethird@rocks.com\n" + "zdomain12.gov,Interstate,meoward@rocks.com,\n" + ) + # Normalize line endings and remove commas, + # spaces and leading/trailing whitespace + csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() + expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() + self.assertEqual(csv_content, expected_content) - with less_console_noise(): + @less_console_noise_decorator + def test_domain_unmanaged(self): + """Shows unmanaged domains by an end date, sorted""" + # Create a CSV file in memory + csv_file = StringIO() + DomainUnmanaged.export_data_to_csv( + csv_file, self.start_date.strftime("%Y-%m-%d"), self.end_date.strftime("%Y-%m-%d") + ) + + # Reset the CSV file's position to the beginning + csv_file.seek(0) + # Read the content into a variable + csv_content = csv_file.read() + + # We expect the READY domain names with the domain managers: Their counts, and listing at end_date. 
+ expected_content = ( + "UNMANAGED DOMAINS AT START DATE\n" + "Total,Federal,Interstate,State or territory,Tribal,County,City,Special district," + "School district,Election office\n" + "0,0,0,0,0,0,0,0,0,0\n" + "\n" + "UNMANAGED DOMAINS AT END DATE\n" + "Total,Federal,Interstate,State or territory,Tribal,County,City,Special district," + "School district,Election office\n" + "1,1,0,0,0,0,0,0,0,0\n" + "\n" + "Domain name,Domain type\n" + "adomain10.gov,Federal\n" + ) + + # Normalize line endings and remove commas, + # spaces and leading/trailing whitespace + csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() + expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() + + self.assertEqual(csv_content, expected_content) + + @less_console_noise_decorator + def test_domain_request_growth(self): + """Shows submitted requests within a date range, sorted""" + # Remove "Submitted at" because we can't guess this immutable, dynamically generated test data + columns = [ + "Domain request", + "Domain type", + "Federal type", + # "Submitted at", + ] + with patch("registrar.utility.csv_export.DomainRequestGrowth.get_columns", return_value=columns): # Create a CSV file in memory csv_file = StringIO() - writer = csv.writer(csv_file) - # Define columns, sort fields, and filter condition - columns = [ - "Domain name", - "Status", - "Expiration date", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "SO", - "SO email", - "Security contact email", - ] - sort_fields = ["domain__name"] - filter_condition = { - "domain__state__in": [ - Domain.State.READY, - Domain.State.DNS_NEEDED, - Domain.State.ON_HOLD, - ], - } - # Call the export functions - write_csv_for_domains( - writer, - columns, - sort_fields, - filter_condition, - should_get_domain_managers=True, - should_write_header=True, + DomainRequestGrowth.export_data_to_csv( + csv_file, + 
self.start_date.strftime("%Y-%m-%d"), + self.end_date.strftime("%Y-%m-%d"), ) - # Reset the CSV file's position to the beginning csv_file.seek(0) # Read the content into a variable csv_content = csv_file.read() - # We expect READY domains, - # sorted alphabetially by domain name - expected_content = ( - "Domain name,Status,Expiration date,Domain type,Agency," - "Organization name,City,State,SO,SO email," - "Security contact email,Domain manager 1,DM1 status,Domain manager 2,DM2 status," - "Domain manager 3,DM3 status,Domain manager 4,DM4 status\n" - "adomain10.gov,Ready,(blank),Federal,Armed Forces Retirement Home,,,, , ,squeaker@rocks.com, I\n" - "adomain2.gov,Dns needed,(blank),Interstate,,,,, , , ,meoward@rocks.com, R,squeaker@rocks.com, I\n" - "cdomain11.govReady,(blank),Federal-ExecutiveWorldWarICentennialCommissionmeoward@rocks.comR\n" - "cdomain1.gov,Ready,(blank),Federal - Executive,World War I Centennial Commission,,," - ", , , ,meoward@rocks.com,R,info@example.com,R,big_lebowski@dude.co,R," - "woofwardthethird@rocks.com,I\n" - "ddomain3.gov,On hold,(blank),Federal,Armed Forces Retirement Home,,,, , , ,,\n" - "zdomain12.gov,Ready,(blank),Interstate,meoward@rocks.com,R\n" - ) - # Normalize line endings and remove commas, - # spaces and leading/trailing whitespace - csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() - expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() - self.assertEqual(csv_content, expected_content) - - def test_export_data_managed_domains_to_csv(self): - """Test get counts for domains that have domain managers for two different dates, - get list of managed domains at end_date. 
- - An invited user, woofwardthethird, should also be pulled into this report.""" - - with less_console_noise(): - # Create a CSV file in memory - csv_file = StringIO() - export_data_managed_domains_to_csv( - csv_file, self.start_date.strftime("%Y-%m-%d"), self.end_date.strftime("%Y-%m-%d") - ) - - # Reset the CSV file's position to the beginning - csv_file.seek(0) - # Read the content into a variable - csv_content = csv_file.read() - - # We expect the READY domain names with the domain managers: Their counts, and listing at end_date. - expected_content = ( - "MANAGED DOMAINS COUNTS AT START DATE\n" - "Total,Federal,Interstate,State or territory,Tribal,County,City,Special district," - "School district,Election office\n" - "0,0,0,0,0,0,0,0,0,0\n" - "\n" - "MANAGED DOMAINS COUNTS AT END DATE\n" - "Total,Federal,Interstate,State or territory,Tribal,County,City," - "Special district,School district,Election office\n" - "3,2,1,0,0,0,0,0,0,0\n" - "\n" - "Domain name,Domain type,Domain manager 1,DM1 status,Domain manager 2,DM2 status," - "Domain manager 3,DM3 status,Domain manager 4,DM4 status\n" - "cdomain11.govFederal-Executivemeoward@rocks.com, R\n" - "cdomain1.gov,Federal - Executive,meoward@rocks.com,R,info@example.com,R," - "big_lebowski@dude.co,R,woofwardthethird@rocks.com,I\n" - "zdomain12.govInterstatemeoward@rocks.com,R\n" - ) - - # Normalize line endings and remove commas, - # spaces and leading/trailing whitespace - csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() - expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() - - self.assertEqual(csv_content, expected_content) - - def test_export_data_unmanaged_domains_to_csv(self): - """Test get counts for domains that do not have domain managers for two different dates, - get list of unmanaged domains at end_date.""" - - with less_console_noise(): - # Create a CSV file in memory - csv_file = StringIO() - 
export_data_unmanaged_domains_to_csv( - csv_file, self.start_date.strftime("%Y-%m-%d"), self.end_date.strftime("%Y-%m-%d") - ) - - # Reset the CSV file's position to the beginning - csv_file.seek(0) - # Read the content into a variable - csv_content = csv_file.read() - - # We expect the READY domain names with the domain managers: Their counts, and listing at end_date. - expected_content = ( - "UNMANAGED DOMAINS AT START DATE\n" - "Total,Federal,Interstate,State or territory,Tribal,County,City,Special district," - "School district,Election office\n" - "0,0,0,0,0,0,0,0,0,0\n" - "\n" - "UNMANAGED DOMAINS AT END DATE\n" - "Total,Federal,Interstate,State or territory,Tribal,County,City,Special district," - "School district,Election office\n" - "1,1,0,0,0,0,0,0,0,0\n" - "\n" - "Domain name,Domain type\n" - "adomain10.gov,Federal\n" - ) - - # Normalize line endings and remove commas, - # spaces and leading/trailing whitespace - csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() - expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() - - self.assertEqual(csv_content, expected_content) - - def test_write_requests_body_with_date_filter_pulls_requests_in_range(self): - """Test that requests that are - 1. SUBMITTED and their submission_date are in range - are pulled when the growth report conditions are applied to export_requests_to_writed. - Test that requests are sorted by requested domain name. 
- """ - - with less_console_noise(): - # Create a CSV file in memory - csv_file = StringIO() - writer = csv.writer(csv_file) - # Define columns, sort fields, and filter condition - # We'll skip submission date because it's dynamic and therefore - # impossible to set in expected_content - columns = ["Domain request", "Domain type", "Federal type"] - sort_fields = [ - "requested_domain__name", - ] - filter_condition = { - "status": DomainRequest.DomainRequestStatus.SUBMITTED, - "submission_date__lte": self.end_date, - "submission_date__gte": self.start_date, - } - - additional_values = ["requested_domain__name"] - all_requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct() - annotated_requests = DomainRequestExport.annotate_and_retrieve_fields(all_requests, {}, additional_values) - requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False) - DomainRequestExport.write_csv_for_requests(writer, columns, requests_dict) - # Reset the CSV file's position to the beginning - csv_file.seek(0) - # Read the content into a variable - csv_content = csv_file.read() - # We expect READY domains first, created between today-2 and today+2, sorted by created_at then name - # and DELETED domains deleted between today-2 and today+2, sorted by deleted then name expected_content = ( "Domain request,Domain type,Federal type\n" "city3.gov,Federal,Executive\n" @@ -705,68 +513,82 @@ class ExportDataTest(MockDb, MockEppLib): self.assertEqual(csv_content, expected_content) @less_console_noise_decorator - def test_full_domain_request_report(self): + def test_domain_request_data_full(self): """Tests the full domain request report.""" - - # Create a CSV file in memory - csv_file = StringIO() - writer = csv.writer(csv_file) - - # Call the report. Get existing fields from the report itself. 
- annotations = DomainRequestExport._full_domain_request_annotations() - additional_values = [ - "requested_domain__name", - "federal_agency__agency", - "senior_official__first_name", - "senior_official__last_name", - "senior_official__email", - "senior_official__title", - "creator__first_name", - "creator__last_name", - "creator__email", - "investigator__email", + # Remove "Submitted at" because we can't guess this immutable, dynamically generated test data + columns = [ + "Domain request", + # "Submitted at", + "Status", + "Domain type", + "Federal type", + "Federal agency", + "Organization name", + "Election office", + "City", + "State/territory", + "Region", + "Creator first name", + "Creator last name", + "Creator email", + "Creator approved domains count", + "Creator active requests count", + "Alternative domains", + "SO first name", + "SO last name", + "SO email", + "SO title/role", + "Request purpose", + "Request additional details", + "Other contacts", + "CISA regional representative", + "Current websites", + "Investigator", ] - requests = DomainRequest.objects.exclude(status=DomainRequest.DomainRequestStatus.STARTED) - annotated_requests = DomainRequestExport.annotate_and_retrieve_fields(requests, annotations, additional_values) - requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False) - DomainRequestExport.write_csv_for_requests(writer, DomainRequestExport.all_columns, requests_dict) - - # Reset the CSV file's position to the beginning - csv_file.seek(0) - # Read the content into a variable - csv_content = csv_file.read() - expected_content = ( - # Header - "Domain request,Submitted at,Status,Domain type,Federal type," - "Federal agency,Organization name,Election office,City,State/territory," - "Region,Creator first name,Creator last name,Creator email,Creator approved domains count," - "Creator active requests count,Alternative domains,SO first name,SO last name,SO email," - "SO title/role,Request purpose,Request additional 
details,Other contacts," - "CISA regional representative,Current websites,Investigator\n" - # Content - "city2.gov,,In review,Federal,Executive,,Testorg,N/A,,NY,2,,,,0,1,city1.gov,Testy,Tester,testy@town.com," - "Chief Tester,Purpose of the site,There is more,Testy Tester testy2@town.com,,city.com,\n" - "city3.gov,2024-04-02,Submitted,Federal,Executive,,Testorg,N/A,,NY,2,,,,0,1," - "cheeseville.gov | city1.gov | igorville.gov,Testy,Tester,testy@town.com,Chief Tester," - "Purpose of the site,CISA-first-name CISA-last-name | There is more,Meow Tester24 te2@town.com | " - "Testy1232 Tester24 te2@town.com | Testy Tester testy2@town.com,test@igorville.com," - "city.com | https://www.example2.com | https://www.example.com,\n" - "city4.gov,2024-04-02,Submitted,City,Executive,,Testorg,Yes,,NY,2,,,,0,1,city1.gov,Testy,Tester," - "testy@town.com,Chief Tester,Purpose of the site,CISA-first-name CISA-last-name | There is more," - "Testy Tester testy2@town.com,cisaRep@igorville.gov,city.com,\n" - "city5.gov,,Approved,Federal,Executive,,Testorg,N/A,,NY,2,,,,1,0,city1.gov,Testy,Tester,testy@town.com," - "Chief Tester,Purpose of the site,There is more,Testy Tester testy2@town.com,,city.com,\n" - "city6.gov,2024-04-02,Submitted,Federal,Executive,,Testorg,N/A,,NY,2,,,,0,1,city1.gov,Testy,Tester," - "testy@town.com,Chief Tester,Purpose of the site,CISA-first-name CISA-last-name | There is more," - "Testy Tester testy2@town.com,cisaRep@igorville.gov,city.com," - ) - - # Normalize line endings and remove commas, - # spaces and leading/trailing whitespace - csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() - expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() - - self.assertEqual(csv_content, expected_content) + with patch("registrar.utility.csv_export.DomainRequestDataFull.get_columns", return_value=columns): + # Create a CSV file in memory + csv_file = StringIO() + # Call the export 
functions + DomainRequestDataFull.export_data_to_csv(csv_file) + # Reset the CSV file's position to the beginning + csv_file.seek(0) + # Read the content into a variable + csv_content = csv_file.read() + print(csv_content) + expected_content = ( + # Header + "Domain request,Status,Domain type,Federal type," + "Federal agency,Organization name,Election office,City,State/territory," + "Region,Creator first name,Creator last name,Creator email,Creator approved domains count," + "Creator active requests count,Alternative domains,SO first name,SO last name,SO email," + "SO title/role,Request purpose,Request additional details,Other contacts," + "CISA regional representative,Current websites,Investigator\n" + # Content + "city5.gov,,Approved,Federal,Executive,,Testorg,N/A,,NY,2,,,,1,0,city1.gov,Testy,Tester,testy@town.com," + "Chief Tester,Purpose of the site,There is more,Testy Tester testy2@town.com,,city.com,\n" + "city2.gov,,In review,Federal,Executive,,Testorg,N/A,,NY,2,,,,0,1,city1.gov,Testy,Tester," + "testy@town.com," + "Chief Tester,Purpose of the site,There is more,Testy Tester testy2@town.com,,city.com,\n" + 'city3.gov,Submitted,Federal,Executive,,Testorg,N/A,,NY,2,,,,0,1,"cheeseville.gov, city1.gov,' + 'igorville.gov",Testy,Tester,testy@town.com,Chief Tester,Purpose of the site,CISA-first-name ' + "CISA-last-name " + '| There is more,"Meow Tester24 te2@town.com, Testy1232 Tester24 te2@town.com, Testy Tester ' + 'testy2@town.com"' + ',test@igorville.com,"city.com, https://www.example2.com, https://www.example.com",\n' + "city4.gov,Submitted,City,Executive,,Testorg,Yes,,NY,2,,,,0,1,city1.gov,Testy,Tester,testy@town.com," + "Chief Tester,Purpose of the site,CISA-first-name CISA-last-name | There is more,Testy Tester " + "testy2@town.com" + ",cisaRep@igorville.gov,city.com,\n" + "city6.gov,Submitted,Federal,Executive,,Testorg,N/A,,NY,2,,,,0,1,city1.gov,Testy,Tester,testy@town.com," + "Chief Tester,Purpose of the site,CISA-first-name CISA-last-name | There is 
more,Testy Tester " + "testy2@town.com," + "cisaRep@igorville.gov,city.com,\n" + ) + # Normalize line endings and remove commas, + # spaces and leading/trailing whitespace + csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() + expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() + self.assertEqual(csv_content, expected_content) class HelperFunctions(MockDb): @@ -792,12 +614,12 @@ class HelperFunctions(MockDb): "domain__first_ready__lte": self.end_date, } # Test with distinct - managed_domains_sliced_at_end_date = get_sliced_domains(filter_condition) + managed_domains_sliced_at_end_date = DomainExport.get_sliced_domains(filter_condition) expected_content = [3, 2, 1, 0, 0, 0, 0, 0, 0, 0] self.assertEqual(managed_domains_sliced_at_end_date, expected_content) # Test without distinct - managed_domains_sliced_at_end_date = get_sliced_domains(filter_condition) + managed_domains_sliced_at_end_date = DomainExport.get_sliced_domains(filter_condition) expected_content = [3, 2, 1, 0, 0, 0, 0, 0, 0, 0] self.assertEqual(managed_domains_sliced_at_end_date, expected_content) @@ -809,6 +631,6 @@ class HelperFunctions(MockDb): "status": DomainRequest.DomainRequestStatus.SUBMITTED, "submission_date__lte": self.end_date, } - submitted_requests_sliced_at_end_date = get_sliced_requests(filter_condition) + submitted_requests_sliced_at_end_date = DomainRequestExport.get_sliced_requests(filter_condition) expected_content = [3, 2, 0, 0, 0, 0, 1, 0, 0, 1] self.assertEqual(submitted_requests_sliced_at_end_date, expected_content) diff --git a/src/registrar/utility/csv_export.py b/src/registrar/utility/csv_export.py index a3a9f3280..334742d17 100644 --- a/src/registrar/utility/csv_export.py +++ b/src/registrar/utility/csv_export.py @@ -1,3 +1,5 @@ +from abc import ABC, abstractmethod +from collections import defaultdict import csv import logging from datetime import datetime @@ -9,15 +11,12 @@ 
from registrar.models import ( PublicContact, UserDomainRole, ) -from django.db.models import QuerySet, Value, CharField, Count, Q, F -from django.db.models import ManyToManyField +from django.db.models import Case, CharField, Count, DateField, F, ManyToManyField, Q, QuerySet, Value, When from django.utils import timezone -from django.core.paginator import Paginator from django.db.models.functions import Concat, Coalesce from django.contrib.postgres.aggregates import StringAgg from registrar.models.utility.generic_helper import convert_queryset_to_dict from registrar.templatetags.custom_filters import get_region -from registrar.utility.enums import DefaultEmail from registrar.utility.constants import BranchChoices @@ -32,390 +31,13 @@ def write_header(writer, columns): writer.writerow(columns) -def get_domain_infos(filter_condition, sort_fields): - """ - Returns DomainInformation objects filtered and sorted based on the provided conditions. - filter_condition -> A dictionary of conditions to filter the objects. - sort_fields -> A list of fields to sort the resulting query set. - returns: A queryset of DomainInformation objects - """ - domain_infos = ( - DomainInformation.objects.select_related("domain", "senior_official") - .filter(**filter_condition) - .order_by(*sort_fields) - .distinct() - ) - - # Do a mass concat of the first and last name fields for senior_official. - # The old operation was computationally heavy for some reason, so if we precompute - # this here, it is vastly more efficient. 
- domain_infos_cleaned = domain_infos.annotate( - so=Concat( - Coalesce(F("senior_official__first_name"), Value("")), - Value(" "), - Coalesce(F("senior_official__last_name"), Value("")), - output_field=CharField(), - ) - ) - return domain_infos_cleaned - - -def parse_row_for_domain( - columns, - domain_info: DomainInformation, - dict_security_emails=None, - should_get_domain_managers=False, - dict_domain_invitations_with_invited_status=None, - dict_user_domain_roles=None, -): - """Given a set of columns, generate a new row from cleaned column data""" - - # Domain should never be none when parsing this information - if domain_info.domain is None: - logger.error("Attemting to parse row for csv exports but Domain is none in a DomainInfo") - raise ValueError("Domain is none") - - domain = domain_info.domain # type: ignore - - # Grab the security email from a preset dictionary. - # If nothing exists in the dictionary, grab from .contacts. - if dict_security_emails is not None and domain.name in dict_security_emails: - _email = dict_security_emails.get(domain.name) - security_email = _email if _email is not None else " " - else: - # If the dictionary doesn't contain that data, lets filter for it manually. - # This is a last resort as this is a more expensive operation. 
- security_contacts = domain.contacts.filter(contact_type=PublicContact.ContactTypeChoices.SECURITY) - _email = security_contacts[0].email if security_contacts else None - security_email = _email if _email is not None else " " - - # These are default emails that should not be displayed in the csv report - invalid_emails = {DefaultEmail.LEGACY_DEFAULT.value, DefaultEmail.PUBLIC_CONTACT_DEFAULT.value} - if security_email.lower() in invalid_emails: - security_email = "(blank)" - - if domain_info.federal_type and domain_info.organization_type == DomainRequest.OrgChoicesElectionOffice.FEDERAL: - domain_type = f"{domain_info.get_organization_type_display()} - {domain_info.get_federal_type_display()}" - else: - domain_type = domain_info.get_organization_type_display() - - # create a dictionary of fields which can be included in output - FIELDS = { - "Domain name": domain.name, - "Status": domain.get_state_display(), - "First ready on": domain.first_ready or "(blank)", - "Expiration date": domain.expiration_date or "(blank)", - "Domain type": domain_type, - "Agency": domain_info.federal_agency, - "Organization name": domain_info.organization_name, - "City": domain_info.city, - "State": domain_info.state_territory, - "SO": domain_info.so, # type: ignore - "SO email": domain_info.senior_official.email if domain_info.senior_official else " ", - "Security contact email": security_email, - "Created at": domain.created_at, - "Deleted": domain.deleted, - } - - if should_get_domain_managers: - # Get lists of emails for active and invited domain managers - - dms_active_emails = dict_user_domain_roles.get(domain_info.domain.name, []) - dms_invited_emails = dict_domain_invitations_with_invited_status.get(domain_info.domain.name, []) - - # Set up the "matching headers" + row field data for email and status - i = 0 # Declare i outside of the loop to avoid a reference before assignment in the second loop - for i, dm_email in enumerate(dms_active_emails, start=1): - FIELDS[f"Domain 
manager {i}"] = dm_email - FIELDS[f"DM{i} status"] = "R" - - # Continue enumeration from where we left off and add data for invited domain managers - for j, dm_email in enumerate(dms_invited_emails, start=i + 1): - FIELDS[f"Domain manager {j}"] = dm_email - FIELDS[f"DM{j} status"] = "I" - - row = [FIELDS.get(column, "") for column in columns] - return row - - -def _get_security_emails(sec_contact_ids): - """ - Retrieve security contact emails for the given security contact IDs. - """ - dict_security_emails = {} - public_contacts = ( - PublicContact.objects.only("email", "domain__name") - .select_related("domain") - .filter(registry_id__in=sec_contact_ids) - ) - - # Populate a dictionary of domain names and their security contacts - for contact in public_contacts: - domain: Domain = contact.domain - if domain is not None and domain.name not in dict_security_emails: - dict_security_emails[domain.name] = contact.email - else: - logger.warning("csv_export -> Domain was none for PublicContact") - - return dict_security_emails - - -def count_domain_managers(domain_name, dict_domain_invitations_with_invited_status, dict_user_domain_roles): - """Count active and invited domain managers""" - dms_active = len(dict_user_domain_roles.get(domain_name, [])) - dms_invited = len(dict_domain_invitations_with_invited_status.get(domain_name, [])) - return dms_active, dms_invited - - -def update_columns(columns, dms_total, should_update_columns): - """Update columns if necessary""" - if should_update_columns: - for i in range(1, dms_total + 1): - email_column_header = f"Domain manager {i}" - status_column_header = f"DM{i} status" - if email_column_header not in columns: - columns.append(email_column_header) - columns.append(status_column_header) - should_update_columns = False - return columns, should_update_columns, dms_total - - -def update_columns_with_domain_managers( - columns, - domain_info, - should_update_columns, - dms_total, - dict_domain_invitations_with_invited_status, - 
dict_user_domain_roles, -): - """Helper function to update columns with domain manager information""" - - domain_name = domain_info.domain.name - - try: - dms_active, dms_invited = count_domain_managers( - domain_name, dict_domain_invitations_with_invited_status, dict_user_domain_roles - ) - - if dms_active + dms_invited > dms_total: - dms_total = dms_active + dms_invited - should_update_columns = True - - except Exception as err: - logger.error(f"Exception while parsing domain managers for reports: {err}") - - return update_columns(columns, dms_total, should_update_columns) - - -def build_dictionaries_for_domain_managers(dict_user_domain_roles, dict_domain_invitations_with_invited_status): - """Helper function that builds dicts for invited users and active domain - managers. We do so to avoid filtering within loops.""" - - user_domain_roles = UserDomainRole.objects.all() - - # Iterate through each user domain role and populate the dictionary - for user_domain_role in user_domain_roles: - domain_name = user_domain_role.domain.name - email = user_domain_role.user.email - if domain_name not in dict_user_domain_roles: - dict_user_domain_roles[domain_name] = [] - dict_user_domain_roles[domain_name].append(email) - - domain_invitations_with_invited_status = None - domain_invitations_with_invited_status = DomainInvitation.objects.filter( - status=DomainInvitation.DomainInvitationStatus.INVITED - ).select_related("domain") - - # Iterate through each domain invitation and populate the dictionary - for invite in domain_invitations_with_invited_status: - domain_name = invite.domain.name - email = invite.email - if domain_name not in dict_domain_invitations_with_invited_status: - dict_domain_invitations_with_invited_status[domain_name] = [] - dict_domain_invitations_with_invited_status[domain_name].append(email) - - return dict_user_domain_roles, dict_domain_invitations_with_invited_status - - -def write_csv_for_domains( - writer, - columns, - sort_fields, - filter_condition, 
- should_get_domain_managers=False, - should_write_header=True, -): - """ - Receives params from the parent methods and outputs a CSV with filtered and sorted domains. - Works with write_header as long as the same writer object is passed. - should_get_domain_managers: Conditional bc we only use domain manager info for export_data_full_to_csv - should_write_header: Conditional bc export_data_domain_growth_to_csv calls write_body twice - """ - - # Retrieve domain information and all sec emails - all_domain_infos = get_domain_infos(filter_condition, sort_fields) - sec_contact_ids = all_domain_infos.values_list("domain__security_contact_registry_id", flat=True) - dict_security_emails = _get_security_emails(sec_contact_ids) - paginator = Paginator(all_domain_infos, 1000) - - # Initialize variables - dms_total = 0 - should_update_columns = False - total_body_rows = [] - dict_user_domain_roles = {} - dict_domain_invitations_with_invited_status = {} - - # Build dictionaries if necessary - if should_get_domain_managers: - dict_user_domain_roles, dict_domain_invitations_with_invited_status = build_dictionaries_for_domain_managers( - dict_user_domain_roles, dict_domain_invitations_with_invited_status - ) - - # Process domain information - for page_num in paginator.page_range: - rows = [] - page = paginator.page(page_num) - for domain_info in page.object_list: - if should_get_domain_managers: - columns, dms_total, should_update_columns = update_columns_with_domain_managers( - columns, - domain_info, - should_update_columns, - dms_total, - dict_domain_invitations_with_invited_status, - dict_user_domain_roles, - ) - - try: - row = parse_row_for_domain( - columns, - domain_info, - dict_security_emails, - should_get_domain_managers, - dict_domain_invitations_with_invited_status, - dict_user_domain_roles, - ) - rows.append(row) - except ValueError: - logger.error("csv_export -> Error when parsing row, domain was None") - continue - total_body_rows.extend(rows) - - if 
should_write_header: - write_header(writer, columns) - writer.writerows(total_body_rows) - - -def export_data_type_to_csv(csv_file): - """ - All domains report with extra columns. - This maps to the "All domain metadata" button. - Exports domains of all statuses. - """ - - writer = csv.writer(csv_file) - # define columns to include in export - columns = [ - "Domain name", - "Status", - "First ready on", - "Expiration date", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "SO", - "SO email", - "Security contact email", - # For domain manager we are pass it in as a parameter below in write_body - ] - - # Coalesce is used to replace federal_type of None with ZZZZZ - sort_fields = [ - "organization_type", - Coalesce("federal_type", Value("ZZZZZ")), - "federal_agency", - "domain__name", - ] - write_csv_for_domains( - writer, columns, sort_fields, filter_condition={}, should_get_domain_managers=True, should_write_header=True - ) - - -def export_data_full_to_csv(csv_file): - """All domains report""" - - writer = csv.writer(csv_file) - # define columns to include in export - columns = [ - "Domain name", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "Security contact email", - ] - # Coalesce is used to replace federal_type of None with ZZZZZ - sort_fields = [ - "organization_type", - Coalesce("federal_type", Value("ZZZZZ")), - "federal_agency", - "domain__name", - ] - filter_condition = { - "domain__state__in": [ - Domain.State.READY, - Domain.State.DNS_NEEDED, - Domain.State.ON_HOLD, - ], - } - write_csv_for_domains( - writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True - ) - - -def export_data_federal_to_csv(csv_file): - """Federal domains report""" - writer = csv.writer(csv_file) - # define columns to include in export - columns = [ - "Domain name", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "Security contact email", - ] - # 
Coalesce is used to replace federal_type of None with ZZZZZ - sort_fields = [ - "organization_type", - Coalesce("federal_type", Value("ZZZZZ")), - "federal_agency", - "domain__name", - ] - filter_condition = { - "organization_type__icontains": "federal", - "domain__state__in": [ - Domain.State.READY, - Domain.State.DNS_NEEDED, - Domain.State.ON_HOLD, - ], - } - write_csv_for_domains( - writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True - ) - - def get_default_start_date(): - # Default to a date that's prior to our first deployment + """Default to a date that's prior to our first deployment""" return timezone.make_aware(datetime(2023, 11, 1)) def get_default_end_date(): - # Default to now() + """Default to now()""" return timezone.now() @@ -427,419 +49,1289 @@ def format_end_date(end_date): return timezone.make_aware(datetime.strptime(end_date, "%Y-%m-%d")) if end_date else get_default_end_date() -def export_data_domain_growth_to_csv(csv_file, start_date, end_date): +class BaseExport(ABC): """ - Growth report: - Receive start and end dates from the view, parse them. - Request from write_body READY domains that are created between - the start and end dates, as well as DELETED domains that are deleted between - the start and end dates. Specify sort params for both lists. + A generic class for exporting data which returns a csv file for the given model. + Base class in an inheritance tree of 3. 
""" - start_date_formatted = format_start_date(start_date) - end_date_formatted = format_end_date(end_date) - writer = csv.writer(csv_file) - # define columns to include in export - columns = [ - "Domain name", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "Status", - "Expiration date", - "Created at", - "First ready", - "Deleted", - ] - sort_fields = [ - "domain__first_ready", - "domain__name", - ] - filter_condition = { - "domain__state__in": [Domain.State.READY], - "domain__first_ready__lte": end_date_formatted, - "domain__first_ready__gte": start_date_formatted, - } - - # We also want domains deleted between sar and end dates, sorted - sort_fields_for_deleted_domains = [ - "domain__deleted", - "domain__name", - ] - filter_condition_for_deleted_domains = { - "domain__state__in": [Domain.State.DELETED], - "domain__deleted__lte": end_date_formatted, - "domain__deleted__gte": start_date_formatted, - } - - write_csv_for_domains( - writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True - ) - write_csv_for_domains( - writer, - columns, - sort_fields_for_deleted_domains, - filter_condition_for_deleted_domains, - should_get_domain_managers=False, - should_write_header=False, - ) - - -def get_sliced_domains(filter_condition): - """Get filtered domains counts sliced by org type and election office. - Pass distinct=True when filtering by permissions so we do not to count multiples - when a domain has more that one manager. 
- """ - - domains = DomainInformation.objects.all().filter(**filter_condition).distinct() - domains_count = domains.count() - federal = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.FEDERAL).distinct().count() - interstate = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.INTERSTATE).count() - state_or_territory = ( - domains.filter(generic_org_type=DomainRequest.OrganizationChoices.STATE_OR_TERRITORY).distinct().count() - ) - tribal = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.TRIBAL).distinct().count() - county = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.COUNTY).distinct().count() - city = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.CITY).distinct().count() - special_district = ( - domains.filter(generic_org_type=DomainRequest.OrganizationChoices.SPECIAL_DISTRICT).distinct().count() - ) - school_district = ( - domains.filter(generic_org_type=DomainRequest.OrganizationChoices.SCHOOL_DISTRICT).distinct().count() - ) - election_board = domains.filter(is_election_board=True).distinct().count() - - return [ - domains_count, - federal, - interstate, - state_or_territory, - tribal, - county, - city, - special_district, - school_district, - election_board, - ] - - -def get_sliced_requests(filter_condition): - """Get filtered requests counts sliced by org type and election office.""" - requests = DomainRequest.objects.all().filter(**filter_condition).distinct() - requests_count = requests.count() - federal = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.FEDERAL).distinct().count() - interstate = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.INTERSTATE).distinct().count() - state_or_territory = ( - requests.filter(generic_org_type=DomainRequest.OrganizationChoices.STATE_OR_TERRITORY).distinct().count() - ) - tribal = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.TRIBAL).distinct().count() - county = 
requests.filter(generic_org_type=DomainRequest.OrganizationChoices.COUNTY).distinct().count() - city = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.CITY).distinct().count() - special_district = ( - requests.filter(generic_org_type=DomainRequest.OrganizationChoices.SPECIAL_DISTRICT).distinct().count() - ) - school_district = ( - requests.filter(generic_org_type=DomainRequest.OrganizationChoices.SCHOOL_DISTRICT).distinct().count() - ) - election_board = requests.filter(is_election_board=True).distinct().count() - - return [ - requests_count, - federal, - interstate, - state_or_territory, - tribal, - county, - city, - special_district, - school_district, - election_board, - ] - - -def export_data_managed_domains_to_csv(csv_file, start_date, end_date): - """Get counts for domains that have domain managers for two different dates, - get list of managed domains at end_date.""" - - start_date_formatted = format_start_date(start_date) - end_date_formatted = format_end_date(end_date) - writer = csv.writer(csv_file) - columns = [ - "Domain name", - "Domain type", - ] - sort_fields = [ - "domain__name", - ] - filter_managed_domains_start_date = { - "domain__permissions__isnull": False, - "domain__first_ready__lte": start_date_formatted, - } - managed_domains_sliced_at_start_date = get_sliced_domains(filter_managed_domains_start_date) - - writer.writerow(["MANAGED DOMAINS COUNTS AT START DATE"]) - writer.writerow( - [ - "Total", - "Federal", - "Interstate", - "State or territory", - "Tribal", - "County", - "City", - "Special district", - "School district", - "Election office", - ] - ) - writer.writerow(managed_domains_sliced_at_start_date) - writer.writerow([]) - - filter_managed_domains_end_date = { - "domain__permissions__isnull": False, - "domain__first_ready__lte": end_date_formatted, - } - managed_domains_sliced_at_end_date = get_sliced_domains(filter_managed_domains_end_date) - - writer.writerow(["MANAGED DOMAINS COUNTS AT END DATE"]) - 
writer.writerow( - [ - "Total", - "Federal", - "Interstate", - "State or territory", - "Tribal", - "County", - "City", - "Special district", - "School district", - "Election office", - ] - ) - writer.writerow(managed_domains_sliced_at_end_date) - writer.writerow([]) - - write_csv_for_domains( - writer, - columns, - sort_fields, - filter_managed_domains_end_date, - should_get_domain_managers=True, - should_write_header=True, - ) - - -def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date): - """Get counts for domains that do not have domain managers for two different dates, - get list of unmanaged domains at end_date.""" - - start_date_formatted = format_start_date(start_date) - end_date_formatted = format_end_date(end_date) - writer = csv.writer(csv_file) - columns = [ - "Domain name", - "Domain type", - ] - sort_fields = [ - "domain__name", - ] - - filter_unmanaged_domains_start_date = { - "domain__permissions__isnull": True, - "domain__first_ready__lte": start_date_formatted, - } - unmanaged_domains_sliced_at_start_date = get_sliced_domains(filter_unmanaged_domains_start_date) - - writer.writerow(["UNMANAGED DOMAINS AT START DATE"]) - writer.writerow( - [ - "Total", - "Federal", - "Interstate", - "State or territory", - "Tribal", - "County", - "City", - "Special district", - "School district", - "Election office", - ] - ) - writer.writerow(unmanaged_domains_sliced_at_start_date) - writer.writerow([]) - - filter_unmanaged_domains_end_date = { - "domain__permissions__isnull": True, - "domain__first_ready__lte": end_date_formatted, - } - unmanaged_domains_sliced_at_end_date = get_sliced_domains(filter_unmanaged_domains_end_date) - - writer.writerow(["UNMANAGED DOMAINS AT END DATE"]) - writer.writerow( - [ - "Total", - "Federal", - "Interstate", - "State or territory", - "Tribal", - "County", - "City", - "Special district", - "School district", - "Election office", - ] - ) - writer.writerow(unmanaged_domains_sliced_at_end_date) - writer.writerow([]) 
- - write_csv_for_domains( - writer, - columns, - sort_fields, - filter_unmanaged_domains_end_date, - should_get_domain_managers=False, - should_write_header=True, - ) - - -class DomainRequestExport: - """ - A collection of functions which return csv files regarding the DomainRequest model. - """ - - # Get all columns on the full metadata report - all_columns = [ - "Domain request", - "Submitted at", - "Status", - "Domain type", - "Federal type", - "Federal agency", - "Organization name", - "Election office", - "City", - "State/territory", - "Region", - "Creator first name", - "Creator last name", - "Creator email", - "Creator approved domains count", - "Creator active requests count", - "Alternative domains", - "SO first name", - "SO last name", - "SO email", - "SO title/role", - "Request purpose", - "Request additional details", - "Other contacts", - "CISA regional representative", - "Current websites", - "Investigator", - ] - @classmethod - def export_data_requests_growth_to_csv(cls, csv_file, start_date, end_date): + @abstractmethod + def model(self): """ - Growth report: - Receive start and end dates from the view, parse them. - Request from write_requests_body SUBMITTED requests that are created between - the start and end dates. Specify sort params. + Property to specify the model that the export class will handle. + Must be implemented by subclasses. + """ + pass + + @classmethod + def get_columns(cls): + """ + Returns the columns for CSV export. Override in subclasses as needed. + """ + return [] + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields for the CSV export. Override in subclasses as needed. + """ + return [] + + @classmethod + def get_additional_args(cls): + """ + Returns additional keyword arguments as an empty dictionary. + Override in subclasses to provide specific arguments. + """ + return {} + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. 
+ """ + return [] + + @classmethod + def get_prefetch_related(cls): + """ + Get a list of tables to pass to prefetch_related when building queryset. + """ + return [] + + @classmethod + def get_exclusions(cls): + """ + Get a Q object of exclusion conditions to pass to .exclude() when building queryset. + """ + return Q() + + @classmethod + def get_filter_conditions(cls, start_date=None, end_date=None): + """ + Get a Q object of filter conditions to filter when building queryset. + """ + return Q() + + @classmethod + def get_computed_fields(cls): + """ + Get a dict of computed fields. These are fields that do not exist on the model normally + and will be passed to .annotate() when building a queryset. + """ + return {} + + @classmethod + def get_annotations_for_sort(cls): + """ + Get a dict of annotations to make available for order_by clause. + """ + return {} + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + return [] + + @classmethod + def update_queryset(cls, queryset, **kwargs): + """ + Returns an updated queryset. Override in subclass to update queryset. + """ + return queryset + + @classmethod + def write_csv_before(cls, csv_writer, start_date=None, end_date=None): + """ + Write to csv file before the write_csv method. + Override in subclasses where needed. + """ + pass + + @classmethod + def annotate_and_retrieve_fields( + cls, initial_queryset, computed_fields, related_table_fields=None, include_many_to_many=False, **kwargs + ) -> QuerySet: + """ + Applies annotations to a queryset and retrieves specified fields, + including class-defined and annotation-defined. + + Parameters: + initial_queryset (QuerySet): Initial queryset. + computed_fields (dict, optional): Fields to compute {field_name: expression}. + related_table_fields (list, optional): Extra fields to retrieve; defaults to annotation keys if None. 
+ include_many_to_many (bool, optional): Determines if we should include many to many fields or not + **kwargs: Additional keyword arguments for specific parameters (e.g., public_contacts, domain_invitations, + user_domain_roles). + + Returns: + QuerySet: Contains dictionaries with the specified fields for each record. + """ + if related_table_fields is None: + related_table_fields = [] + + # We can infer that if we're passing in annotations, + # we want to grab the result of said annotation. + if computed_fields: + related_table_fields.extend(computed_fields.keys()) + + # Get prexisting fields on the model + model_fields = set() + for field in cls.model()._meta.get_fields(): + # Exclude many to many fields unless we specify + many_to_many = isinstance(field, ManyToManyField) and include_many_to_many + if many_to_many or not isinstance(field, ManyToManyField): + model_fields.add(field.name) + + queryset = initial_queryset.annotate(**computed_fields).values(*model_fields, *related_table_fields) + + return cls.update_queryset(queryset, **kwargs) + + @classmethod + def export_data_to_csv(cls, csv_file, start_date=None, end_date=None): + """ + All domain metadata: + Exports domains of all statuses plus domain managers. 
+ """ + writer = csv.writer(csv_file) + columns = cls.get_columns() + sort_fields = cls.get_sort_fields() + kwargs = cls.get_additional_args() + select_related = cls.get_select_related() + prefetch_related = cls.get_prefetch_related() + exclusions = cls.get_exclusions() + annotations_for_sort = cls.get_annotations_for_sort() + filter_conditions = cls.get_filter_conditions(start_date, end_date) + computed_fields = cls.get_computed_fields() + related_table_fields = cls.get_related_table_fields() + + model_queryset = ( + cls.model() + .objects.select_related(*select_related) + .prefetch_related(*prefetch_related) + .filter(filter_conditions) + .exclude(exclusions) + .annotate(**annotations_for_sort) + .order_by(*sort_fields) + .distinct() + ) + + # Convert the queryset to a dictionary (including annotated fields) + annotated_queryset = cls.annotate_and_retrieve_fields( + model_queryset, computed_fields, related_table_fields, **kwargs + ) + models_dict = convert_queryset_to_dict(annotated_queryset, is_model=False) + + # Write to csv file before the write_csv + cls.write_csv_before(writer, start_date, end_date) + + # Write the csv file + cls.write_csv(writer, columns, models_dict) + + @classmethod + def write_csv( + cls, + writer, + columns, + models_dict, + should_write_header=True, + ): + """Receives params from the parent methods and outputs a CSV with filtered and sorted objects. + Works with write_header as long as the same writer object is passed.""" + + rows = [] + for object in models_dict.values(): + try: + row = cls.parse_row(columns, object) + rows.append(row) + except ValueError as err: + logger.error(f"csv_export -> Error when parsing row: {err}") + continue + + if should_write_header: + write_header(writer, columns) + + writer.writerows(rows) + + @classmethod + @abstractmethod + def parse_row(cls, columns, model): + """ + Given a set of columns and a model dictionary, generate a new row from cleaned column data. 
+ Must be implemented by subclasses + """ + pass + + +class DomainExport(BaseExport): + """ + A collection of functions which return csv files regarding Domains. Although class is + named DomainExport, the base model for the export is DomainInformation. + Second class in an inheritance tree of 3. + """ + + @classmethod + def model(cls): + # Return the model class that this export handles + return DomainInformation + + @classmethod + def update_queryset(cls, queryset, **kwargs): + """ + Returns an updated queryset. + + Add security_contact_email, invited_users, and managers to the queryset, + based on public_contacts, domain_invitations and user_domain_roles + passed through kwargs. + """ + public_contacts = kwargs.get("public_contacts", {}) + domain_invitations = kwargs.get("domain_invitations", {}) + user_domain_roles = kwargs.get("user_domain_roles", {}) + + annotated_domain_infos = [] + + # Create mapping of domain to a list of invited users and managers + invited_users_dict = defaultdict(list) + for domain, email in domain_invitations: + invited_users_dict[domain].append(email) + + managers_dict = defaultdict(list) + for domain, email in user_domain_roles: + managers_dict[domain].append(email) + + # Annotate with security_contact from public_contacts, invited users + # from domain_invitations, and managers from user_domain_roles + for domain_info in queryset: + domain_info["security_contact_email"] = public_contacts.get( + domain_info.get("domain__security_contact_registry_id") + ) + domain_info["invited_users"] = ", ".join(invited_users_dict.get(domain_info.get("domain__name"), [])) + domain_info["managers"] = ", ".join(managers_dict.get(domain_info.get("domain__name"), [])) + annotated_domain_infos.append(domain_info) + + if annotated_domain_infos: + return annotated_domain_infos + + return queryset + + # ============================================================= # + # Helper functions for django ORM queries. 
# + # We are using these rather than pure python for speed reasons. # + # ============================================================= # + + @classmethod + def get_all_security_emails(cls): + """ + Fetch all PublicContact entries and return a mapping of registry_id to email. + """ + public_contacts = PublicContact.objects.values_list("registry_id", "email") + return {registry_id: email for registry_id, email in public_contacts} + + @classmethod + def get_all_domain_invitations(cls): + """ + Fetch all DomainInvitation entries and return a mapping of domain to email. + """ + domain_invitations = DomainInvitation.objects.filter(status="invited").values_list("domain__name", "email") + return list(domain_invitations) + + @classmethod + def get_all_user_domain_roles(cls): + """ + Fetch all UserDomainRole entries and return a mapping of domain to user__email. + """ + user_domain_roles = UserDomainRole.objects.select_related("user").values_list("domain__name", "user__email") + return list(user_domain_roles) + + @classmethod + def parse_row(cls, columns, model): + """ + Given a set of columns and a model dictionary, generate a new row from cleaned column data. 
""" + status = model.get("domain__state") + human_readable_status = Domain.State.get_state_label(status) + + expiration_date = model.get("domain__expiration_date") + if expiration_date is None: + expiration_date = "(blank)" + + first_ready_on = model.get("domain__first_ready") + if first_ready_on is None: + first_ready_on = "(blank)" + + domain_org_type = model.get("generic_org_type") + human_readable_domain_org_type = DomainRequest.OrganizationChoices.get_org_label(domain_org_type) + domain_federal_type = model.get("federal_type") + human_readable_domain_federal_type = BranchChoices.get_branch_label(domain_federal_type) + domain_type = human_readable_domain_org_type + if domain_federal_type and domain_org_type == DomainRequest.OrgChoicesElectionOffice.FEDERAL: + domain_type = f"{human_readable_domain_org_type} - {human_readable_domain_federal_type}" + + # create a dictionary of fields which can be included in output. + # "extra_fields" are precomputed fields (generated in the DB or parsed). + FIELDS = { + "Domain name": model.get("domain__name"), + "Status": human_readable_status, + "First ready on": first_ready_on, + "Expiration date": expiration_date, + "Domain type": domain_type, + "Agency": model.get("federal_agency__agency"), + "Organization name": model.get("organization_name"), + "City": model.get("city"), + "State": model.get("state_territory"), + "SO": model.get("so_name"), + "SO email": model.get("senior_official__email"), + "Security contact email": model.get("security_contact_email"), + "Created at": model.get("domain__created_at"), + "Deleted": model.get("domain__deleted"), + "Domain managers": model.get("managers"), + "Invited domain managers": model.get("invited_users"), + } + + row = [FIELDS.get(column, "") for column in columns] + return row + + @classmethod + def get_sliced_domains(cls, filter_condition): + """Get filtered domains counts sliced by org type and election office. 
+ Pass distinct=True when filtering by permissions so we do not to count multiples + when a domain has more that one manager. + """ + + domains = DomainInformation.objects.all().filter(**filter_condition).distinct() + domains_count = domains.count() + federal = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.FEDERAL).distinct().count() + interstate = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.INTERSTATE).count() + state_or_territory = ( + domains.filter(generic_org_type=DomainRequest.OrganizationChoices.STATE_OR_TERRITORY).distinct().count() + ) + tribal = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.TRIBAL).distinct().count() + county = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.COUNTY).distinct().count() + city = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.CITY).distinct().count() + special_district = ( + domains.filter(generic_org_type=DomainRequest.OrganizationChoices.SPECIAL_DISTRICT).distinct().count() + ) + school_district = ( + domains.filter(generic_org_type=DomainRequest.OrganizationChoices.SCHOOL_DISTRICT).distinct().count() + ) + election_board = domains.filter(is_election_board=True).distinct().count() + + return [ + domains_count, + federal, + interstate, + state_or_territory, + tribal, + county, + city, + special_district, + school_district, + election_board, + ] + + +class DomainDataType(DomainExport): + """ + Shows security contacts, domain managers, so + Inherits from BaseExport -> DomainExport + """ + + @classmethod + def get_columns(cls): + """ + Overrides the columns for CSV export specific to DomainExport. 
+ """ + return [ + "Domain name", + "Status", + "First ready on", + "Expiration date", + "Domain type", + "Agency", + "Organization name", + "City", + "State", + "SO", + "SO email", + "Security contact email", + "Domain managers", + "Invited domain managers", + ] + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + # Coalesce is used to replace federal_type of None with ZZZZZ + return [ + "organization_type", + Coalesce("federal_type", Value("ZZZZZ")), + "federal_agency", + "domain__name", + ] + + @classmethod + def get_additional_args(cls): + """ + Returns additional keyword arguments specific to DomainExport. + + Returns: + dict: Dictionary containing public_contacts, domain_invitations, and user_domain_roles. + """ + # Fetch all relevant PublicContact entries + public_contacts = cls.get_all_security_emails() + + # Fetch all relevant Invite entries + domain_invitations = cls.get_all_domain_invitations() + + # Fetch all relevant UserDomainRole entries + user_domain_roles = cls.get_all_user_domain_roles() + + return { + "public_contacts": public_contacts, + "domain_invitations": domain_invitations, + "user_domain_roles": user_domain_roles, + } + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. + """ + return ["domain", "senior_official"] + + @classmethod + def get_prefetch_related(cls): + """ + Get a list of tables to pass to prefetch_related when building queryset. + """ + return ["permissions"] + + @classmethod + def get_computed_fields(cls, delimiter=", "): + """ + Get a dict of computed fields. + """ + return { + "so_name": Concat( + Coalesce(F("senior_official__first_name"), Value("")), + Value(" "), + Coalesce(F("senior_official__last_name"), Value("")), + output_field=CharField(), + ), + } + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. 
+ """ + return [ + "domain__name", + "domain__state", + "domain__first_ready", + "domain__expiration_date", + "domain__created_at", + "domain__deleted", + "domain__security_contact_registry_id", + "senior_official__email", + "federal_agency__agency", + ] + + +class DomainDataFull(DomainExport): + """ + Shows security contacts, filtered by state + Inherits from BaseExport -> DomainExport + """ + + @classmethod + def get_columns(cls): + """ + Overrides the columns for CSV export specific to DomainExport. + """ + return [ + "Domain name", + "Domain type", + "Agency", + "Organization name", + "City", + "State", + "Security contact email", + ] + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + # Coalesce is used to replace federal_type of None with ZZZZZ + return [ + "organization_type", + Coalesce("federal_type", Value("ZZZZZ")), + "federal_agency", + "domain__name", + ] + + @classmethod + def get_additional_args(cls): + """ + Returns additional keyword arguments specific to DomainExport. + + Returns: + dict: Dictionary containing public_contacts, domain_invitations, and user_domain_roles. + """ + # Fetch all relevant PublicContact entries + public_contacts = cls.get_all_security_emails() + + return { + "public_contacts": public_contacts, + } + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. + """ + return ["domain"] + + @classmethod + def get_filter_conditions(cls, start_date=None, end_date=None): + """ + Get a Q object of filter conditions to filter when building queryset. + """ + return Q( + domain__state__in=[ + Domain.State.READY, + Domain.State.DNS_NEEDED, + Domain.State.ON_HOLD, + ], + ) + + @classmethod + def get_computed_fields(cls, delimiter=", "): + """ + Get a dict of computed fields. 
+ """ + return { + "so_name": Concat( + Coalesce(F("senior_official__first_name"), Value("")), + Value(" "), + Coalesce(F("senior_official__last_name"), Value("")), + output_field=CharField(), + ), + } + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + return [ + "domain__name", + "domain__security_contact_registry_id", + "federal_agency__agency", + ] + + +class DomainDataFederal(DomainExport): + """ + Shows security contacts, filtered by state and org type + Inherits from BaseExport -> DomainExport + """ + + @classmethod + def get_columns(cls): + """ + Overrides the columns for CSV export specific to DomainExport. + """ + return [ + "Domain name", + "Domain type", + "Agency", + "Organization name", + "City", + "State", + "Security contact email", + ] + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + # Coalesce is used to replace federal_type of None with ZZZZZ + return [ + "organization_type", + Coalesce("federal_type", Value("ZZZZZ")), + "federal_agency", + "domain__name", + ] + + @classmethod + def get_additional_args(cls): + """ + Returns additional keyword arguments specific to DomainExport. + + Returns: + dict: Dictionary containing public_contacts, domain_invitations, and user_domain_roles. + """ + # Fetch all relevant PublicContact entries + public_contacts = cls.get_all_security_emails() + + return { + "public_contacts": public_contacts, + } + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. + """ + return ["domain"] + + @classmethod + def get_filter_conditions(cls, start_date=None, end_date=None): + """ + Get a Q object of filter conditions to filter when building queryset. 
+ """ + return Q( + organization_type__icontains="federal", + domain__state__in=[ + Domain.State.READY, + Domain.State.DNS_NEEDED, + Domain.State.ON_HOLD, + ], + ) + + @classmethod + def get_computed_fields(cls, delimiter=", "): + """ + Get a dict of computed fields. + """ + return { + "so_name": Concat( + Coalesce(F("senior_official__first_name"), Value("")), + Value(" "), + Coalesce(F("senior_official__last_name"), Value("")), + output_field=CharField(), + ), + } + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + return [ + "domain__name", + "domain__security_contact_registry_id", + "federal_agency__agency", + ] + + +class DomainGrowth(DomainExport): + """ + Shows ready and deleted domains within a date range, sorted + Inherits from BaseExport -> DomainExport + """ + + @classmethod + def get_columns(cls): + """ + Overrides the columns for CSV export specific to DomainExport. + """ + return [ + "Domain name", + "Domain type", + "Agency", + "Organization name", + "City", + "State", + "Status", + "Expiration date", + "Created at", + "First ready", + "Deleted", + ] + + @classmethod + def get_annotations_for_sort(cls, delimiter=", "): + """ + Get a dict of annotations to make available for sorting. + """ + today = timezone.now().date() + return { + "custom_sort": Case( + When(domain__state=Domain.State.READY, then="domain__first_ready"), + When(domain__state=Domain.State.DELETED, then="domain__deleted"), + default=Value(today), # Default value if no conditions match + output_field=DateField(), + ) + } + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + return [ + "-domain__state", + "custom_sort", + "domain__name", + ] + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. 
+ """ + return ["domain"] + + @classmethod + def get_filter_conditions(cls, start_date=None, end_date=None): + """ + Get a Q object of filter conditions to filter when building queryset. + """ + filter_ready = Q( + domain__state__in=[Domain.State.READY], + domain__first_ready__gte=start_date, + domain__first_ready__lte=end_date, + ) + filter_deleted = Q( + domain__state__in=[Domain.State.DELETED], domain__deleted__gte=start_date, domain__deleted__lte=end_date + ) + return filter_ready | filter_deleted + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + return [ + "domain__name", + "domain__state", + "domain__first_ready", + "domain__expiration_date", + "domain__created_at", + "domain__deleted", + "federal_agency__agency", + ] + + +class DomainManaged(DomainExport): + """ + Shows managed domains by an end date, sorted + Inherits from BaseExport -> DomainExport + """ + + @classmethod + def get_columns(cls): + """ + Overrides the columns for CSV export specific to DomainExport. + """ + return [ + "Domain name", + "Domain type", + "Domain managers", + "Invited domain managers", + ] + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + return [ + "domain__name", + ] + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. + """ + return ["domain"] + + @classmethod + def get_prefetch_related(cls): + """ + Get a list of tables to pass to prefetch_related when building queryset. + """ + return ["permissions"] + + @classmethod + def get_filter_conditions(cls, start_date=None, end_date=None): + """ + Get a Q object of filter conditions to filter when building queryset. 
+ """ + end_date_formatted = format_end_date(end_date) + return Q( + domain__permissions__isnull=False, + domain__first_ready__lte=end_date_formatted, + ) + + @classmethod + def get_additional_args(cls): + """ + Returns additional keyword arguments specific to DomainExport. + + Returns: + dict: Dictionary containing public_contacts, domain_invitations, and user_domain_roles. + """ + + # Fetch all relevant Invite entries + domain_invitations = cls.get_all_domain_invitations() + + # Fetch all relevant UserDomainRole entries + user_domain_roles = cls.get_all_user_domain_roles() + + return { + "domain_invitations": domain_invitations, + "user_domain_roles": user_domain_roles, + } + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + return [ + "domain__name", + ] + + @classmethod + def write_csv_before(cls, csv_writer, start_date=None, end_date=None): + """ + Write to csv file before the write_csv method. + """ start_date_formatted = format_start_date(start_date) end_date_formatted = format_end_date(end_date) - writer = csv.writer(csv_file) - # define columns to include in export - columns = [ + filter_managed_domains_start_date = { + "domain__permissions__isnull": False, + "domain__first_ready__lte": start_date_formatted, + } + managed_domains_sliced_at_start_date = cls.get_sliced_domains(filter_managed_domains_start_date) + + csv_writer.writerow(["MANAGED DOMAINS COUNTS AT START DATE"]) + csv_writer.writerow( + [ + "Total", + "Federal", + "Interstate", + "State or territory", + "Tribal", + "County", + "City", + "Special district", + "School district", + "Election office", + ] + ) + csv_writer.writerow(managed_domains_sliced_at_start_date) + csv_writer.writerow([]) + + filter_managed_domains_end_date = { + "domain__permissions__isnull": False, + "domain__first_ready__lte": end_date_formatted, + } + managed_domains_sliced_at_end_date = cls.get_sliced_domains(filter_managed_domains_end_date) + + 
csv_writer.writerow(["MANAGED DOMAINS COUNTS AT END DATE"]) + csv_writer.writerow( + [ + "Total", + "Federal", + "Interstate", + "State or territory", + "Tribal", + "County", + "City", + "Special district", + "School district", + "Election office", + ] + ) + csv_writer.writerow(managed_domains_sliced_at_end_date) + csv_writer.writerow([]) + + +class DomainUnmanaged(DomainExport): + """ + Shows unmanaged domains by an end date, sorted + Inherits from BaseExport -> DomainExport + """ + + @classmethod + def get_columns(cls): + """ + Overrides the columns for CSV export specific to DomainExport. + """ + return [ + "Domain name", + "Domain type", + ] + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + return [ + "domain__name", + ] + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. + """ + return ["domain"] + + @classmethod + def get_prefetch_related(cls): + """ + Get a list of tables to pass to prefetch_related when building queryset. + """ + return ["permissions"] + + @classmethod + def get_filter_conditions(cls, start_date=None, end_date=None): + """ + Get a Q object of filter conditions to filter when building queryset. + """ + end_date_formatted = format_end_date(end_date) + return Q( + domain__permissions__isnull=True, + domain__first_ready__lte=end_date_formatted, + ) + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + return [ + "domain__name", + ] + + @classmethod + def write_csv_before(cls, csv_writer, start_date=None, end_date=None): + """ + Write to csv file before the write_csv method. 
+ + """ + start_date_formatted = format_start_date(start_date) + end_date_formatted = format_end_date(end_date) + filter_unmanaged_domains_start_date = { + "domain__permissions__isnull": True, + "domain__first_ready__lte": start_date_formatted, + } + unmanaged_domains_sliced_at_start_date = cls.get_sliced_domains(filter_unmanaged_domains_start_date) + + csv_writer.writerow(["UNMANAGED DOMAINS AT START DATE"]) + csv_writer.writerow( + [ + "Total", + "Federal", + "Interstate", + "State or territory", + "Tribal", + "County", + "City", + "Special district", + "School district", + "Election office", + ] + ) + csv_writer.writerow(unmanaged_domains_sliced_at_start_date) + csv_writer.writerow([]) + + filter_unmanaged_domains_end_date = { + "domain__permissions__isnull": True, + "domain__first_ready__lte": end_date_formatted, + } + unmanaged_domains_sliced_at_end_date = cls.get_sliced_domains(filter_unmanaged_domains_end_date) + + csv_writer.writerow(["UNMANAGED DOMAINS AT END DATE"]) + csv_writer.writerow( + [ + "Total", + "Federal", + "Interstate", + "State or territory", + "Tribal", + "County", + "City", + "Special district", + "School district", + "Election office", + ] + ) + csv_writer.writerow(unmanaged_domains_sliced_at_end_date) + csv_writer.writerow([]) + + +class DomainRequestExport(BaseExport): + """ + A collection of functions which return csv files regarding the DomainRequest model. + Second class in an inheritance tree of 3. 
+ """ + + @classmethod + def model(cls): + # Return the model class that this export handles + return DomainRequest + + @classmethod + def get_sliced_requests(cls, filter_condition): + """Get filtered requests counts sliced by org type and election office.""" + requests = DomainRequest.objects.all().filter(**filter_condition).distinct() + requests_count = requests.count() + federal = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.FEDERAL).distinct().count() + interstate = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.INTERSTATE).distinct().count() + state_or_territory = ( + requests.filter(generic_org_type=DomainRequest.OrganizationChoices.STATE_OR_TERRITORY).distinct().count() + ) + tribal = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.TRIBAL).distinct().count() + county = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.COUNTY).distinct().count() + city = requests.filter(generic_org_type=DomainRequest.OrganizationChoices.CITY).distinct().count() + special_district = ( + requests.filter(generic_org_type=DomainRequest.OrganizationChoices.SPECIAL_DISTRICT).distinct().count() + ) + school_district = ( + requests.filter(generic_org_type=DomainRequest.OrganizationChoices.SCHOOL_DISTRICT).distinct().count() + ) + election_board = requests.filter(is_election_board=True).distinct().count() + + return [ + requests_count, + federal, + interstate, + state_or_territory, + tribal, + county, + city, + special_district, + school_district, + election_board, + ] + + @classmethod + def parse_row(cls, columns, model): + """ + Given a set of columns and a model dictionary, generate a new row from cleaned column data. + """ + + # Handle the federal_type field. Defaults to the wrong format. 
+ federal_type = model.get("federal_type") + human_readable_federal_type = BranchChoices.get_branch_label(federal_type) if federal_type else None + + # Handle the org_type field + org_type = model.get("generic_org_type") or model.get("organization_type") + human_readable_org_type = DomainRequest.OrganizationChoices.get_org_label(org_type) if org_type else None + + # Handle the status field. Defaults to the wrong format. + status = model.get("status") + status_display = DomainRequest.DomainRequestStatus.get_status_label(status) if status else None + + # Handle the region field. + state_territory = model.get("state_territory") + region = get_region(state_territory) if state_territory else None + + # Handle the requested_domain field (add a default if None) + requested_domain = model.get("requested_domain__name") + requested_domain_name = requested_domain if requested_domain else "No requested domain" + + # Handle the election field. N/A if None, "Yes"/"No" if boolean + human_readable_election_board = "N/A" + is_election_board = model.get("is_election_board") + if is_election_board is not None: + human_readable_election_board = "Yes" if is_election_board else "No" + + # Handle the additional details field. Pipe seperated. + cisa_rep_first = model.get("cisa_representative_first_name") + cisa_rep_last = model.get("cisa_representative_last_name") + name = [n for n in [cisa_rep_first, cisa_rep_last] if n] + + cisa_rep = " ".join(name) if name else None + details = [cisa_rep, model.get("anything_else")] + additional_details = " | ".join([field for field in details if field]) + + # create a dictionary of fields which can be included in output. + # "extra_fields" are precomputed fields (generated in the DB or parsed). + FIELDS = { + # Parsed fields - defined above. 
+ "Domain request": requested_domain_name, + "Region": region, + "Status": status_display, + "Election office": human_readable_election_board, + "Federal type": human_readable_federal_type, + "Domain type": human_readable_org_type, + "Request additional details": additional_details, + # Annotated fields - passed into the request dict. + "Creator approved domains count": model.get("creator_approved_domains_count", 0), + "Creator active requests count": model.get("creator_active_requests_count", 0), + "Alternative domains": model.get("all_alternative_domains"), + "Other contacts": model.get("all_other_contacts"), + "Current websites": model.get("all_current_websites"), + # Untouched FK fields - passed into the request dict. + "Federal agency": model.get("federal_agency__agency"), + "SO first name": model.get("senior_official__first_name"), + "SO last name": model.get("senior_official__last_name"), + "SO email": model.get("senior_official__email"), + "SO title/role": model.get("senior_official__title"), + "Creator first name": model.get("creator__first_name"), + "Creator last name": model.get("creator__last_name"), + "Creator email": model.get("creator__email"), + "Investigator": model.get("investigator__email"), + # Untouched fields + "Organization name": model.get("organization_name"), + "City": model.get("city"), + "State/territory": model.get("state_territory"), + "Request purpose": model.get("purpose"), + "CISA regional representative": model.get("cisa_representative_email"), + "Submitted at": model.get("submission_date"), + } + + row = [FIELDS.get(column, "") for column in columns] + return row + + +class DomainRequestGrowth(DomainRequestExport): + """ + Shows submitted requests within a date range, sorted + Inherits from BaseExport -> DomainRequestExport + """ + + @classmethod + def get_columns(cls): + """ + Overrides the columns for CSV export specific to DomainRequestGrowth. 
+ """ + return [ "Domain request", "Domain type", "Federal type", "Submitted at", ] - sort_fields = [ + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + return [ "requested_domain__name", ] - filter_condition = { - "status": DomainRequest.DomainRequestStatus.SUBMITTED, - "submission_date__lte": end_date_formatted, - "submission_date__gte": start_date_formatted, - } - - # We don't want to annotate anything, but we do want to access the requested domain name - annotations = {} - additional_values = ["requested_domain__name"] - - all_requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct() - - annotated_requests = cls.annotate_and_retrieve_fields(all_requests, annotations, additional_values) - requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False) - - cls.write_csv_for_requests(writer, columns, requests_dict) @classmethod - def export_full_domain_request_report(cls, csv_file): + def get_filter_conditions(cls, start_date=None, end_date=None): """ - Generates a detailed domain request report to a CSV file. - - Retrieves and annotates DomainRequest objects, excluding 'STARTED' status, - with related data optimizations via select/prefetch and annotation. - - Annotated with counts and aggregates of related entities. - Converts to dict and writes to CSV using predefined columns. - - Parameters: - csv_file (file-like object): Target CSV file. + Get a Q object of filter conditions to filter when building queryset. 
""" - writer = csv.writer(csv_file) - requests = ( - DomainRequest.objects.select_related( - "creator", "senior_official", "federal_agency", "investigator", "requested_domain" - ) - .prefetch_related("current_websites", "other_contacts", "alternative_domains") - .exclude(status__in=[DomainRequest.DomainRequestStatus.STARTED]) - .order_by( - "status", - "requested_domain__name", - ) - .distinct() + start_date_formatted = format_start_date(start_date) + end_date_formatted = format_end_date(end_date) + return Q( + status=DomainRequest.DomainRequestStatus.SUBMITTED, + submission_date__lte=end_date_formatted, + submission_date__gte=start_date_formatted, ) - # Annotations are custom columns returned to the queryset (AKA: computed in the DB). - annotations = cls._full_domain_request_annotations() + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + return ["requested_domain__name"] - # The .values returned from annotate_and_retrieve_fields can't go two levels deep - # (just returns the field id of say, "creator") - so we have to include this. 
- additional_values = [
- "requested_domain__name",
- "federal_agency__agency",
- "senior_official__first_name",
- "senior_official__last_name",
- "senior_official__email",
- "senior_official__title",
- "creator__first_name",
- "creator__last_name",
- "creator__email",
- "investigator__email",
- ]
- # Convert the domain request queryset to a dictionary (including annotated fields)
- annotated_requests = cls.annotate_and_retrieve_fields(requests, annotations, additional_values)
- requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False)
-
- # Write the csv file
- cls.write_csv_for_requests(writer, cls.all_columns, requests_dict)


+class DomainRequestDataFull(DomainRequestExport):
+    """
+    Shows all but STARTED requests
+    Inherits from BaseExport -> DomainRequestExport
+    """

     @classmethod
-    def _full_domain_request_annotations(cls, delimiter=" | "):
-        """Returns the annotations for the full domain request report"""
+    def get_columns(cls):
+        """
+        Overrides the columns for CSV export specific to DomainRequestDataFull.
+        """
+        return [
+            "Domain request",
+            "Submitted at",
+            "Status",
+            "Domain type",
+            "Federal type",
+            "Federal agency",
+            "Organization name",
+            "Election office",
+            "City",
+            "State/territory",
+            "Region",
+            "Creator first name",
+            "Creator last name",
+            "Creator email",
+            "Creator approved domains count",
+            "Creator active requests count",
+            "Alternative domains",
+            "SO first name",
+            "SO last name",
+            "SO email",
+            "SO title/role",
+            "Request purpose",
+            "Request additional details",
+            "Other contacts",
+            "CISA regional representative",
+            "Current websites",
+            "Investigator",
+        ]
+
+    @classmethod
+    def get_select_related(cls):
+        """
+        Get a list of tables to pass to select_related when building queryset.
+        """
+        return ["creator", "senior_official", "federal_agency", "investigator", "requested_domain"]
+
+    @classmethod
+    def get_prefetch_related(cls):
+        """
+        Get a list of tables to pass to prefetch_related when building queryset.
+ """ + return ["current_websites", "other_contacts", "alternative_domains"] + + @classmethod + def get_exclusions(cls): + """ + Get a Q object of exclusion conditions to use when building queryset. + """ + return Q(status__in=[DomainRequest.DomainRequestStatus.STARTED]) + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + return [ + "status", + "requested_domain__name", + ] + + @classmethod + def get_computed_fields(cls, delimiter=", "): + """ + Get a dict of computed fields. + """ return { - "creator_approved_domains_count": DomainRequestExport.get_creator_approved_domains_count_query(), - "creator_active_requests_count": DomainRequestExport.get_creator_active_requests_count_query(), + "creator_approved_domains_count": cls.get_creator_approved_domains_count_query(), + "creator_active_requests_count": cls.get_creator_active_requests_count_query(), "all_current_websites": StringAgg("current_websites__website", delimiter=delimiter, distinct=True), "all_alternative_domains": StringAgg("alternative_domains__website", delimiter=delimiter, distinct=True), # Coerce the other contacts object to "{first_name} {last_name} {email}" @@ -856,154 +1348,31 @@ class DomainRequestExport: ), } - @staticmethod - def write_csv_for_requests( - writer, - columns, - requests_dict, - should_write_header=True, - ): - """Receives params from the parent methods and outputs a CSV with filtered and sorted requests. 
- Works with write_header as long as the same writer object is passed.""" - - rows = [] - for request in requests_dict.values(): - try: - row = DomainRequestExport.parse_row_for_requests(columns, request) - rows.append(row) - except ValueError as err: - logger.error(f"csv_export -> Error when parsing row: {err}") - continue - - if should_write_header: - write_header(writer, columns) - - writer.writerows(rows) - - @staticmethod - def parse_row_for_requests(columns, request): - """ - Given a set of columns and a request dictionary, generate a new row from cleaned column data. - """ - - # Handle the federal_type field. Defaults to the wrong format. - federal_type = request.get("federal_type") - human_readable_federal_type = BranchChoices.get_branch_label(federal_type) if federal_type else None - - # Handle the org_type field - org_type = request.get("generic_org_type") or request.get("organization_type") - human_readable_org_type = DomainRequest.OrganizationChoices.get_org_label(org_type) if org_type else None - - # Handle the status field. Defaults to the wrong format. - status = request.get("status") - status_display = DomainRequest.DomainRequestStatus.get_status_label(status) if status else None - - # Handle the region field. - state_territory = request.get("state_territory") - region = get_region(state_territory) if state_territory else None - - # Handle the requested_domain field (add a default if None) - requested_domain = request.get("requested_domain__name") - requested_domain_name = requested_domain if requested_domain else "No requested domain" - - # Handle the election field. N/A if None, "Yes"/"No" if boolean - human_readable_election_board = "N/A" - is_election_board = request.get("is_election_board") - if is_election_board is not None: - human_readable_election_board = "Yes" if is_election_board else "No" - - # Handle the additional details field. Pipe seperated. 
- cisa_rep_first = request.get("cisa_representative_first_name") - cisa_rep_last = request.get("cisa_representative_last_name") - name = [n for n in [cisa_rep_first, cisa_rep_last] if n] - - cisa_rep = " ".join(name) if name else None - details = [cisa_rep, request.get("anything_else")] - additional_details = " | ".join([field for field in details if field]) - - # create a dictionary of fields which can be included in output. - # "extra_fields" are precomputed fields (generated in the DB or parsed). - FIELDS = { - # Parsed fields - defined above. - "Domain request": requested_domain_name, - "Region": region, - "Status": status_display, - "Election office": human_readable_election_board, - "Federal type": human_readable_federal_type, - "Domain type": human_readable_org_type, - "Request additional details": additional_details, - # Annotated fields - passed into the request dict. - "Creator approved domains count": request.get("creator_approved_domains_count", 0), - "Creator active requests count": request.get("creator_active_requests_count", 0), - "Alternative domains": request.get("all_alternative_domains"), - "Other contacts": request.get("all_other_contacts"), - "Current websites": request.get("all_current_websites"), - # Untouched FK fields - passed into the request dict. 
- "Federal agency": request.get("federal_agency__agency"), - "SO first name": request.get("senior_official__first_name"), - "SO last name": request.get("senior_official__last_name"), - "SO email": request.get("senior_official__email"), - "SO title/role": request.get("senior_official__title"), - "Creator first name": request.get("creator__first_name"), - "Creator last name": request.get("creator__last_name"), - "Creator email": request.get("creator__email"), - "Investigator": request.get("investigator__email"), - # Untouched fields - "Organization name": request.get("organization_name"), - "City": request.get("city"), - "State/territory": request.get("state_territory"), - "Request purpose": request.get("purpose"), - "CISA regional representative": request.get("cisa_representative_email"), - "Submitted at": request.get("submission_date"), - } - - row = [FIELDS.get(column, "") for column in columns] - return row - @classmethod - def annotate_and_retrieve_fields( - cls, requests, annotations, additional_values=None, include_many_to_many=False - ) -> QuerySet: + def get_related_table_fields(cls): """ - Applies annotations to a queryset and retrieves specified fields, - including class-defined and annotation-defined. - - Parameters: - requests (QuerySet): Initial queryset. - annotations (dict, optional): Fields to compute {field_name: expression}. - additional_values (list, optional): Extra fields to retrieve; defaults to annotation keys if None. - include_many_to_many (bool, optional): Determines if we should include many to many fields or not - - Returns: - QuerySet: Contains dictionaries with the specified fields for each record. + Get a list of fields from related tables. """ - - if additional_values is None: - additional_values = [] - - # We can infer that if we're passing in annotations, - # we want to grab the result of said annotation. 
- if annotations: - additional_values.extend(annotations.keys()) - - # Get prexisting fields on DomainRequest - domain_request_fields = set() - for field in DomainRequest._meta.get_fields(): - # Exclude many to many fields unless we specify - many_to_many = isinstance(field, ManyToManyField) and include_many_to_many - if many_to_many or not isinstance(field, ManyToManyField): - domain_request_fields.add(field.name) - - queryset = requests.annotate(**annotations).values(*domain_request_fields, *additional_values) - return queryset + return [ + "requested_domain__name", + "federal_agency__agency", + "senior_official__first_name", + "senior_official__last_name", + "senior_official__email", + "senior_official__title", + "creator__first_name", + "creator__last_name", + "creator__email", + "investigator__email", + ] # ============================================================= # # Helper functions for django ORM queries. # # We are using these rather than pure python for speed reasons. # # ============================================================= # - @staticmethod - def get_creator_approved_domains_count_query(): + @classmethod + def get_creator_approved_domains_count_query(cls): """ Generates a Count query for distinct approved domain requests per creator. @@ -1018,8 +1387,8 @@ class DomainRequestExport: ) return query - @staticmethod - def get_creator_active_requests_count_query(): + @classmethod + def get_creator_active_requests_count_query(cls): """ Generates a Count query for distinct approved domain requests per creator. 
diff --git a/src/registrar/views/admin_views.py b/src/registrar/views/admin_views.py index f1baa72bd..4d015ab37 100644 --- a/src/registrar/views/admin_views.py +++ b/src/registrar/views/admin_views.py @@ -49,8 +49,10 @@ class AnalyticsView(View): "domain__permissions__isnull": False, "domain__first_ready__lte": end_date_formatted, } - managed_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_managed_domains_start_date) - managed_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_managed_domains_end_date) + managed_domains_sliced_at_start_date = csv_export.DomainExport.get_sliced_domains( + filter_managed_domains_start_date + ) + managed_domains_sliced_at_end_date = csv_export.DomainExport.get_sliced_domains(filter_managed_domains_end_date) filter_unmanaged_domains_start_date = { "domain__permissions__isnull": True, @@ -60,8 +62,12 @@ class AnalyticsView(View): "domain__permissions__isnull": True, "domain__first_ready__lte": end_date_formatted, } - unmanaged_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_unmanaged_domains_start_date) - unmanaged_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_unmanaged_domains_end_date) + unmanaged_domains_sliced_at_start_date = csv_export.DomainExport.get_sliced_domains( + filter_unmanaged_domains_start_date + ) + unmanaged_domains_sliced_at_end_date = csv_export.DomainExport.get_sliced_domains( + filter_unmanaged_domains_end_date + ) filter_ready_domains_start_date = { "domain__state__in": [models.Domain.State.READY], @@ -71,8 +77,8 @@ class AnalyticsView(View): "domain__state__in": [models.Domain.State.READY], "domain__first_ready__lte": end_date_formatted, } - ready_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_ready_domains_start_date) - ready_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_ready_domains_end_date) + ready_domains_sliced_at_start_date = 
csv_export.DomainExport.get_sliced_domains(filter_ready_domains_start_date) + ready_domains_sliced_at_end_date = csv_export.DomainExport.get_sliced_domains(filter_ready_domains_end_date) filter_deleted_domains_start_date = { "domain__state__in": [models.Domain.State.DELETED], @@ -82,8 +88,10 @@ class AnalyticsView(View): "domain__state__in": [models.Domain.State.DELETED], "domain__deleted__lte": end_date_formatted, } - deleted_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_deleted_domains_start_date) - deleted_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_deleted_domains_end_date) + deleted_domains_sliced_at_start_date = csv_export.DomainExport.get_sliced_domains( + filter_deleted_domains_start_date + ) + deleted_domains_sliced_at_end_date = csv_export.DomainExport.get_sliced_domains(filter_deleted_domains_end_date) filter_requests_start_date = { "created_at__lte": start_date_formatted, @@ -91,8 +99,8 @@ class AnalyticsView(View): filter_requests_end_date = { "created_at__lte": end_date_formatted, } - requests_sliced_at_start_date = csv_export.get_sliced_requests(filter_requests_start_date) - requests_sliced_at_end_date = csv_export.get_sliced_requests(filter_requests_end_date) + requests_sliced_at_start_date = csv_export.DomainRequestExport.get_sliced_requests(filter_requests_start_date) + requests_sliced_at_end_date = csv_export.DomainRequestExport.get_sliced_requests(filter_requests_end_date) filter_submitted_requests_start_date = { "status": models.DomainRequest.DomainRequestStatus.SUBMITTED, @@ -102,8 +110,12 @@ class AnalyticsView(View): "status": models.DomainRequest.DomainRequestStatus.SUBMITTED, "submission_date__lte": end_date_formatted, } - submitted_requests_sliced_at_start_date = csv_export.get_sliced_requests(filter_submitted_requests_start_date) - submitted_requests_sliced_at_end_date = csv_export.get_sliced_requests(filter_submitted_requests_end_date) + submitted_requests_sliced_at_start_date = 
csv_export.DomainRequestExport.get_sliced_requests( + filter_submitted_requests_start_date + ) + submitted_requests_sliced_at_end_date = csv_export.DomainRequestExport.get_sliced_requests( + filter_submitted_requests_end_date + ) context = dict( # Generate a dictionary of context variables that are common across all admin templates @@ -142,7 +154,7 @@ class ExportDataType(View): # match the CSV example with all the fields response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = 'attachment; filename="domains-by-type.csv"' - csv_export.export_data_type_to_csv(response) + csv_export.DomainDataType.export_data_to_csv(response) return response @@ -151,7 +163,7 @@ class ExportDataFull(View): # Smaller export based on 1 response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = 'attachment; filename="current-full.csv"' - csv_export.export_data_full_to_csv(response) + csv_export.DomainDataFull.export_data_to_csv(response) return response @@ -160,7 +172,7 @@ class ExportDataFederal(View): # Federal only response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = 'attachment; filename="current-federal.csv"' - csv_export.export_data_federal_to_csv(response) + csv_export.DomainDataFederal.export_data_to_csv(response) return response @@ -171,63 +183,51 @@ class ExportDomainRequestDataFull(View): """Returns a content disposition response for current-full-domain-request.csv""" response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = 'attachment; filename="current-full-domain-request.csv"' - csv_export.DomainRequestExport.export_full_domain_request_report(response) + csv_export.DomainRequestDataFull.export_data_to_csv(response) return response class ExportDataDomainsGrowth(View): def get(self, request, *args, **kwargs): - # Get start_date and end_date from the request's GET parameters - # #999: not needed if we switch to django forms start_date = request.GET.get("start_date", "") 
end_date = request.GET.get("end_date", "") response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = f'attachment; filename="domain-growth-report-{start_date}-to-{end_date}.csv"' - # For #999: set export_data_domain_growth_to_csv to return the resulting queryset, which we can then use - # in context to display this data in the template. - csv_export.export_data_domain_growth_to_csv(response, start_date, end_date) + csv_export.DomainGrowth.export_data_to_csv(response, start_date, end_date) return response class ExportDataRequestsGrowth(View): def get(self, request, *args, **kwargs): - # Get start_date and end_date from the request's GET parameters - # #999: not needed if we switch to django forms start_date = request.GET.get("start_date", "") end_date = request.GET.get("end_date", "") response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = f'attachment; filename="requests-{start_date}-to-{end_date}.csv"' - # For #999: set export_data_domain_growth_to_csv to return the resulting queryset, which we can then use - # in context to display this data in the template. 
- csv_export.DomainRequestExport.export_data_requests_growth_to_csv(response, start_date, end_date) + csv_export.DomainRequestGrowth.export_data_to_csv(response, start_date, end_date) return response class ExportDataManagedDomains(View): def get(self, request, *args, **kwargs): - # Get start_date and end_date from the request's GET parameters - # #999: not needed if we switch to django forms start_date = request.GET.get("start_date", "") end_date = request.GET.get("end_date", "") response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = f'attachment; filename="managed-domains-{start_date}-to-{end_date}.csv"' - csv_export.export_data_managed_domains_to_csv(response, start_date, end_date) + csv_export.DomainManaged.export_data_to_csv(response, start_date, end_date) return response class ExportDataUnmanagedDomains(View): def get(self, request, *args, **kwargs): - # Get start_date and end_date from the request's GET parameters - # #999: not needed if we switch to django forms start_date = request.GET.get("start_date", "") end_date = request.GET.get("end_date", "") response = HttpResponse(content_type="text/csv") - response["Content-Disposition"] = f'attachment; filename="unamanaged-domains-{start_date}-to-{end_date}.csv"' - csv_export.export_data_unmanaged_domains_to_csv(response, start_date, end_date) + response["Content-Disposition"] = f'attachment; filename="unmanaged-domains-{start_date}-to-{end_date}.csv"' + csv_export.DomainUnmanaged.export_data_to_csv(response, start_date, end_date) return response