diff --git a/src/registrar/utility/csv_export.py b/src/registrar/utility/csv_export.py index 55b0bf37f..5a8e88431 100644 --- a/src/registrar/utility/csv_export.py +++ b/src/registrar/utility/csv_export.py @@ -317,7 +317,7 @@ class BaseExport(ABC): pass -class NewDomainExport(BaseExport): +class DomainExport(BaseExport): """ A collection of functions which return csv files regarding the Domain model. """ @@ -490,7 +490,7 @@ class NewDomainExport(BaseExport): ] -class DomainDataType(NewDomainExport): +class DomainDataType(DomainExport): @classmethod def get_columns(cls): @@ -602,7 +602,7 @@ class DomainDataType(NewDomainExport): ] -class DomainDataFull(NewDomainExport): +class DomainDataFull(DomainExport): @classmethod def get_columns(cls): @@ -695,7 +695,7 @@ class DomainDataFull(NewDomainExport): ] -class DomainDataFederal(NewDomainExport): +class DomainDataFederal(DomainExport): @classmethod def get_columns(cls): @@ -789,7 +789,7 @@ class DomainDataFederal(NewDomainExport): ] -class DomainGrowth(NewDomainExport): +class DomainGrowth(DomainExport): @classmethod def get_columns(cls): @@ -866,7 +866,7 @@ class DomainGrowth(NewDomainExport): ] -class DomainManaged(NewDomainExport): +class DomainManaged(DomainExport): @classmethod def get_columns(cls): @@ -1004,7 +1004,7 @@ class DomainManaged(NewDomainExport): csv_writer.writerow([]) -class DomainUnmanaged(NewDomainExport): +class DomainUnmanaged(DomainExport): @classmethod def get_columns(cls): @@ -1120,642 +1120,6 @@ class DomainUnmanaged(NewDomainExport): csv_writer.writerow([]) -class DomainExport: - @classmethod - def export_data_type_to_csv(cls, csv_file): - """ - All domain metadata: - Exports domains of all statuses plus domain managers. - """ - writer = csv.writer(csv_file) - columns = [ - "Domain name", - "Status", - "First ready on", - "Expiration date", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "AO", - "AO email", - "Security contact email", - "Domain managers", - "Invited domain managers", - ] - - # Coalesce is used to replace federal_type of None with ZZZZZ - sort_fields = [ - "organization_type", - Coalesce("federal_type", Value("ZZZZZ")), - "federal_agency", - "domain__name", - ] - - # Fetch all relevant PublicContact entries - public_contacts = cls.get_all_security_emails() - - # Fetch all relevant Invite entries - domain_invitations = cls.get_all_domain_invitations() - - # Fetch all relevant ComainUserRole entries - user_domain_roles = cls.get_all_user_domain_roles() - - domain_infos = ( - DomainInformation.objects.select_related("domain", "authorizing_official") - .prefetch_related("permissions") - .order_by(*sort_fields) - .distinct() - ) - - annotations = cls._domain_metadata_annotations() - - # The .values returned from annotate_and_retrieve_fields can't go two levels deep - # (just returns the field id of say, "creator") - so we have to include this. - additional_values = [ - "domain__name", - "domain__state", - "domain__first_ready", - "domain__expiration_date", - "domain__created_at", - "domain__deleted", - "domain__security_contact_registry_id", - "authorizing_official__email", - "federal_agency__agency", - ] - - # Convert the domain request queryset to a dictionary (including annotated fields) - annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, public_contacts, domain_invitations, user_domain_roles, additional_values) - requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False) - - # Write the csv file - cls.write_csv_for_domains(writer, columns, requests_dict) - - @classmethod - def export_data_full_to_csv(cls, csv_file): - """Current full""" - writer = csv.writer(csv_file) - columns = [ - "Domain name", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "Security contact email", - ] - # Coalesce is used to replace federal_type of None with ZZZZZ - sort_fields = [ - "organization_type", - Coalesce("federal_type", Value("ZZZZZ")), - "federal_agency", - "domain__name", - ] - filter_condition = { - "domain__state__in": [ - Domain.State.READY, - Domain.State.DNS_NEEDED, - Domain.State.ON_HOLD, - ], - } - - # Fetch all relevant PublicContact entries - public_contacts = cls.get_all_security_emails() - - domain_infos = ( - DomainInformation.objects.select_related("domain") - .filter(**filter_condition) - .order_by(*sort_fields) - .distinct() - ) - - annotations = {} - additional_values = [ - "domain__name", - "domain__security_contact_registry_id", - "federal_agency__agency", - ] - - # Convert the domain request queryset to a dictionary (including annotated fields) - annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, public_contacts, {}, {}, additional_values) - requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False) - - # Write the csv file - cls.write_csv_for_domains(writer, columns, requests_dict) - - @classmethod - def export_data_federal_to_csv(cls, csv_file): - """Current federal""" - writer = csv.writer(csv_file) - columns = [ - "Domain name", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "Security contact email", - ] - # Coalesce is used to replace federal_type of None with ZZZZZ - sort_fields = [ - "organization_type", - Coalesce("federal_type", Value("ZZZZZ")), - "federal_agency", - "domain__name", - ] - filter_condition = { - "organization_type__icontains": "federal", - "domain__state__in": [ - Domain.State.READY, - Domain.State.DNS_NEEDED, - Domain.State.ON_HOLD, - ], - } - - # Fetch all relevant PublicContact entries - public_contacts = cls.get_all_security_emails() - - domain_infos = ( - DomainInformation.objects.select_related("domain") - .filter(**filter_condition) - .order_by(*sort_fields) - .distinct() - ) - - annotations = {} - additional_values = [ - "domain__name", - "domain__security_contact_registry_id", - "federal_agency__agency", - ] - - # Convert the domain request queryset to a dictionary (including annotated fields) - annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, public_contacts, {}, {}, additional_values) - requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False) - - # Write the csv file - cls.write_csv_for_domains(writer, columns, requests_dict) - - @classmethod - def export_data_domain_growth_to_csv(cls, csv_file, start_date, end_date): - """ - Domain growth: - Receive start and end dates from the view, parse them. - Request from write_body READY domains that are created between - the start and end dates, as well as DELETED domains that are deleted between - the start and end dates. Specify sort params for both lists. - """ - start_date_formatted = format_start_date(start_date) - end_date_formatted = format_end_date(end_date) - writer = csv.writer(csv_file) - # define columns to include in export - columns = [ - "Domain name", - "Domain type", - "Agency", - "Organization name", - "City", - "State", - "Status", - "Expiration date", - "Created at", - "First ready", - "Deleted", - ] - sort_fields = [ - "domain__first_ready", - "domain__name", - ] - filter_condition = { - "domain__state__in": [Domain.State.READY], - "domain__first_ready__lte": end_date_formatted, - "domain__first_ready__gte": start_date_formatted, - } - - # We also want domains deleted between sar and end dates, sorted - sort_fields_for_deleted_domains = [ - "domain__deleted", - "domain__name", - ] - filter_condition_for_deleted_domains = { - "domain__state__in": [Domain.State.DELETED], - "domain__deleted__lte": end_date_formatted, - "domain__deleted__gte": start_date_formatted, - } - - domain_infos = ( - DomainInformation.objects.select_related("domain") - .filter(**filter_condition) - .order_by(*sort_fields) - .distinct() - ) - deleted_domain_infos = ( - DomainInformation.objects.select_related("domain") - .filter(**filter_condition_for_deleted_domains) - .order_by(*sort_fields_for_deleted_domains) - .distinct() - ) - - annotations = {} - additional_values = [ - "domain__name", - "domain__state", - "domain__first_ready", - "domain__expiration_date", - "domain__created_at", - "domain__deleted", - "federal_agency__agency", - ] - - # Convert the domain request queryset to a dictionary (including annotated fields) - annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, {}, {}, {}, additional_values) - requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False) - - # Convert the domain request queryset to a dictionary (including annotated fields) - deleted_annotated_domains = cls.annotate_and_retrieve_fields(deleted_domain_infos, annotations, {}, {}, {}, additional_values) - deleted_requests_dict = convert_queryset_to_dict(deleted_annotated_domains, is_model=False) - - cls.write_csv_for_domains( - writer, columns, requests_dict - - ) - cls.write_csv_for_domains( - writer, - columns, - deleted_requests_dict, - should_write_header=False, - ) - - @classmethod - def export_data_managed_domains_to_csv(cls, csv_file, start_date, end_date): - """ - Managed domains: - Get counts for domains that have domain managers for two different dates, - get list of managed domains at end_date.""" - start_date_formatted = format_start_date(start_date) - end_date_formatted = format_end_date(end_date) - writer = csv.writer(csv_file) - columns = [ - "Domain name", - "Domain type", - "Domain managers", - "Invited domain managers", - ] - sort_fields = [ - "domain__name", - ] - filter_managed_domains_start_date = { - "domain__permissions__isnull": False, - "domain__first_ready__lte": start_date_formatted, - } - managed_domains_sliced_at_start_date = get_sliced_domains(filter_managed_domains_start_date) - - writer.writerow(["MANAGED DOMAINS COUNTS AT START DATE"]) - writer.writerow( - [ - "Total", - "Federal", - "Interstate", - "State or territory", - "Tribal", - "County", - "City", - "Special district", - "School district", - "Election office", - ] - ) - writer.writerow(managed_domains_sliced_at_start_date) - writer.writerow([]) - - filter_managed_domains_end_date = { - "domain__permissions__isnull": False, - "domain__first_ready__lte": end_date_formatted, - } - managed_domains_sliced_at_end_date = get_sliced_domains(filter_managed_domains_end_date) - - writer.writerow(["MANAGED DOMAINS COUNTS AT END DATE"]) - writer.writerow( - [ - "Total", - "Federal", - "Interstate", - "State or territory", - "Tribal", - "County", - "City", - "Special district", - "School district", - "Election office", - ] - ) - writer.writerow(managed_domains_sliced_at_end_date) - writer.writerow([]) - - domain_invitations = cls.get_all_domain_invitations() - - # Fetch all relevant ComainUserRole entries - user_domain_roles = cls.get_all_user_domain_roles() - - annotations = {} - # The .values returned from annotate_and_retrieve_fields can't go two levels deep - # (just returns the field id of say, "creator") - so we have to include this. - additional_values = [ - "domain__name", - ] - - domain_infos = ( - DomainInformation.objects.select_related("domain") - .prefetch_related("permissions") - .filter(**filter_managed_domains_end_date) - .order_by(*sort_fields) - .distinct() - ) - - # Convert the domain request queryset to a dictionary (including annotated fields) - annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, {}, domain_invitations, user_domain_roles, additional_values) - requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False) - - cls.write_csv_for_domains( - writer, - columns, - requests_dict - ) - - @classmethod - def export_data_unmanaged_domains_to_csv(cls, csv_file, start_date, end_date): - """ - Unmanaged domains: - Get counts for domains that have domain managers for two different dates, - get list of managed domains at end_date.""" - - start_date_formatted = format_start_date(start_date) - end_date_formatted = format_end_date(end_date) - writer = csv.writer(csv_file) - columns = [ - "Domain name", - "Domain type", - ] - sort_fields = [ - "domain__name", - ] - - filter_unmanaged_domains_start_date = { - "domain__permissions__isnull": True, - "domain__first_ready__lte": start_date_formatted, - } - unmanaged_domains_sliced_at_start_date = get_sliced_domains(filter_unmanaged_domains_start_date) - - writer.writerow(["UNMANAGED DOMAINS AT START DATE"]) - writer.writerow( - [ - "Total", - "Federal", - "Interstate", - "State or territory", - "Tribal", - "County", - "City", - "Special district", - "School district", - "Election office", - ] - ) - writer.writerow(unmanaged_domains_sliced_at_start_date) - writer.writerow([]) - - filter_unmanaged_domains_end_date = { - "domain__permissions__isnull": True, - "domain__first_ready__lte": end_date_formatted, - } - unmanaged_domains_sliced_at_end_date = get_sliced_domains(filter_unmanaged_domains_end_date) - - writer.writerow(["UNMANAGED DOMAINS AT END DATE"]) - writer.writerow( - [ - "Total", - "Federal", - "Interstate", - "State or territory", - "Tribal", - "County", - "City", - "Special district", - "School district", - "Election office", - ] - ) - writer.writerow(unmanaged_domains_sliced_at_end_date) - writer.writerow([]) - - annotations = {} - # The .values returned from annotate_and_retrieve_fields can't go two levels deep - # (just returns the field id of say, "creator") - so we have to include this. - additional_values = [ - "domain__name", - ] - domain_infos = ( - DomainInformation.objects.select_related("domain") - .filter(**filter_unmanaged_domains_end_date) - .order_by(*sort_fields) - .distinct() - ) - - # Convert the domain request queryset to a dictionary (including annotated fields) - annotated_domains = cls.annotate_and_retrieve_fields(domain_infos, annotations, {}, {}, {}, additional_values) - requests_dict = convert_queryset_to_dict(annotated_domains, is_model=False) - - cls.write_csv_for_domains( - writer, - columns, - requests_dict - ) - - @classmethod - def _domain_metadata_annotations(cls, delimiter=", "): - """""" - return { - "ao_name": Concat( - Coalesce(F("authorizing_official__first_name"), Value("")), - Value(" "), - Coalesce(F("authorizing_official__last_name"), Value("")), - output_field=CharField(), - ), - } - - @classmethod - def annotate_and_retrieve_fields( - cls, domains, annotations, public_contacts={}, domain_invitations={}, user_domain_roles={}, additional_values=None, include_many_to_many=False - ) -> QuerySet: - """ - Applies annotations to a queryset and retrieves specified fields, - including class-defined and annotation-defined. - - Parameters: - requests (QuerySet): Initial queryset. - annotations (dict, optional): Fields to compute {field_name: expression}. - additional_values (list, optional): Extra fields to retrieve; defaults to annotation keys if None. - include_many_to_many (bool, optional): Determines if we should include many to many fields or not - - Returns: - QuerySet: Contains dictionaries with the specified fields for each record. - """ - - if additional_values is None: - additional_values = [] - - # We can infer that if we're passing in annotations, - # we want to grab the result of said annotation. - if annotations: - additional_values.extend(annotations.keys()) - - # Get prexisting fields on DomainRequest - domain_fields = set() - for field in DomainInformation._meta.get_fields(): - # Exclude many to many fields unless we specify - many_to_many = isinstance(field, ManyToManyField) and include_many_to_many - if many_to_many or not isinstance(field, ManyToManyField): - domain_fields.add(field.name) - - queryset = domains.annotate(**annotations).values(*domain_fields, *additional_values) - annotated_domains = [] - - # Create mapping of domain to a list of invited users and managers - invited_users_dict = defaultdict(list) - for domain, email in domain_invitations: - invited_users_dict[domain].append(email) - - managers_dict = defaultdict(list) - for domain, email in user_domain_roles: - managers_dict[domain].append(email) - - # Annotate with security_contact from public_contacts - for domain in queryset: - domain['security_contact_email'] = public_contacts.get(domain.get('domain__security_contact_registry_id')) - domain['invited_users'] = ', '.join(invited_users_dict.get(domain.get('domain__name'), [])) - domain['managers'] = ', '.join(managers_dict.get(domain.get('domain__name'), [])) - annotated_domains.append(domain) - - if annotated_domains: - return annotated_domains - - return queryset - - @staticmethod - def parse_row_for_domains(columns, domain): - """ - Given a set of columns and a request dictionary, generate a new row from cleaned column data. - """ - - status = domain.get("domain__state") - human_readable_status = Domain.State.get_state_label(status) - - expiration_date = domain.get("domain__expiration_date") - if expiration_date is None: - expiration_date = "(blank)" - - first_ready_on = domain.get("domain__first_ready") - if first_ready_on is None: - first_ready_on = "(blank)" - - domain_org_type = domain.get("generic_org_type") - human_readable_domain_org_type = DomainRequest.OrganizationChoices.get_org_label(domain_org_type) - domain_federal_type = domain.get("federal_type") - human_readable_domain_federal_type = BranchChoices.get_branch_label(domain_federal_type) - domain_type = human_readable_domain_org_type - if domain_federal_type and domain_org_type == DomainRequest.OrgChoicesElectionOffice.FEDERAL: - domain_type = f"{human_readable_domain_org_type} - {human_readable_domain_federal_type}" - - if domain.get("domain__name") == "18f.gov": - print(f'domain_type {domain_type}') - print(f'federal_agency {domain.get("federal_agency")}') - print(f'city {domain.get("city")}') - - print(f'agency {domain.get("agency")}') - - print(f'federal_agency__agency {domain.get("federal_agency__agency")}') - - # create a dictionary of fields which can be included in output. - # "extra_fields" are precomputed fields (generated in the DB or parsed). - FIELDS = { - - "Domain name": domain.get("domain__name"), - "Status": human_readable_status, - "First ready on": first_ready_on, - "Expiration date": expiration_date, - "Domain type": domain_type, - "Agency": domain.get("federal_agency__agency"), - "Organization name": domain.get("organization_name"), - "City": domain.get("city"), - "State": domain.get("state_territory"), - "AO": domain.get("ao_name"), - "AO email": domain.get("authorizing_official__email"), - "Security contact email": domain.get("security_contact_email"), - "Created at": domain.get("domain__created_at"), - "Deleted": domain.get("domain__deleted"), - "Domain managers": domain.get("managers"), - "Invited domain managers": domain.get("invited_users"), - } - - row = [FIELDS.get(column, "") for column in columns] - return row - - @staticmethod - def write_csv_for_domains( - writer, - columns, - domains_dict, - should_write_header=True, - ): - """Receives params from the parent methods and outputs a CSV with filtered and sorted requests. - Works with write_header as long as the same writer object is passed.""" - - rows = [] - for domain in domains_dict.values(): - try: - row = DomainExport.parse_row_for_domains(columns, domain) - rows.append(row) - except ValueError as err: - logger.error(f"csv_export -> Error when parsing row: {err}") - continue - - if should_write_header: - write_header(writer, columns) - - writer.writerows(rows) - - # ============================================================= # - # Helper functions for django ORM queries. # - # We are using these rather than pure python for speed reasons. # - # ============================================================= # - - @classmethod - def get_all_security_emails(cls): - """ - Fetch all PublicContact entries and return a mapping of registry_id to email. - """ - public_contacts = PublicContact.objects.values_list('registry_id', 'email') - return {registry_id: email for registry_id, email in public_contacts} - - @classmethod - def get_all_domain_invitations(cls): - """ - Fetch all DomainInvitation entries and return a mapping of domain to email. - """ - domain_invitations = DomainInvitation.objects.filter(status="invited").values_list('domain__name', 'email') - return list(domain_invitations) - - @classmethod - def get_all_user_domain_roles(cls): - """ - Fetch all UserDomainRole entries and return a mapping of domain to user__email. - """ - user_domain_roles = UserDomainRole.objects.select_related('user').values_list('domain__name', 'user__email') - return list(user_domain_roles) - - - class DomainRequestExport: """ A collection of functions which return csv files regarding the DomainRequest model. diff --git a/src/registrar/views/admin_views.py b/src/registrar/views/admin_views.py index ba57737cb..9f89c9bd5 100644 --- a/src/registrar/views/admin_views.py +++ b/src/registrar/views/admin_views.py @@ -49,8 +49,8 @@ class AnalyticsView(View): "domain__permissions__isnull": False, "domain__first_ready__lte": end_date_formatted, } - managed_domains_sliced_at_start_date = csv_export.NewDomainExport.get_sliced_domains(filter_managed_domains_start_date) - managed_domains_sliced_at_end_date = csv_export.NewDomainExport.get_sliced_domains(filter_managed_domains_end_date) + managed_domains_sliced_at_start_date = csv_export.DomainExport.get_sliced_domains(filter_managed_domains_start_date) + managed_domains_sliced_at_end_date = csv_export.DomainExport.get_sliced_domains(filter_managed_domains_end_date) filter_unmanaged_domains_start_date = { "domain__permissions__isnull": True, @@ -60,8 +60,8 @@ class AnalyticsView(View): "domain__permissions__isnull": True, "domain__first_ready__lte": end_date_formatted, } - unmanaged_domains_sliced_at_start_date = csv_export.NewDomainExport.get_sliced_domains(filter_unmanaged_domains_start_date) - unmanaged_domains_sliced_at_end_date = csv_export.NewDomainExport.get_sliced_domains(filter_unmanaged_domains_end_date) + unmanaged_domains_sliced_at_start_date = csv_export.DomainExport.get_sliced_domains(filter_unmanaged_domains_start_date) + unmanaged_domains_sliced_at_end_date = csv_export.DomainExport.get_sliced_domains(filter_unmanaged_domains_end_date) filter_ready_domains_start_date = { "domain__state__in": [models.Domain.State.READY], @@ -71,8 +71,8 @@ class AnalyticsView(View): "domain__state__in": [models.Domain.State.READY], "domain__first_ready__lte": end_date_formatted, } - ready_domains_sliced_at_start_date = csv_export.NewDomainExport.get_sliced_domains(filter_ready_domains_start_date) - ready_domains_sliced_at_end_date = csv_export.NewDomainExport.get_sliced_domains(filter_ready_domains_end_date) + ready_domains_sliced_at_start_date = csv_export.DomainExport.get_sliced_domains(filter_ready_domains_start_date) + ready_domains_sliced_at_end_date = csv_export.DomainExport.get_sliced_domains(filter_ready_domains_end_date) filter_deleted_domains_start_date = { "domain__state__in": [models.Domain.State.DELETED], @@ -82,8 +82,8 @@ class AnalyticsView(View): "domain__state__in": [models.Domain.State.DELETED], "domain__deleted__lte": end_date_formatted, } - deleted_domains_sliced_at_start_date = csv_export.NewDomainExport.get_sliced_domains(filter_deleted_domains_start_date) - deleted_domains_sliced_at_end_date = csv_export.NewDomainExport.get_sliced_domains(filter_deleted_domains_end_date) + deleted_domains_sliced_at_start_date = csv_export.DomainExport.get_sliced_domains(filter_deleted_domains_start_date) + deleted_domains_sliced_at_end_date = csv_export.DomainExport.get_sliced_domains(filter_deleted_domains_end_date) filter_requests_start_date = { "created_at__lte": start_date_formatted,