diff --git a/src/registrar/models/domain.py b/src/registrar/models/domain.py index 767227499..7fdc56971 100644 --- a/src/registrar/models/domain.py +++ b/src/registrar/models/domain.py @@ -151,6 +151,11 @@ class Domain(TimeStampedModel, DomainHelper): # previously existed but has been deleted from the registry DELETED = "deleted", "Deleted" + @classmethod + def get_state_label(cls, state: str): + """Returns the associated label for a given state value""" + return cls(state).label if state else None + @classmethod def get_help_text(cls, state) -> str: """Returns a help message for a desired state. If none is found, an empty string is returned""" diff --git a/src/registrar/utility/csv_export.py b/src/registrar/utility/csv_export.py index 1a35c8164..345ed7be1 100644 --- a/src/registrar/utility/csv_export.py +++ b/src/registrar/utility/csv_export.py @@ -1,3 +1,4 @@ +from collections import defaultdict import csv import logging from datetime import datetime @@ -32,383 +33,6 @@ def write_header(writer, columns): writer.writerow(columns) -def get_domain_infos(filter_condition, sort_fields): - """ - Returns DomainInformation objects filtered and sorted based on the provided conditions. - filter_condition -> A dictionary of conditions to filter the objects. - sort_fields -> A list of fields to sort the resulting query set. - returns: A queryset of DomainInformation objects - """ - domain_infos = ( - DomainInformation.objects.select_related("domain", "authorizing_official") - .filter(**filter_condition) - .order_by(*sort_fields) - .distinct() - ) - - # Do a mass concat of the first and last name fields for authorizing_official. - # The old operation was computationally heavy for some reason, so if we precompute - # this here, it is vastly more efficient. - domain_infos_cleaned = domain_infos.annotate( - ao=Concat( - Coalesce(F("authorizing_official__first_name"), Value("")), - Value(" "), - Coalesce(F("authorizing_official__last_name"), Value("")), - output_field=CharField(), - ) - ) - return domain_infos_cleaned - - -def parse_row_for_domain( - columns, - domain_info: DomainInformation, - dict_security_emails=None, - should_get_domain_managers=False, - dict_domain_invitations_with_invited_status=None, - dict_user_domain_roles=None, -): - """Given a set of columns, generate a new row from cleaned column data""" - - # Domain should never be none when parsing this information - if domain_info.domain is None: - logger.error("Attemting to parse row for csv exports but Domain is none in a DomainInfo") - raise ValueError("Domain is none") - - domain = domain_info.domain # type: ignore - - # Grab the security email from a preset dictionary. - # If nothing exists in the dictionary, grab from .contacts. - if dict_security_emails is not None and domain.name in dict_security_emails: - _email = dict_security_emails.get(domain.name) - security_email = _email if _email is not None else " " - else: - # If the dictionary doesn't contain that data, lets filter for it manually. - # This is a last resort as this is a more expensive operation. 
- security_contacts = domain.contacts.filter(contact_type=PublicContact.ContactTypeChoices.SECURITY) - _email = security_contacts[0].email if security_contacts else None - security_email = _email if _email is not None else " " - - # These are default emails that should not be displayed in the csv report - invalid_emails = {DefaultEmail.LEGACY_DEFAULT.value, DefaultEmail.PUBLIC_CONTACT_DEFAULT.value} - if security_email.lower() in invalid_emails: - security_email = "(blank)" - - if domain_info.federal_type and domain_info.organization_type == DomainRequest.OrgChoicesElectionOffice.FEDERAL: - domain_type = f"{domain_info.get_organization_type_display()} - {domain_info.get_federal_type_display()}" - else: - domain_type = domain_info.get_organization_type_display() - - # create a dictionary of fields which can be included in output - FIELDS = { - "Domain name": domain.name, - "Status": domain.get_state_display(), - "First ready on": domain.first_ready or "(blank)", - "Expiration date": domain.expiration_date or "(blank)", - "Domain type": domain_type, - "Agency": domain_info.federal_agency, - "Organization name": domain_info.organization_name, - "City": domain_info.city, - "State": domain_info.state_territory, - "AO": domain_info.ao, # type: ignore - "AO email": domain_info.authorizing_official.email if domain_info.authorizing_official else " ", - "Security contact email": security_email, - "Created at": domain.created_at, - "Deleted": domain.deleted, - } - - if should_get_domain_managers: - # Get lists of emails for active and invited domain managers - - dms_active_emails = dict_user_domain_roles.get(domain_info.domain.name, []) - dms_invited_emails = dict_domain_invitations_with_invited_status.get(domain_info.domain.name, []) - - # Set up the "matching headers" + row field data for email and status - i = 0 # Declare i outside of the loop to avoid a reference before assignment in the second loop - for i, dm_email in enumerate(dms_active_emails, start=1): - FIELDS[f"Domain manager {i}"] = dm_email - FIELDS[f"DM{i} status"] = "R" - - # Continue enumeration from where we left off and add data for invited domain managers - for j, dm_email in enumerate(dms_invited_emails, start=i + 1): - FIELDS[f"Domain manager {j}"] = dm_email - FIELDS[f"DM{j} status"] = "I" - - row = [FIELDS.get(column, "") for column in columns] - return row - - -def _get_security_emails(sec_contact_ids): - """ - Retrieve security contact emails for the given security contact IDs. 
- """ - dict_security_emails = {} - public_contacts = ( - PublicContact.objects.only("email", "domain__name") - .select_related("domain") - .filter(registry_id__in=sec_contact_ids) - ) - - # Populate a dictionary of domain names and their security contacts - for contact in public_contacts: - domain: Domain = contact.domain - if domain is not None and domain.name not in dict_security_emails: - dict_security_emails[domain.name] = contact.email - else: - logger.warning("csv_export -> Domain was none for PublicContact") - - return dict_security_emails - - -def count_domain_managers(domain_name, dict_domain_invitations_with_invited_status, dict_user_domain_roles): - """Count active and invited domain managers""" - dms_active = len(dict_user_domain_roles.get(domain_name, [])) - dms_invited = len(dict_domain_invitations_with_invited_status.get(domain_name, [])) - return dms_active, dms_invited - - -def update_columns(columns, dms_total, should_update_columns): - """Update columns if necessary""" - if should_update_columns: - for i in range(1, dms_total + 1): - email_column_header = f"Domain manager {i}" - status_column_header = f"DM{i} status" - if email_column_header not in columns: - columns.append(email_column_header) - columns.append(status_column_header) - should_update_columns = False - return columns, should_update_columns, dms_total - - -def update_columns_with_domain_managers( - columns, - domain_info, - should_update_columns, - dms_total, - dict_domain_invitations_with_invited_status, - dict_user_domain_roles, -): - """Helper function to update columns with domain manager information""" - - domain_name = domain_info.domain.name - - try: - dms_active, dms_invited = count_domain_managers( - domain_name, dict_domain_invitations_with_invited_status, dict_user_domain_roles - ) - - if dms_active + dms_invited > dms_total: - dms_total = dms_active + dms_invited - should_update_columns = True - - except Exception as err: - logger.error(f"Exception while parsing domain managers for reports: {err}") - - return update_columns(columns, dms_total, should_update_columns) - - -def build_dictionaries_for_domain_managers(dict_user_domain_roles, dict_domain_invitations_with_invited_status): - """Helper function that builds dicts for invited users and active domain - managers. 
We do so to avoid filtering within loops.""" - - user_domain_roles = UserDomainRole.objects.all() - - # Iterate through each user domain role and populate the dictionary - for user_domain_role in user_domain_roles: - domain_name = user_domain_role.domain.name - email = user_domain_role.user.email - if domain_name not in dict_user_domain_roles: - dict_user_domain_roles[domain_name] = [] - dict_user_domain_roles[domain_name].append(email) - - domain_invitations_with_invited_status = None - domain_invitations_with_invited_status = DomainInvitation.objects.filter( - status=DomainInvitation.DomainInvitationStatus.INVITED - ).select_related("domain") - - # Iterate through each domain invitation and populate the dictionary - for invite in domain_invitations_with_invited_status: - domain_name = invite.domain.name - email = invite.email - if domain_name not in dict_domain_invitations_with_invited_status: - dict_domain_invitations_with_invited_status[domain_name] = [] - dict_domain_invitations_with_invited_status[domain_name].append(email) - - return dict_user_domain_roles, dict_domain_invitations_with_invited_status - - -def write_csv_for_domains( - writer, - columns, - sort_fields, - filter_condition, - should_get_domain_managers=False, - should_write_header=True, -): - """ - Receives params from the parent methods and outputs a CSV with filtered and sorted domains. - Works with write_header as long as the same writer object is passed. - should_get_domain_managers: Conditional bc we only use domain manager info for export_data_full_to_csv - should_write_header: Conditional bc export_data_domain_growth_to_csv calls write_body twice - """ - - # Retrieve domain information and all sec emails - all_domain_infos = get_domain_infos(filter_condition, sort_fields) - sec_contact_ids = all_domain_infos.values_list("domain__security_contact_registry_id", flat=True) - dict_security_emails = _get_security_emails(sec_contact_ids) - paginator = Paginator(all_domain_infos, 1000) - - # Initialize variables - dms_total = 0 - should_update_columns = False - total_body_rows = [] - dict_user_domain_roles = {} - dict_domain_invitations_with_invited_status = {} - - # Build dictionaries if necessary - if should_get_domain_managers: - dict_user_domain_roles, dict_domain_invitations_with_invited_status = build_dictionaries_for_domain_managers( - dict_user_domain_roles, dict_domain_invitations_with_invited_status - ) - - # Process domain information - for page_num in paginator.page_range: - rows = [] - page = paginator.page(page_num) - for domain_info in page.object_list: - if should_get_domain_managers: - columns, dms_total, should_update_columns = update_columns_with_domain_managers( - columns, - domain_info, - should_update_columns, - dms_total, - dict_domain_invitations_with_invited_status, - dict_user_domain_roles, - ) - - try: - row = parse_row_for_domain( - columns, - domain_info, - dict_security_emails, - should_get_domain_managers, - dict_domain_invitations_with_invited_status, - dict_user_domain_roles, - ) - rows.append(row) - except ValueError: - logger.error("csv_export -> Error when parsing row, domain was None") - continue - total_body_rows.extend(rows) - - if should_write_header: - write_header(writer, columns) - writer.writerows(total_body_rows) - - -def export_data_type_to_csv(csv_file): - """ - All domains report with extra columns. - This maps to the "All domain metadata" button. - Exports domains of all statuses. 
- """ - - writer = csv.writer(csv_file) - # define columns to include in export - columns = [ - "Domain name", - "Status", - "First ready on", - "Expiration date", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "AO", - "AO email", - "Security contact email", - # For domain manager we are pass it in as a parameter below in write_body - ] - - # Coalesce is used to replace federal_type of None with ZZZZZ - sort_fields = [ - "organization_type", - Coalesce("federal_type", Value("ZZZZZ")), - "federal_agency", - "domain__name", - ] - write_csv_for_domains( - writer, columns, sort_fields, filter_condition={}, should_get_domain_managers=True, should_write_header=True - ) - - -def export_data_full_to_csv(csv_file): - """All domains report""" - - writer = csv.writer(csv_file) - # define columns to include in export - columns = [ - "Domain name", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "Security contact email", - ] - # Coalesce is used to replace federal_type of None with ZZZZZ - sort_fields = [ - "organization_type", - Coalesce("federal_type", Value("ZZZZZ")), - "federal_agency", - "domain__name", - ] - filter_condition = { - "domain__state__in": [ - Domain.State.READY, - Domain.State.DNS_NEEDED, - Domain.State.ON_HOLD, - ], - } - write_csv_for_domains( - writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True - ) - - -def export_data_federal_to_csv(csv_file): - """Federal domains report""" - writer = csv.writer(csv_file) - # define columns to include in export - columns = [ - "Domain name", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "Security contact email", - ] - # Coalesce is used to replace federal_type of None with ZZZZZ - sort_fields = [ - "organization_type", - Coalesce("federal_type", Value("ZZZZZ")), - "federal_agency", - "domain__name", - ] - filter_condition = { - "organization_type__icontains": "federal", - "domain__state__in": [ - Domain.State.READY, - Domain.State.DNS_NEEDED, - Domain.State.ON_HOLD, - ], - } - write_csv_for_domains( - writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True - ) - - def get_default_start_date(): # Default to a date that's prior to our first deployment return timezone.make_aware(datetime(2023, 11, 1)) @@ -427,66 +51,6 @@ def format_end_date(end_date): return timezone.make_aware(datetime.strptime(end_date, "%Y-%m-%d")) if end_date else get_default_end_date() -def export_data_domain_growth_to_csv(csv_file, start_date, end_date): - """ - Growth report: - Receive start and end dates from the view, parse them. - Request from write_body READY domains that are created between - the start and end dates, as well as DELETED domains that are deleted between - the start and end dates. Specify sort params for both lists. 
- """ - - start_date_formatted = format_start_date(start_date) - end_date_formatted = format_end_date(end_date) - writer = csv.writer(csv_file) - # define columns to include in export - columns = [ - "Domain name", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "Status", - "Expiration date", - "Created at", - "First ready", - "Deleted", - ] - sort_fields = [ - "domain__first_ready", - "domain__name", - ] - filter_condition = { - "domain__state__in": [Domain.State.READY], - "domain__first_ready__lte": end_date_formatted, - "domain__first_ready__gte": start_date_formatted, - } - - # We also want domains deleted between sar and end dates, sorted - sort_fields_for_deleted_domains = [ - "domain__deleted", - "domain__name", - ] - filter_condition_for_deleted_domains = { - "domain__state__in": [Domain.State.DELETED], - "domain__deleted__lte": end_date_formatted, - "domain__deleted__gte": start_date_formatted, - } - - write_csv_for_domains( - writer, columns, sort_fields, filter_condition, should_get_domain_managers=False, should_write_header=True - ) - write_csv_for_domains( - writer, - columns, - sort_fields_for_deleted_domains, - filter_condition_for_deleted_domains, - should_get_domain_managers=False, - should_write_header=False, - ) - - def get_sliced_domains(filter_condition): """Get filtered domains counts sliced by org type and election office. Pass distinct=True when filtering by permissions so we do not to count multiples @@ -558,150 +122,627 @@ def get_sliced_requests(filter_condition): election_board, ] +class DomainExport: + """ + A collection of functions which return csv files regarding the Domain model. + """ -def export_data_managed_domains_to_csv(csv_file, start_date, end_date): - """Get counts for domains that have domain managers for two different dates, - get list of managed domains at end_date.""" - - start_date_formatted = format_start_date(start_date) - end_date_formatted = format_end_date(end_date) - writer = csv.writer(csv_file) - columns = [ - "Domain name", - "Domain type", - ] - sort_fields = [ - "domain__name", - ] - filter_managed_domains_start_date = { - "domain__permissions__isnull": False, - "domain__first_ready__lte": start_date_formatted, - } - managed_domains_sliced_at_start_date = get_sliced_domains(filter_managed_domains_start_date) - - writer.writerow(["MANAGED DOMAINS COUNTS AT START DATE"]) - writer.writerow( - [ - "Total", - "Federal", - "Interstate", - "State or territory", - "Tribal", - "County", + @classmethod + def export_data_type_to_csv(cls, csv_file): + """ + All domain metadata: + Exports domains of all statuses plus domain managers. 
+ """ + writer = csv.writer(csv_file) + columns = [ + "Domain name", + "Status", + "First ready on", + "Expiration date", + "Domain type", + "Agency", + "Organization name", "City", - "Special district", - "School district", - "Election office", + "State", + "AO", + "AO email", + "Security contact email", + "Domain managers", + "Invited domain managers", ] - ) - writer.writerow(managed_domains_sliced_at_start_date) - writer.writerow([]) - filter_managed_domains_end_date = { - "domain__permissions__isnull": False, - "domain__first_ready__lte": end_date_formatted, - } - managed_domains_sliced_at_end_date = get_sliced_domains(filter_managed_domains_end_date) + # Coalesce is used to replace federal_type of None with ZZZZZ + sort_fields = [ + "organization_type", + Coalesce("federal_type", Value("ZZZZZ")), + "federal_agency", + "domain__name", + ] - writer.writerow(["MANAGED DOMAINS COUNTS AT END DATE"]) - writer.writerow( - [ - "Total", - "Federal", - "Interstate", - "State or territory", - "Tribal", - "County", + # Fetch all relevant PublicContact entries + public_contacts = cls.get_all_security_emails() + + # Fetch all relevant Invite entries + domain_invitations = cls.get_all_domain_invitations() + + # Fetch all relevant ComainUserRole entries + user_domain_roles = cls.get_all_user_domain_roles() + + domain_infos = ( + DomainInformation.objects.select_related("domain", "authorizing_official") + .prefetch_related("permissions") + .order_by(*sort_fields) + .distinct() + ) + + annotations = cls._domain_metadata_annotations() + + # The .values returned from annotate_and_retrieve_fields can't go two levels deep + # (just returns the field id of say, "creator") - so we have to include this. + additional_values = [ + "domain__name", + "domain__state", + "domain__first_ready", + "domain__expiration_date", + "domain__created_at", + "domain__deleted", + "authorizing_official__email", + ] + + # Convert the domain request queryset to a dictionary (including annotated fields) + annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, public_contacts, domain_invitations, user_domain_roles, additional_values) + requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False) + + # Write the csv file + cls.write_csv_for_domains(writer, columns, requests_dict) + + @classmethod + def export_data_full_to_csv(cls, csv_file): + """Current full""" + writer = csv.writer(csv_file) + columns = [ + "Domain name", + "Domain type", + "Agency", + "Organization name", "City", - "Special district", - "School district", - "Election office", + "State", + "Security contact email", ] - ) - writer.writerow(managed_domains_sliced_at_end_date) - writer.writerow([]) + # Coalesce is used to replace federal_type of None with ZZZZZ + sort_fields = [ + "organization_type", + Coalesce("federal_type", Value("ZZZZZ")), + "federal_agency", + "domain__name", + ] + filter_condition = { + "domain__state__in": [ + Domain.State.READY, + Domain.State.DNS_NEEDED, + Domain.State.ON_HOLD, + ], + } - write_csv_for_domains( + domain_infos = ( + DomainInformation.objects.select_related("domain") + .filter(**filter_condition) + .order_by(*sort_fields) + .distinct() + ) + + annotations = {} + additional_values = [ + "domain__name", + ] + + # Convert the domain request queryset to a dictionary (including annotated fields) + annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, {}, {}, {}, additional_values) + requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False) + + # 
Write the csv file + cls.write_csv_for_domains(writer, columns, requests_dict) + + @classmethod + def export_data_federal_to_csv(cls, csv_file): + """Current federal""" + writer = csv.writer(csv_file) + columns = [ + "Domain name", + "Domain type", + "Agency", + "Organization name", + "City", + "State", + "Security contact email", + ] + # Coalesce is used to replace federal_type of None with ZZZZZ + sort_fields = [ + "organization_type", + Coalesce("federal_type", Value("ZZZZZ")), + "federal_agency", + "domain__name", + ] + filter_condition = { + "organization_type__icontains": "federal", + "domain__state__in": [ + Domain.State.READY, + Domain.State.DNS_NEEDED, + Domain.State.ON_HOLD, + ], + } + + domain_infos = ( + DomainInformation.objects.select_related("domain") + .filter(**filter_condition) + .order_by(*sort_fields) + .distinct() + ) + + annotations = {} + additional_values = [ + "domain__name", + ] + + # Convert the domain request queryset to a dictionary (including annotated fields) + annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, {}, {}, {}, additional_values) + requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False) + + # Write the csv file + cls.write_csv_for_domains(writer, columns, requests_dict) + + @classmethod + def export_data_domain_growth_to_csv(cls, csv_file, start_date, end_date): + """ + Domain growth: + Receive start and end dates from the view, parse them. + Request from write_body READY domains that are created between + the start and end dates, as well as DELETED domains that are deleted between + the start and end dates. Specify sort params for both lists. + """ + start_date_formatted = format_start_date(start_date) + end_date_formatted = format_end_date(end_date) + writer = csv.writer(csv_file) + # define columns to include in export + columns = [ + "Domain name", + "Domain type", + "Agency", + "Organization name", + "City", + "State", + "Status", + "Expiration date", + "Created at", + "First ready", + "Deleted", + ] + sort_fields = [ + "domain__first_ready", + "domain__name", + ] + filter_condition = { + "domain__state__in": [Domain.State.READY], + "domain__first_ready__lte": end_date_formatted, + "domain__first_ready__gte": start_date_formatted, + } + + # We also want domains deleted between sar and end dates, sorted + sort_fields_for_deleted_domains = [ + "domain__deleted", + "domain__name", + ] + filter_condition_for_deleted_domains = { + "domain__state__in": [Domain.State.DELETED], + "domain__deleted__lte": end_date_formatted, + "domain__deleted__gte": start_date_formatted, + } + + domain_infos = ( + DomainInformation.objects.select_related("domain") + .filter(**filter_condition) + .order_by(*sort_fields) + .distinct() + ) + deleted_domain_infos = ( + DomainInformation.objects.select_related("domain") + .filter(**filter_condition_for_deleted_domains) + .order_by(*sort_fields_for_deleted_domains) + .distinct() + ) + + annotations = {} + additional_values = [ + "domain__name", + "domain__state", + "domain__first_ready", + "domain__expiration_date", + "domain__created_at", + "domain__deleted", + ] + + # Convert the domain request queryset to a dictionary (including annotated fields) + annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, {}, {}, {}, additional_values) + requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False) + + # Convert the domain request queryset to a dictionary (including annotated fields) + deleted_annotated_domains = 
cls.annotate_and_retrieve_fields(deleted_domain_infos, annotations, {}, {}, {}, additional_values) + deleted_requests_dict = convert_queryset_to_dict(deleted_annotated_domains, is_model=False) + + cls.write_csv_for_domains( + writer, columns, requests_dict + + ) + cls.write_csv_for_domains( + writer, + columns, + deleted_requests_dict, + should_write_header=False, + ) + + @classmethod + def export_data_managed_domains_to_csv(cls, csv_file, start_date, end_date): + """ + Managed domains: + Get counts for domains that have domain managers for two different dates, + get list of managed domains at end_date.""" + start_date_formatted = format_start_date(start_date) + end_date_formatted = format_end_date(end_date) + writer = csv.writer(csv_file) + columns = [ + "Domain name", + "Domain type", + "Domain managers", + "Invited domain managers", + ] + sort_fields = [ + "domain__name", + ] + filter_managed_domains_start_date = { + "domain__permissions__isnull": False, + "domain__first_ready__lte": start_date_formatted, + } + managed_domains_sliced_at_start_date = get_sliced_domains(filter_managed_domains_start_date) + + writer.writerow(["MANAGED DOMAINS COUNTS AT START DATE"]) + writer.writerow( + [ + "Total", + "Federal", + "Interstate", + "State or territory", + "Tribal", + "County", + "City", + "Special district", + "School district", + "Election office", + ] + ) + writer.writerow(managed_domains_sliced_at_start_date) + writer.writerow([]) + + filter_managed_domains_end_date = { + "domain__permissions__isnull": False, + "domain__first_ready__lte": end_date_formatted, + } + managed_domains_sliced_at_end_date = get_sliced_domains(filter_managed_domains_end_date) + + writer.writerow(["MANAGED DOMAINS COUNTS AT END DATE"]) + writer.writerow( + [ + "Total", + "Federal", + "Interstate", + "State or territory", + "Tribal", + "County", + "City", + "Special district", + "School district", + "Election office", + ] + ) + writer.writerow(managed_domains_sliced_at_end_date) + writer.writerow([]) + + domain_invitations = cls.get_all_domain_invitations() + + # Fetch all relevant ComainUserRole entries + user_domain_roles = cls.get_all_user_domain_roles() + + annotations = {} + # The .values returned from annotate_and_retrieve_fields can't go two levels deep + # (just returns the field id of say, "creator") - so we have to include this. 
+ additional_values = [ + "domain__name", + ] + + domain_infos = ( + DomainInformation.objects.select_related("domain") + .prefetch_related("permissions") + .filter(**filter_managed_domains_end_date) + .order_by(*sort_fields) + .distinct() + ) + + # Convert the domain request queryset to a dictionary (including annotated fields) + annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, {}, domain_invitations, user_domain_roles, additional_values) + requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False) + + cls.write_csv_for_domains( + writer, + columns, + requests_dict + ) + + @classmethod + def export_data_unmanaged_domains_to_csv(cls, csv_file, start_date, end_date): + """ + Unmanaged domains: + Get counts for domains that have domain managers for two different dates, + get list of managed domains at end_date.""" + + start_date_formatted = format_start_date(start_date) + end_date_formatted = format_end_date(end_date) + writer = csv.writer(csv_file) + columns = [ + "Domain name", + "Domain type", + ] + sort_fields = [ + "domain__name", + ] + + filter_unmanaged_domains_start_date = { + "domain__permissions__isnull": True, + "domain__first_ready__lte": start_date_formatted, + } + unmanaged_domains_sliced_at_start_date = get_sliced_domains(filter_unmanaged_domains_start_date) + + writer.writerow(["UNMANAGED DOMAINS AT START DATE"]) + writer.writerow( + [ + "Total", + "Federal", + "Interstate", + "State or territory", + "Tribal", + "County", + "City", + "Special district", + "School district", + "Election office", + ] + ) + writer.writerow(unmanaged_domains_sliced_at_start_date) + writer.writerow([]) + + filter_unmanaged_domains_end_date = { + "domain__permissions__isnull": True, + "domain__first_ready__lte": end_date_formatted, + } + unmanaged_domains_sliced_at_end_date = get_sliced_domains(filter_unmanaged_domains_end_date) + + writer.writerow(["UNMANAGED DOMAINS AT END DATE"]) + writer.writerow( + [ + "Total", + "Federal", + "Interstate", + "State or territory", + "Tribal", + "County", + "City", + "Special district", + "School district", + "Election office", + ] + ) + writer.writerow(unmanaged_domains_sliced_at_end_date) + writer.writerow([]) + + annotations = {} + # The .values returned from annotate_and_retrieve_fields can't go two levels deep + # (just returns the field id of say, "creator") - so we have to include this. 
+        additional_values = [
+            "domain__name",
+        ]
+        domain_infos = (
+            DomainInformation.objects.select_related("domain")
+            .filter(**filter_unmanaged_domains_end_date)
+            .order_by(*sort_fields)
+            .distinct()
+        )
+
+        # Convert the domain queryset to a dictionary (including annotated fields)
+        annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, {}, {}, {}, additional_values)
+        requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False)
+
+        cls.write_csv_for_domains(
+            writer,
+            columns,
+            requests_dict
+        )
+
+    @classmethod
+    def _domain_metadata_annotations(cls, delimiter=", "):
+        """Returns the queryset annotations used by the domain metadata export (currently the concatenated AO name)."""
+        return {
+            "ao_name": Concat(
+                Coalesce(F("authorizing_official__first_name"), Value("")),
+                Value(" "),
+                Coalesce(F("authorizing_official__last_name"), Value("")),
+                output_field=CharField(),
+            ),
+        }
+
+    @classmethod
+    def annotate_and_retrieve_fields(
+        cls,
+        domains,
+        annotations,
+        public_contacts={},
+        domain_invitations={},
+        user_domain_roles={},
+        additional_values=None,
+        include_many_to_many=False,
+    ) -> QuerySet:
+        """
+        Applies annotations to a queryset and retrieves specified fields,
+        including class-defined and annotation-defined.
+
+        Parameters:
+            domains (QuerySet): Initial queryset.
+            annotations (dict, optional): Fields to compute {field_name: expression}.
+            public_contacts (dict, optional): Mapping of registry_id to security contact email.
+            domain_invitations (list, optional): (domain name, email) pairs for invited domain managers.
+            user_domain_roles (list, optional): (domain name, email) pairs for active domain managers.
+            additional_values (list, optional): Extra fields to retrieve; defaults to annotation keys if None.
+            include_many_to_many (bool, optional): Determines if we should include many to many fields or not.
+
+        Returns:
+            QuerySet: Contains dictionaries with the specified fields for each record.
+        """
+
+        if additional_values is None:
+            additional_values = []
+
+        # We can infer that if we're passing in annotations,
+        # we want to grab the result of said annotation.
+        if annotations:
+            additional_values.extend(annotations.keys())
+
+        # Get preexisting fields on DomainInformation
+        domain_fields = set()
+        for field in DomainInformation._meta.get_fields():
+            # Exclude many to many fields unless we specify
+            many_to_many = isinstance(field, ManyToManyField) and include_many_to_many
+            if many_to_many or not isinstance(field, ManyToManyField):
+                domain_fields.add(field.name)
+
+        queryset = domains.annotate(**annotations).values(*domain_fields, *additional_values)
+        annotated_domains = []
+
+        # Create mappings of domain name to invited users and to active managers
+        invited_users_dict = defaultdict(list)
+        for domain, email in domain_invitations:
+            invited_users_dict[domain].append(email)
+
+        managers_dict = defaultdict(list)
+        for domain, email in user_domain_roles:
+            managers_dict[domain].append(email)
+
+        # Annotate each row with the security contact, invited users, and active managers
+        for domain in queryset:
+            domain['security_contact_email'] = public_contacts.get(domain.get('domain__registry_id'))
+            domain['invited_users'] = ', '.join(invited_users_dict.get(domain.get('domain__name'), []))
+            domain['managers'] = ', '.join(managers_dict.get(domain.get('domain__name'), []))
+            annotated_domains.append(domain)
+
+        if annotated_domains:
+            return annotated_domains
+
+        return queryset
+
+    @staticmethod
+    def parse_row_for_domains(columns, domain):
+        """
+        Given a set of columns and a domain dictionary, generate a new row from cleaned column data.
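+        Expects the flattened dictionaries produced by annotate_and_retrieve_fields (keys such as "domain__state", "ao_name", "managers").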
+ """ + + status = domain.get("domain__state") + human_readable_status = Domain.State.get_state_label(status) + + expiration_date = domain.get("domain__expiration_date") + if expiration_date is None: + expiration_date = "(blank)" + + first_ready_on = domain.get("domain__first_ready") + if first_ready_on is None: + first_ready_on = "(blank)" + + domain_org_type = domain.get("generic_org_type") + human_readable_domain_org_type = DomainRequest.OrganizationChoices.get_org_label(domain_org_type) + domain_federal_type = domain.get("federal_type") + human_readable_domain_federal_type = BranchChoices.get_branch_label(domain_federal_type) + domain_type = human_readable_domain_org_type + if domain_federal_type and domain_org_type == DomainRequest.OrgChoicesElectionOffice.FEDERAL: + domain_type = f"{human_readable_domain_federal_type} - {human_readable_domain_org_type}" + + if domain.get("domain__name") == "18f.gov": + print(f'domain_type {domain_type}') + print(f'federal_agency {domain.get("federal_agency")}') + print(f'city {domain.get("city")}') + + # create a dictionary of fields which can be included in output. + # "extra_fields" are precomputed fields (generated in the DB or parsed). + FIELDS = { + + "Domain name": domain.get("domain__name"), + "Status": human_readable_status, + "First ready on": first_ready_on, + "Expiration date": expiration_date, + "Domain type": domain_type, + "Agency": domain.get("federal_agency"), + "Organization name": domain.get("organization_name"), + "City": domain.get("city"), + "State": domain.get("state_territory"), + "AO": domain.get("ao_name"), + "AO email": domain.get("authorizing_official__email"), + "Security contact email": domain.get("security_contact_email"), + "Created at": domain.get("domain__created_at"), + "Deleted": domain.get("domain__deleted"), + "Domain managers": domain.get("managers"), + "Invited domain managers": domain.get("invited_users"), + } + + row = [FIELDS.get(column, "") for column in columns] + return row + + @staticmethod + def write_csv_for_domains( writer, columns, - sort_fields, - filter_managed_domains_end_date, - should_get_domain_managers=True, + domains_dict, should_write_header=True, - ) + ): + """Receives params from the parent methods and outputs a CSV with filtered and sorted requests. + Works with write_header as long as the same writer object is passed.""" + rows = [] + for domain in domains_dict.values(): + try: + row = DomainExport.parse_row_for_domains(columns, domain) + rows.append(row) + except ValueError as err: + logger.error(f"csv_export -> Error when parsing row: {err}") + continue -def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date): - """Get counts for domains that do not have domain managers for two different dates, - get list of unmanaged domains at end_date.""" + if should_write_header: + write_header(writer, columns) - start_date_formatted = format_start_date(start_date) - end_date_formatted = format_end_date(end_date) - writer = csv.writer(csv_file) - columns = [ - "Domain name", - "Domain type", - ] - sort_fields = [ - "domain__name", - ] + writer.writerows(rows) - filter_unmanaged_domains_start_date = { - "domain__permissions__isnull": True, - "domain__first_ready__lte": start_date_formatted, - } - unmanaged_domains_sliced_at_start_date = get_sliced_domains(filter_unmanaged_domains_start_date) + # ============================================================= # + # Helper functions for django ORM queries. # + # We are using these rather than pure python for speed reasons. 
# + # ============================================================= # - writer.writerow(["UNMANAGED DOMAINS AT START DATE"]) - writer.writerow( - [ - "Total", - "Federal", - "Interstate", - "State or territory", - "Tribal", - "County", - "City", - "Special district", - "School district", - "Election office", - ] - ) - writer.writerow(unmanaged_domains_sliced_at_start_date) - writer.writerow([]) + @classmethod + def get_all_security_emails(cls): + """ + Fetch all PublicContact entries and return a mapping of registry_id to email. + """ + public_contacts = PublicContact.objects.values_list('registry_id', 'email') + return {registry_id: email for registry_id, email in public_contacts} + + @classmethod + def get_all_domain_invitations(cls): + """ + Fetch all DomainInvitation entries and return a mapping of domain to email. + """ + domain_invitations = DomainInvitation.objects.filter(status="invited").values_list('domain__name', 'email') + return list(domain_invitations) - filter_unmanaged_domains_end_date = { - "domain__permissions__isnull": True, - "domain__first_ready__lte": end_date_formatted, - } - unmanaged_domains_sliced_at_end_date = get_sliced_domains(filter_unmanaged_domains_end_date) + @classmethod + def get_all_user_domain_roles(cls): + """ + Fetch all UserDomainRole entries and return a mapping of domain to user__email. + """ + user_domain_roles = UserDomainRole.objects.select_related('user').values_list('domain__name', 'user__email') + return list(user_domain_roles) - writer.writerow(["UNMANAGED DOMAINS AT END DATE"]) - writer.writerow( - [ - "Total", - "Federal", - "Interstate", - "State or territory", - "Tribal", - "County", - "City", - "Special district", - "School district", - "Election office", - ] - ) - writer.writerow(unmanaged_domains_sliced_at_end_date) - writer.writerow([]) - - write_csv_for_domains( - writer, - columns, - sort_fields, - filter_unmanaged_domains_end_date, - should_get_domain_managers=False, - should_write_header=True, - ) class DomainRequestExport: diff --git a/src/registrar/views/admin_views.py b/src/registrar/views/admin_views.py index f1baa72bd..9b013e36c 100644 --- a/src/registrar/views/admin_views.py +++ b/src/registrar/views/admin_views.py @@ -142,7 +142,7 @@ class ExportDataType(View): # match the CSV example with all the fields response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = 'attachment; filename="domains-by-type.csv"' - csv_export.export_data_type_to_csv(response) + csv_export.DomainExport.export_data_type_to_csv(response) return response @@ -151,7 +151,7 @@ class ExportDataFull(View): # Smaller export based on 1 response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = 'attachment; filename="current-full.csv"' - csv_export.export_data_full_to_csv(response) + csv_export.DomainExport.export_data_full_to_csv(response) return response @@ -160,7 +160,7 @@ class ExportDataFederal(View): # Federal only response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = 'attachment; filename="current-federal.csv"' - csv_export.export_data_federal_to_csv(response) + csv_export.DomainExport.export_data_federal_to_csv(response) return response @@ -177,31 +177,23 @@ class ExportDomainRequestDataFull(View): class ExportDataDomainsGrowth(View): def get(self, request, *args, **kwargs): - # Get start_date and end_date from the request's GET parameters - # #999: not needed if we switch to django forms start_date = request.GET.get("start_date", "") end_date = request.GET.get("end_date", 
"") response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = f'attachment; filename="domain-growth-report-{start_date}-to-{end_date}.csv"' - # For #999: set export_data_domain_growth_to_csv to return the resulting queryset, which we can then use - # in context to display this data in the template. - csv_export.export_data_domain_growth_to_csv(response, start_date, end_date) + csv_export.DomainExport.export_data_domain_growth_to_csv(response, start_date, end_date) return response class ExportDataRequestsGrowth(View): def get(self, request, *args, **kwargs): - # Get start_date and end_date from the request's GET parameters - # #999: not needed if we switch to django forms start_date = request.GET.get("start_date", "") end_date = request.GET.get("end_date", "") response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = f'attachment; filename="requests-{start_date}-to-{end_date}.csv"' - # For #999: set export_data_domain_growth_to_csv to return the resulting queryset, which we can then use - # in context to display this data in the template. csv_export.DomainRequestExport.export_data_requests_growth_to_csv(response, start_date, end_date) return response @@ -209,25 +201,21 @@ class ExportDataRequestsGrowth(View): class ExportDataManagedDomains(View): def get(self, request, *args, **kwargs): - # Get start_date and end_date from the request's GET parameters - # #999: not needed if we switch to django forms start_date = request.GET.get("start_date", "") end_date = request.GET.get("end_date", "") response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = f'attachment; filename="managed-domains-{start_date}-to-{end_date}.csv"' - csv_export.export_data_managed_domains_to_csv(response, start_date, end_date) + csv_export.DomainExport.export_data_managed_domains_to_csv(response, start_date, end_date) return response class ExportDataUnmanagedDomains(View): def get(self, request, *args, **kwargs): - # Get start_date and end_date from the request's GET parameters - # #999: not needed if we switch to django forms start_date = request.GET.get("start_date", "") end_date = request.GET.get("end_date", "") response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = f'attachment; filename="unamanaged-domains-{start_date}-to-{end_date}.csv"' - csv_export.export_data_unmanaged_domains_to_csv(response, start_date, end_date) + csv_export.DomainExport.export_data_unmanaged_domains_to_csv(response, start_date, end_date) return response