From cd279468e515622b61ba73b2a16a387453d1710f Mon Sep 17 00:00:00 2001 From: David Kennedy Date: Thu, 27 Jun 2024 16:15:25 -0400 Subject: [PATCH] wip; working --- src/registrar/utility/csv_export.py | 996 +++++++++++++++++++++++++++- src/registrar/views/admin_views.py | 36 +- 2 files changed, 1016 insertions(+), 16 deletions(-) diff --git a/src/registrar/utility/csv_export.py b/src/registrar/utility/csv_export.py index e51c67f1f..55b0bf37f 100644 --- a/src/registrar/utility/csv_export.py +++ b/src/registrar/utility/csv_export.py @@ -1,3 +1,4 @@ +from abc import ABC, abstractmethod from collections import defaultdict import csv import logging @@ -122,11 +123,1004 @@ def get_sliced_requests(filter_condition): election_board, ] -class DomainExport: +class BaseExport(ABC): + """ + A generic class for exporting data which returns a csv file for the given model. + """ + + @classmethod + @abstractmethod + def model(self): + """ + Property to specify the model that the export class will handle. + Must be implemented by subclasses. + """ + pass + + @classmethod + def get_columns(cls): + """ + Returns the columns for CSV export. Override in subclasses as needed. + """ + return [] + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields for the CSV export. Override in subclasses as needed. + """ + return [] + + @classmethod + def get_additional_args(cls): + """ + Returns additional keyword arguments as an empty dictionary. + Override in subclasses to provide specific arguments. + """ + return {} + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. + """ + return [] + + @classmethod + def get_prefetch_related(cls): + """ + Get a list of tables to pass to prefetch_related when building queryset. + """ + return [] + + @classmethod + def get_filter_conditions(cls, start_date=None, end_date=None): + """ + Get a Q object of filter conditions to filter when building queryset. + """ + return Q() + + @classmethod + def get_computed_fields(cls): + """ + Get a dict of computed fields. + """ + return {} + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + return [] + + @classmethod + def update_queryset(cls, queryset, **kwargs): + """ + Returns an updated queryset. Override in subclass to update queryset. + """ + return queryset + + @classmethod + def write_csv_before(cls, csv_writer, start_date=None, end_date=None): + """ + Write to csv file before the write_csv method. + Override in subclasses where needed. + """ + pass + + @classmethod + def annotate_and_retrieve_fields( + cls, initial_queryset, computed_fields, related_table_fields=None, include_many_to_many=False, **kwargs + ) -> QuerySet: + """ + Applies annotations to a queryset and retrieves specified fields, + including class-defined and annotation-defined. + + Parameters: + initial_queryset (QuerySet): Initial queryset. + computed_fields (dict, optional): Fields to compute {field_name: expression}. + related_table_fields (list, optional): Extra fields to retrieve; defaults to annotation keys if None. + include_many_to_many (bool, optional): Determines if we should include many to many fields or not + **kwargs: Additional keyword arguments for specific parameters (e.g., public_contacts, domain_invitations, + user_domain_roles). + + Returns: + QuerySet: Contains dictionaries with the specified fields for each record. + """ + if related_table_fields is None: + related_table_fields = [] + + # We can infer that if we're passing in annotations, + # we want to grab the result of said annotation. + if computed_fields: + related_table_fields.extend(computed_fields.keys()) + + # Get prexisting fields on the model + model_fields = set() + for field in cls.model()._meta.get_fields(): + # Exclude many to many fields unless we specify + many_to_many = isinstance(field, ManyToManyField) and include_many_to_many + if many_to_many or not isinstance(field, ManyToManyField): + model_fields.add(field.name) + + queryset = initial_queryset.annotate(**computed_fields).values(*model_fields, *related_table_fields) + + return cls.update_queryset(queryset, **kwargs) + + @classmethod + def export_data_to_csv(cls, csv_file, start_date=None, end_date=None): + """ + All domain metadata: + Exports domains of all statuses plus domain managers. + """ + writer = csv.writer(csv_file) + columns = cls.get_columns() + sort_fields = cls.get_sort_fields() + kwargs = cls.get_additional_args() + select_related = cls.get_select_related() + prefetch_related = cls.get_prefetch_related() + filter_conditions = cls.get_filter_conditions(start_date, end_date) + computed_fields = cls.get_computed_fields() + related_table_fields = cls.get_related_table_fields() + + model_queryset = ( + cls.model().objects.select_related(*select_related) + .prefetch_related(*prefetch_related) + .filter(filter_conditions) + .order_by(*sort_fields) + .distinct() + ) + + # Convert the queryset to a dictionary (including annotated fields) + annotated_queryset = cls.annotate_and_retrieve_fields(model_queryset, computed_fields, related_table_fields, **kwargs) + models_dict = convert_queryset_to_dict(annotated_queryset, is_model=False) + + # Write to csv file before the write_csv + cls.write_csv_before(writer, start_date, end_date) + + # Write the csv file + cls.write_csv(writer, columns, models_dict) + + + @classmethod + def write_csv( + cls, + writer, + columns, + models_dict, + should_write_header=True, + ): + """Receives params from the parent methods and outputs a CSV with filtered and sorted objects. + Works with write_header as long as the same writer object is passed.""" + + rows = [] + for object in models_dict.values(): + try: + row = cls.parse_row(columns, object) + rows.append(row) + except ValueError as err: + logger.error(f"csv_export -> Error when parsing row: {err}") + continue + + if should_write_header: + write_header(writer, columns) + + writer.writerows(rows) + + @classmethod + def parse_row(cls, columns, model): + """ + Given a set of columns and a model dictionary, generate a new row from cleaned column data. + Must be implemented by subclasses + """ + pass + + +class NewDomainExport(BaseExport): """ A collection of functions which return csv files regarding the Domain model. """ + @classmethod + def model(cls): + # Return the model class that this export handles + return DomainInformation + + @classmethod + def update_queryset(cls, queryset, **kwargs): + """ + Returns an updated queryset. + + Add security_contact_email, invited_users, and managers to the queryset, + based on public_contacts, domain_invitations and user_domain_roles + passed through kwargs. + """ + public_contacts = kwargs.get('public_contacts', {}) + domain_invitations = kwargs.get('domain_invitations', {}) + user_domain_roles = kwargs.get('user_domain_roles', {}) + + annotated_domain_infos = [] + + # Create mapping of domain to a list of invited users and managers + invited_users_dict = defaultdict(list) + for domain, email in domain_invitations: + invited_users_dict[domain].append(email) + + managers_dict = defaultdict(list) + for domain, email in user_domain_roles: + managers_dict[domain].append(email) + + # Annotate with security_contact from public_contacts + for domain_info in queryset: + domain_info['security_contact_email'] = public_contacts.get(domain_info.get('domain__security_contact_registry_id')) + domain_info['invited_users'] = ', '.join(invited_users_dict.get(domain_info.get('domain__name'), [])) + domain_info['managers'] = ', '.join(managers_dict.get(domain_info.get('domain__name'), [])) + annotated_domain_infos.append(domain_info) + + if annotated_domain_infos: + return annotated_domain_infos + + return queryset + + # ============================================================= # + # Helper functions for django ORM queries. # + # We are using these rather than pure python for speed reasons. # + # ============================================================= # + + @classmethod + def get_all_security_emails(cls): + """ + Fetch all PublicContact entries and return a mapping of registry_id to email. + """ + public_contacts = PublicContact.objects.values_list('registry_id', 'email') + return {registry_id: email for registry_id, email in public_contacts} + + @classmethod + def get_all_domain_invitations(cls): + """ + Fetch all DomainInvitation entries and return a mapping of domain to email. + """ + domain_invitations = DomainInvitation.objects.filter(status="invited").values_list('domain__name', 'email') + return list(domain_invitations) + + @classmethod + def get_all_user_domain_roles(cls): + """ + Fetch all UserDomainRole entries and return a mapping of domain to user__email. + """ + user_domain_roles = UserDomainRole.objects.select_related('user').values_list('domain__name', 'user__email') + return list(user_domain_roles) + + @classmethod + def parse_row(cls, columns, model): + """ + Given a set of columns and a model dictionary, generate a new row from cleaned column data. + """ + + status = model.get("domain__state") + human_readable_status = Domain.State.get_state_label(status) + + expiration_date = model.get("domain__expiration_date") + if expiration_date is None: + expiration_date = "(blank)" + + first_ready_on = model.get("domain__first_ready") + if first_ready_on is None: + first_ready_on = "(blank)" + + domain_org_type = model.get("generic_org_type") + human_readable_domain_org_type = DomainRequest.OrganizationChoices.get_org_label(domain_org_type) + domain_federal_type = model.get("federal_type") + human_readable_domain_federal_type = BranchChoices.get_branch_label(domain_federal_type) + domain_type = human_readable_domain_org_type + if domain_federal_type and domain_org_type == DomainRequest.OrgChoicesElectionOffice.FEDERAL: + domain_type = f"{human_readable_domain_org_type} - {human_readable_domain_federal_type}" + + if model.get("domain__name") == "18f.gov": + print(f'domain_type {domain_type}') + print(f'federal_agency {model.get("federal_agency")}') + print(f'city {model.get("city")}') + + print(f'agency {model.get("agency")}') + + print(f'federal_agency__agency {model.get("federal_agency__agency")}') + + # create a dictionary of fields which can be included in output. + # "extra_fields" are precomputed fields (generated in the DB or parsed). + FIELDS = { + + "Domain name": model.get("domain__name"), + "Status": human_readable_status, + "First ready on": first_ready_on, + "Expiration date": expiration_date, + "Domain type": domain_type, + "Agency": model.get("federal_agency__agency"), + "Organization name": model.get("organization_name"), + "City": model.get("city"), + "State": model.get("state_territory"), + "AO": model.get("ao_name"), + "AO email": model.get("authorizing_official__email"), + "Security contact email": model.get("security_contact_email"), + "Created at": model.get("domain__created_at"), + "Deleted": model.get("domain__deleted"), + "Domain managers": model.get("managers"), + "Invited domain managers": model.get("invited_users"), + } + + row = [FIELDS.get(column, "") for column in columns] + return row + + @classmethod + def get_sliced_domains(cls, filter_condition): + """Get filtered domains counts sliced by org type and election office. + Pass distinct=True when filtering by permissions so we do not to count multiples + when a domain has more that one manager. + """ + + domains = DomainInformation.objects.all().filter(**filter_condition).distinct() + domains_count = domains.count() + federal = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.FEDERAL).distinct().count() + interstate = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.INTERSTATE).count() + state_or_territory = ( + domains.filter(generic_org_type=DomainRequest.OrganizationChoices.STATE_OR_TERRITORY).distinct().count() + ) + tribal = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.TRIBAL).distinct().count() + county = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.COUNTY).distinct().count() + city = domains.filter(generic_org_type=DomainRequest.OrganizationChoices.CITY).distinct().count() + special_district = ( + domains.filter(generic_org_type=DomainRequest.OrganizationChoices.SPECIAL_DISTRICT).distinct().count() + ) + school_district = ( + domains.filter(generic_org_type=DomainRequest.OrganizationChoices.SCHOOL_DISTRICT).distinct().count() + ) + election_board = domains.filter(is_election_board=True).distinct().count() + + return [ + domains_count, + federal, + interstate, + state_or_territory, + tribal, + county, + city, + special_district, + school_district, + election_board, + ] + + +class DomainDataType(NewDomainExport): + + @classmethod + def get_columns(cls): + """ + Overrides the columns for CSV export specific to DomainExport. + """ + return [ + "Domain name", + "Status", + "First ready on", + "Expiration date", + "Domain type", + "Agency", + "Organization name", + "City", + "State", + "AO", + "AO email", + "Security contact email", + "Domain managers", + "Invited domain managers", + ] + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + # Coalesce is used to replace federal_type of None with ZZZZZ + return [ + "organization_type", + Coalesce("federal_type", Value("ZZZZZ")), + "federal_agency", + "domain__name", + ] + + @classmethod + def get_additional_args(cls): + """ + Returns additional keyword arguments specific to DomainExport. + + Returns: + dict: Dictionary containing public_contacts, domain_invitations, and user_domain_roles. + """ + # Fetch all relevant PublicContact entries + public_contacts = cls.get_all_security_emails() + + # Fetch all relevant Invite entries + domain_invitations = cls.get_all_domain_invitations() + + # Fetch all relevant UserDomainRole entries + user_domain_roles = cls.get_all_user_domain_roles() + + return { + 'public_contacts': public_contacts, + 'domain_invitations': domain_invitations, + 'user_domain_roles': user_domain_roles, + } + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. + """ + return [ + "domain", + "authorizing_official" + ] + + @classmethod + def get_prefetch_related(cls): + """ + Get a list of tables to pass to prefetch_related when building queryset. + """ + return [ + "permissions" + ] + + @classmethod + def get_computed_fields(cls, delimiter=", "): + """ + Get a dict of computed fields. + """ + return { + "ao_name": Concat( + Coalesce(F("authorizing_official__first_name"), Value("")), + Value(" "), + Coalesce(F("authorizing_official__last_name"), Value("")), + output_field=CharField(), + ), + } + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + print("DomainDataType::get_related_table_fields") + return [ + "domain__name", + "domain__state", + "domain__first_ready", + "domain__expiration_date", + "domain__created_at", + "domain__deleted", + "domain__security_contact_registry_id", + "authorizing_official__email", + "federal_agency__agency", + ] + + +class DomainDataFull(NewDomainExport): + + @classmethod + def get_columns(cls): + """ + Overrides the columns for CSV export specific to DomainExport. + """ + return [ + "Domain name", + "Domain type", + "Agency", + "Organization name", + "City", + "State", + "Security contact email", + ] + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + # Coalesce is used to replace federal_type of None with ZZZZZ + return [ + "organization_type", + Coalesce("federal_type", Value("ZZZZZ")), + "federal_agency", + "domain__name", + ] + + @classmethod + def get_additional_args(cls): + """ + Returns additional keyword arguments specific to DomainExport. + + Returns: + dict: Dictionary containing public_contacts, domain_invitations, and user_domain_roles. + """ + # Fetch all relevant PublicContact entries + public_contacts = cls.get_all_security_emails() + + return { + 'public_contacts': public_contacts, + } + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. + """ + return [ + "domain" + ] + + @classmethod + def get_filter_conditions(cls, start_date=None, end_date=None): + """ + Get a Q object of filter conditions to filter when building queryset. + """ + return Q( + domain__state__in = [ + Domain.State.READY, + Domain.State.DNS_NEEDED, + Domain.State.ON_HOLD, + ], + ) + + @classmethod + def get_computed_fields(cls, delimiter=", "): + """ + Get a dict of computed fields. + """ + return { + "ao_name": Concat( + Coalesce(F("authorizing_official__first_name"), Value("")), + Value(" "), + Coalesce(F("authorizing_official__last_name"), Value("")), + output_field=CharField(), + ), + } + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + return [ + "domain__name", + "domain__security_contact_registry_id", + "federal_agency__agency", + ] + + +class DomainDataFederal(NewDomainExport): + + @classmethod + def get_columns(cls): + """ + Overrides the columns for CSV export specific to DomainExport. + """ + return [ + "Domain name", + "Domain type", + "Agency", + "Organization name", + "City", + "State", + "Security contact email", + ] + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + # Coalesce is used to replace federal_type of None with ZZZZZ + return [ + "organization_type", + Coalesce("federal_type", Value("ZZZZZ")), + "federal_agency", + "domain__name", + ] + + @classmethod + def get_additional_args(cls): + """ + Returns additional keyword arguments specific to DomainExport. + + Returns: + dict: Dictionary containing public_contacts, domain_invitations, and user_domain_roles. + """ + # Fetch all relevant PublicContact entries + public_contacts = cls.get_all_security_emails() + + return { + 'public_contacts': public_contacts, + } + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. + """ + return [ + "domain" + ] + + @classmethod + def get_filter_conditions(cls, start_date=None, end_date=None): + """ + Get a Q object of filter conditions to filter when building queryset. + """ + return Q( + organization_type__icontains="federal", + domain__state__in=[ + Domain.State.READY, + Domain.State.DNS_NEEDED, + Domain.State.ON_HOLD, + ] + ) + + @classmethod + def get_computed_fields(cls, delimiter=", "): + """ + Get a dict of computed fields. + """ + return { + "ao_name": Concat( + Coalesce(F("authorizing_official__first_name"), Value("")), + Value(" "), + Coalesce(F("authorizing_official__last_name"), Value("")), + output_field=CharField(), + ), + } + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + return [ + "domain__name", + "domain__security_contact_registry_id", + "federal_agency__agency", + ] + + +class DomainGrowth(NewDomainExport): + + @classmethod + def get_columns(cls): + """ + Overrides the columns for CSV export specific to DomainExport. + """ + return [ + "Domain name", + "Domain type", + "Agency", + "Organization name", + "City", + "State", + "Status", + "Expiration date", + "Created at", + "First ready", + "Deleted", + ] + + # TODO: The below sort is not working properly + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + return [ + '-domain__state', + 'domain__first_ready', + 'domain__deleted', + 'domain__name', + ] + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. + """ + return [ + "domain" + ] + + @classmethod + def get_filter_conditions(cls, start_date=None, end_date=None): + """ + Get a Q object of filter conditions to filter when building queryset. + """ + filter_ready = Q( + domain__state__in=[Domain.State.READY], + domain__first_ready__gte=start_date, + domain__first_ready__lte=end_date + ) + filter_deleted = Q( + domain__state__in=[Domain.State.DELETED], + domain__deleted__gte=start_date, + domain__deleted__lte=end_date + ) + return filter_ready | filter_deleted + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + return [ + "domain__name", + "domain__state", + "domain__first_ready", + "domain__expiration_date", + "domain__created_at", + "domain__deleted", + "federal_agency__agency", + ] + + +class DomainManaged(NewDomainExport): + + @classmethod + def get_columns(cls): + """ + Overrides the columns for CSV export specific to DomainExport. + """ + return [ + "Domain name", + "Domain type", + "Domain managers", + "Invited domain managers", + ] + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + return [ + 'domain__name', + ] + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. + """ + return [ + "domain" + ] + + @classmethod + def get_prefetch_related(cls): + """ + Get a list of tables to pass to prefetch_related when building queryset. + """ + return [ + "permissions" + ] + + @classmethod + def get_filter_conditions(cls, start_date=None, end_date=None): + """ + Get a Q object of filter conditions to filter when building queryset. + """ + end_date_formatted = format_end_date(end_date) + return Q( + domain__permissions__isnull=False, + domain__first_ready__lte=end_date_formatted, + ) + + + @classmethod + def get_additional_args(cls): + """ + Returns additional keyword arguments specific to DomainExport. + + Returns: + dict: Dictionary containing public_contacts, domain_invitations, and user_domain_roles. + """ + + # Fetch all relevant Invite entries + domain_invitations = cls.get_all_domain_invitations() + + # Fetch all relevant UserDomainRole entries + user_domain_roles = cls.get_all_user_domain_roles() + + return { + 'domain_invitations': domain_invitations, + 'user_domain_roles': user_domain_roles, + } + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + return [ + "domain__name", + ] + + @classmethod + def write_csv_before(cls, csv_writer, start_date=None, end_date=None): + """ + Write to csv file before the write_csv method. + """ + start_date_formatted = format_start_date(start_date) + end_date_formatted = format_end_date(end_date) + filter_managed_domains_start_date = { + "domain__permissions__isnull": False, + "domain__first_ready__lte": start_date_formatted, + } + managed_domains_sliced_at_start_date = cls.get_sliced_domains(filter_managed_domains_start_date) + + csv_writer.writerow(["MANAGED DOMAINS COUNTS AT START DATE"]) + csv_writer.writerow( + [ + "Total", + "Federal", + "Interstate", + "State or territory", + "Tribal", + "County", + "City", + "Special district", + "School district", + "Election office", + ] + ) + csv_writer.writerow(managed_domains_sliced_at_start_date) + csv_writer.writerow([]) + + filter_managed_domains_end_date = { + "domain__permissions__isnull": False, + "domain__first_ready__lte": end_date_formatted, + } + managed_domains_sliced_at_end_date = cls.get_sliced_domains(filter_managed_domains_end_date) + + csv_writer.writerow(["MANAGED DOMAINS COUNTS AT END DATE"]) + csv_writer.writerow( + [ + "Total", + "Federal", + "Interstate", + "State or territory", + "Tribal", + "County", + "City", + "Special district", + "School district", + "Election office", + ] + ) + csv_writer.writerow(managed_domains_sliced_at_end_date) + csv_writer.writerow([]) + + +class DomainUnmanaged(NewDomainExport): + + @classmethod + def get_columns(cls): + """ + Overrides the columns for CSV export specific to DomainExport. + """ + return [ + "Domain name", + "Domain type", + ] + + @classmethod + def get_sort_fields(cls): + """ + Returns the sort fields. + """ + return [ + 'domain__name', + ] + + @classmethod + def get_select_related(cls): + """ + Get a list of tables to pass to select_related when building queryset. + """ + return [ + "domain" + ] + + @classmethod + def get_prefetch_related(cls): + """ + Get a list of tables to pass to prefetch_related when building queryset. + """ + return [ + "permissions" + ] + + @classmethod + def get_filter_conditions(cls, start_date=None, end_date=None): + """ + Get a Q object of filter conditions to filter when building queryset. + """ + end_date_formatted = format_end_date(end_date) + return Q( + domain__permissions__isnull=True, + domain__first_ready__lte=end_date_formatted, + ) + + @classmethod + def get_related_table_fields(cls): + """ + Get a list of fields from related tables. + """ + return [ + "domain__name", + ] + + @classmethod + def write_csv_before(cls, csv_writer, start_date=None, end_date=None): + """ + Write to csv file before the write_csv method. + + """ + start_date_formatted = format_start_date(start_date) + end_date_formatted = format_end_date(end_date) + filter_unmanaged_domains_start_date = { + "domain__permissions__isnull": True, + "domain__first_ready__lte": start_date_formatted, + } + unmanaged_domains_sliced_at_start_date = cls.get_sliced_domains(filter_unmanaged_domains_start_date) + + csv_writer.writerow(["UNMANAGED DOMAINS AT START DATE"]) + csv_writer.writerow( + [ + "Total", + "Federal", + "Interstate", + "State or territory", + "Tribal", + "County", + "City", + "Special district", + "School district", + "Election office", + ] + ) + csv_writer.writerow(unmanaged_domains_sliced_at_start_date) + csv_writer.writerow([]) + + filter_unmanaged_domains_end_date = { + "domain__permissions__isnull": True, + "domain__first_ready__lte": end_date_formatted, + } + unmanaged_domains_sliced_at_end_date = cls.get_sliced_domains(filter_unmanaged_domains_end_date) + + csv_writer.writerow(["UNMANAGED DOMAINS AT END DATE"]) + csv_writer.writerow( + [ + "Total", + "Federal", + "Interstate", + "State or territory", + "Tribal", + "County", + "City", + "Special district", + "School district", + "Election office", + ] + ) + csv_writer.writerow(unmanaged_domains_sliced_at_end_date) + csv_writer.writerow([]) + + +class DomainExport: @classmethod def export_data_type_to_csv(cls, csv_file): """ diff --git a/src/registrar/views/admin_views.py b/src/registrar/views/admin_views.py index 9b013e36c..ba57737cb 100644 --- a/src/registrar/views/admin_views.py +++ b/src/registrar/views/admin_views.py @@ -49,8 +49,8 @@ class AnalyticsView(View): "domain__permissions__isnull": False, "domain__first_ready__lte": end_date_formatted, } - managed_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_managed_domains_start_date) - managed_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_managed_domains_end_date) + managed_domains_sliced_at_start_date = csv_export.NewDomainExport.get_sliced_domains(filter_managed_domains_start_date) + managed_domains_sliced_at_end_date = csv_export.NewDomainExport.get_sliced_domains(filter_managed_domains_end_date) filter_unmanaged_domains_start_date = { "domain__permissions__isnull": True, @@ -60,8 +60,8 @@ class AnalyticsView(View): "domain__permissions__isnull": True, "domain__first_ready__lte": end_date_formatted, } - unmanaged_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_unmanaged_domains_start_date) - unmanaged_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_unmanaged_domains_end_date) + unmanaged_domains_sliced_at_start_date = csv_export.NewDomainExport.get_sliced_domains(filter_unmanaged_domains_start_date) + unmanaged_domains_sliced_at_end_date = csv_export.NewDomainExport.get_sliced_domains(filter_unmanaged_domains_end_date) filter_ready_domains_start_date = { "domain__state__in": [models.Domain.State.READY], @@ -71,8 +71,8 @@ class AnalyticsView(View): "domain__state__in": [models.Domain.State.READY], "domain__first_ready__lte": end_date_formatted, } - ready_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_ready_domains_start_date) - ready_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_ready_domains_end_date) + ready_domains_sliced_at_start_date = csv_export.NewDomainExport.get_sliced_domains(filter_ready_domains_start_date) + ready_domains_sliced_at_end_date = csv_export.NewDomainExport.get_sliced_domains(filter_ready_domains_end_date) filter_deleted_domains_start_date = { "domain__state__in": [models.Domain.State.DELETED], @@ -82,8 +82,8 @@ class AnalyticsView(View): "domain__state__in": [models.Domain.State.DELETED], "domain__deleted__lte": end_date_formatted, } - deleted_domains_sliced_at_start_date = csv_export.get_sliced_domains(filter_deleted_domains_start_date) - deleted_domains_sliced_at_end_date = csv_export.get_sliced_domains(filter_deleted_domains_end_date) + deleted_domains_sliced_at_start_date = csv_export.NewDomainExport.get_sliced_domains(filter_deleted_domains_start_date) + deleted_domains_sliced_at_end_date = csv_export.NewDomainExport.get_sliced_domains(filter_deleted_domains_end_date) filter_requests_start_date = { "created_at__lte": start_date_formatted, @@ -142,7 +142,8 @@ class ExportDataType(View): # match the CSV example with all the fields response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = 'attachment; filename="domains-by-type.csv"' - csv_export.DomainExport.export_data_type_to_csv(response) + #csv_export.DomainExport.export_data_type_to_csv(response) + csv_export.DomainDataType.export_data_to_csv(response) return response @@ -151,7 +152,8 @@ class ExportDataFull(View): # Smaller export based on 1 response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = 'attachment; filename="current-full.csv"' - csv_export.DomainExport.export_data_full_to_csv(response) + #csv_export.DomainExport.export_data_full_to_csv(response) + csv_export.DomainDataFull.export_data_to_csv(response) return response @@ -160,7 +162,8 @@ class ExportDataFederal(View): # Federal only response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = 'attachment; filename="current-federal.csv"' - csv_export.DomainExport.export_data_federal_to_csv(response) + #csv_export.DomainExport.export_data_federal_to_csv(response) + csv_export.DomainDataFederal.export_data_to_csv(response) return response @@ -182,7 +185,8 @@ class ExportDataDomainsGrowth(View): response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = f'attachment; filename="domain-growth-report-{start_date}-to-{end_date}.csv"' - csv_export.DomainExport.export_data_domain_growth_to_csv(response, start_date, end_date) + #csv_export.DomainExport.export_data_domain_growth_to_csv(response, start_date, end_date) + csv_export.DomainGrowth.export_data_to_csv(response, start_date, end_date) return response @@ -205,7 +209,8 @@ class ExportDataManagedDomains(View): end_date = request.GET.get("end_date", "") response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = f'attachment; filename="managed-domains-{start_date}-to-{end_date}.csv"' - csv_export.DomainExport.export_data_managed_domains_to_csv(response, start_date, end_date) + #csv_export.DomainExport.export_data_managed_domains_to_csv(response, start_date, end_date) + csv_export.DomainManaged.export_data_to_csv(response, start_date, end_date) return response @@ -215,7 +220,8 @@ class ExportDataUnmanagedDomains(View): start_date = request.GET.get("start_date", "") end_date = request.GET.get("end_date", "") response = HttpResponse(content_type="text/csv") - response["Content-Disposition"] = f'attachment; filename="unamanaged-domains-{start_date}-to-{end_date}.csv"' - csv_export.DomainExport.export_data_unmanaged_domains_to_csv(response, start_date, end_date) + response["Content-Disposition"] = f'attachment; filename="unmanaged-domains-{start_date}-to-{end_date}.csv"' + #csv_export.DomainExport.export_data_unmanaged_domains_to_csv(response, start_date, end_date) + csv_export.DomainUnmanaged.export_data_to_csv(response, start_date, end_date) return response