diff --git a/src/registrar/models/utility/generic_helper.py b/src/registrar/models/utility/generic_helper.py index ca6ce6c31..45e0b0208 100644 --- a/src/registrar/models/utility/generic_helper.py +++ b/src/registrar/models/utility/generic_helper.py @@ -298,3 +298,27 @@ def replace_url_queryparams(url_to_modify: str, query_params, convert_list_to_cs new_url = urlunparse(url_parts) return new_url + + +@staticmethod +def convert_queryset_to_dict(queryset, is_model=True, key="id"): + """ + Transforms a queryset into a dictionary keyed by a specified key (like "id"). + + Parameters: + requests (QuerySet or list of dicts): Input data. + is_model (bool): Indicates if each item in 'queryset' are model instances (True) or dictionaries (False). + key (str): Key or attribute to use for the resulting dictionary's keys. + + Returns: + dict: Dictionary with keys derived from 'key' and values corresponding to items in 'queryset'. + """ + + if is_model: + request_dict = {getattr(value, key): value for value in queryset} + else: + # Querysets sometimes contain sets of dictionaries. + # Calling .values is an example of this. + request_dict = {value[key]: value for value in queryset} + + return request_dict \ No newline at end of file diff --git a/src/registrar/utility/csv_export.py b/src/registrar/utility/csv_export.py index c3c5a53ef..65e0a0074 100644 --- a/src/registrar/utility/csv_export.py +++ b/src/registrar/utility/csv_export.py @@ -14,6 +14,7 @@ from django.utils import timezone from django.core.paginator import Paginator from django.db.models.functions import Concat, Coalesce from django.contrib.postgres.aggregates import StringAgg +from registrar.models.utility.generic_helper import convert_queryset_to_dict from registrar.templatetags.custom_filters import get_region from registrar.utility.enums import DefaultEmail @@ -713,6 +714,9 @@ class DomainRequestExport: Purely organizational -- all functions are independent. """ + # Get all the field names of the DomainRequest object + domain_request_fields = set(field.name for field in DomainRequest._meta.get_fields()) + all_columns = [ "Domain request", "Submitted at", @@ -743,111 +747,8 @@ class DomainRequestExport: "Investigator", ] - @staticmethod - def write_csv_for_requests( - writer, - columns, - requests, - requests_with_extra_fields=None, - should_write_header=True, - ): - """Receives params from the parent methods and outputs a CSV with filtered and sorted requests. - Works with write_header as long as the same writer object is passed.""" - - # Reduce the memory overhead when performing the write operation - paginator = Paginator(requests, 1000) - total_body_rows = [] - for page_num in paginator.page_range: - page = paginator.page(page_num) - rows = [] - for request in page.object_list: - try: - extra = requests_with_extra_fields.get(request.id) if requests_with_extra_fields else {} - row = DomainRequestExport.parse_row_for_requests(columns, request, extra) - rows.append(row) - except ValueError as err: - logger.error(f"csv_export -> Error when parsing row: {err}") - continue - total_body_rows.extend(rows) - - if should_write_header: - write_header(writer, columns) - writer.writerows(total_body_rows) - - @staticmethod - def parse_row_for_requests(columns, request: DomainRequest, extra_fields: dict): - """ - Given a set of columns and a request dictionary, - generate a new row from cleaned column data - """ - - # Handle the domain_type field. Defaults to the wrong format. - org_type = request.organization_type - if org_type and extra_fields.get("domain_type") is None: - readable_org_type = DomainRequest.OrganizationChoices.get_org_label(org_type) - extra_fields["domain_type"] = readable_org_type - - # Handle the federal_type field. Defaults to the wrong format. - federal_type = request.federal_type - if federal_type and extra_fields.get("human_readable_federal_type") is None: - extra_fields["human_readable_federal_type"] = DomainRequest.BranchChoices.get_branch_label(federal_type) - - # Handle the status field. Defaults to the wrong format. - status = request.status - if status and extra_fields.get("status_display") is None: - extra_fields["status_display"] = DomainRequest.DomainRequestStatus.get_status_label(status) - - # Handle the region field. - state_territory = request.state_territory - if state_territory and extra_fields.get("region") is None: - extra_fields["region"] = get_region(state_territory) - - human_readable_election_board = "N/A" - if request.is_election_board is not None: - human_readable_election_board = "Yes" if request.is_election_board else "No" - - requested_domain = request.requested_domain - if extra_fields.get("requested_domain_name") is None: - extra_fields["requested_domain_name"] = getattr(requested_domain, "name", "No requested domain") - - # create a dictionary of fields which can be included in output. - # "extra_fields" are precomputed fields (generated in the DB or parsed). - FIELDS = { - "Domain request": extra_fields.get("requested_domain_name"), - "Status": extra_fields.get("status_display"), - "Domain type": extra_fields.get("domain_type"), - "Federal type": extra_fields.get("human_readable_federal_type"), - "Region": extra_fields.get("region"), - "Creator approved domains count": extra_fields.get("creator_approved_domains_count", 0), - "Creator active requests count": extra_fields.get("creator_active_requests_count", 0), - "Alternative domains": extra_fields.get("all_alternative_domains"), - "Request additional details": extra_fields.get("additional_details"), - "Other contacts": extra_fields.get("all_other_contacts"), - "Current websites": extra_fields.get("all_current_websites"), - # Normal fields - "Federal agency": request.federal_agency.agency if request.federal_agency else None, - "AO first name": request.authorizing_official.first_name, - "AO last name": request.authorizing_official.last_name, - "AO email": request.authorizing_official.email, - "AO title/role": request.authorizing_official.title, - "Creator first name": request.creator.first_name, - "Creator last name": request.creator.last_name, - "Creator email": request.creator.email, - "Organization name": request.organization_name, - "Election office": human_readable_election_board, - "City": request.city, - "State/territory": request.state_territory, - "Request purpose": request.purpose, - "CISA regional representative": request.cisa_representative_email, - "Investigator": request.investigator.email if request.investigator else None, - "Submitted at": request.submission_date, - } - - row = [FIELDS.get(column, "") for column in columns] - return row - - @staticmethod - def export_data_requests_growth_to_csv(csv_file, start_date, end_date): + @classmethod + def export_data_requests_growth_to_csv(cls, csv_file, start_date, end_date): """ Growth report: Receive start and end dates from the view, parse them. @@ -876,33 +777,27 @@ class DomainRequestExport: all_requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct() - # Convert the request to a querystring. Only grab what we need. - values_to_return = ["id", "generic_org_type", "federal_type", "submission_date"] - extra_values = DomainRequestExport.annotate_request_and_return_values(all_requests, {}, values_to_return) - - # Override the default value for domain_type - for request in extra_values.values(): - # Handle the domain_type field. Defaults to the wrong variant. - org_type = request.get("generic_org_type") - federal_type = request.get("federal_type") - - request["domain_type"] = None - if org_type: - readable_org_type = DomainRequest.OrganizationChoices.get_org_label(org_type) - if federal_type: - readable_federal_type = DomainRequest.BranchChoices.get_branch_label(federal_type) - request["domain_type"] = f"{readable_org_type} - {readable_federal_type}" - else: - request["domain_type"] = readable_org_type - - DomainRequestExport.write_csv_for_requests(writer, columns, all_requests, extra_values, should_write_header=True) + annotated_requests = cls.annotate_and_retrieve_fields(all_requests, {"requested_domain__name"}) + requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False) + cls.write_csv_for_requests(writer, columns, requests_dict) + @classmethod def export_full_domain_request_report(cls, csv_file): writer = csv.writer(csv_file) + requests = ( + DomainRequest.objects.select_related( + "creator", "authorizing_official", "federal_agency", "investigator", "requested_domain" + ) + .prefetch_related("current_websites", "other_contacts", "alternative_domains") + .exclude(status__in=[DomainRequest.DomainRequestStatus.STARTED]) + .order_by("status","requested_domain__name",) + .distinct() + ) + + # Annotations are custom columns returned to the queryset (AKA: computed in the DB) annotations = { - "additional_details": DomainRequestExport.get_additional_details_query(), "creator_approved_domains_count": DomainRequestExport.get_creator_approved_domains_count_query(), "creator_active_requests_count": DomainRequestExport.get_creator_active_requests_count_query(), "all_other_contacts": DomainRequestExport.get_all_other_contacts_query(), @@ -910,95 +805,166 @@ class DomainRequestExport: "all_alternative_domains": StringAgg("alternative_domains__website", delimiter=" | ", distinct=True), } - values_to_return = [ - # Return out custom fields - "all_alternative_domains", - "all_other_contacts", - "all_current_websites", - "additional_details", - "creator_approved_domains_count", - "creator_active_requests_count", + # .values can't go two levels deep (just returns the id) - so we have to include this. + additional_values = [ + "requested_domain__name", + "federal_agency__agency", + "authorizing_official__first_name", + "authorizing_official__last_name", + "authorizing_official__email", + "authorizing_official__title", + "creator__first_name", + "creator__last_name", + "creator__email", + "investigator__email" ] - requests = ( - DomainRequest.objects.select_related( - "creator", "authorizing_official", "federal_agency", "investigator", "requested_domain" - ) - .exclude(status__in=[DomainRequest.DomainRequestStatus.STARTED]) - .order_by("status","requested_domain__name",) - .distinct() - ) + # Convert the domain request queryset to a dictionary (including annotated fields) + annotated_requests = cls.annotate_and_retrieve_fields(requests, annotations, additional_values) + requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False) - extra_values = DomainRequestExport.annotate_request_and_return_values(requests, annotations, values_to_return) - DomainRequestExport.write_csv_for_requests(writer, cls.all_columns, requests, extra_values, should_write_header=True) + # Write the csv file + cls.write_csv_for_requests(writer, cls.all_columns, requests_dict) @staticmethod - def annotate_request_and_return_values(requests, annotations, values_to_return, return_dict=True) -> QuerySet | dict: + def write_csv_for_requests( + writer, + columns, + requests_dict, + should_write_header=True, + ): + """Receives params from the parent methods and outputs a CSV with filtered and sorted requests. + Works with write_header as long as the same writer object is passed.""" + + rows = [] + for request in requests_dict.values(): + try: + row = DomainRequestExport.parse_row_for_requests(columns, request) + rows.append(row) + except ValueError as err: + logger.error(f"csv_export -> Error when parsing row: {err}") + continue + + if should_write_header: + write_header(writer, columns) + + writer.writerows(rows) + + @staticmethod + def parse_row_for_requests(columns, request, show_expanded_org_type=False): """ - Annotates a queryset with specified annotations and retrieves specified fields. + Given a set of columns and a request dictionary, + generate a new row from cleaned column data + """ + + # FK fields + federal_agency = "federal_agency" + ao = "authorizing_official" + creator = "creator" + investigator = "investigator" + + # Handle the federal_type field. Defaults to the wrong format. + federal_type = request.get("federal_type") + human_readable_federal_type = DomainRequest.BranchChoices.get_branch_label(federal_type) if federal_type else None + + # Handle the org_type field. Either "Tribal" or "Federal - Executive". + org_type = request.get("organization_type") + human_readable_org_type = DomainRequest.OrganizationChoices.get_org_label(org_type) if org_type else None + if human_readable_org_type and human_readable_federal_type and show_expanded_org_type: + human_readable_org_type = f"{human_readable_org_type} - {human_readable_federal_type}" + + # Handle the status field. Defaults to the wrong format. + status = request.get("status") + status_display = DomainRequest.DomainRequestStatus.get_status_label(status) if status else None + + # Handle the region field. + state_territory = request.get("state_territory") + region = get_region(state_territory) if state_territory else None + + requested_domain = request.get("requested_domain__name") + requested_domain_name = requested_domain if requested_domain else "No requested domain" + + # Handle the election field. N/A if None, "Yes"/"No" if boolean + human_readable_election_board = "N/A" + if request.get("is_election_board") is not None: + human_readable_election_board = "Yes" if request.is_election_board else "No" + + # Handle the additional details field. Pipe seperated + details = [request.get("cisa_representative_email"), request.get("anything_else")] + additional_details = " | ".join([field for field in details if field is not None]) + + # create a dictionary of fields which can be included in output. + # "extra_fields" are precomputed fields (generated in the DB or parsed). + FIELDS = { + # Parsed fields - defined above. + "Domain request": requested_domain_name, + "Region": region, + "Status": status_display, + "Election office": human_readable_election_board, + "Federal type": human_readable_federal_type, + "Domain type": human_readable_org_type, + "Request additional details": additional_details, + # Annotated fields - passed into the request dict. + "Creator approved domains count": request.get("creator_approved_domains_count", 0), + "Creator active requests count": request.get("creator_active_requests_count", 0), + "Alternative domains": request.get("all_alternative_domains"), + "Other contacts": request.get("all_other_contacts"), + "Current websites": request.get("all_current_websites"), + # Untouched FK fields - passed into the request dict. + "Federal agency": request.get(f"{federal_agency}__agency"), + "AO first name": request.get(f"{ao}__first_name"), + "AO last name": request.get(f"{ao}__last_name"), + "AO email": request.get(f"{ao}__email"), + "AO title/role": request.get(f"{ao}__title"), + "Creator first name": request.get(f"{creator}__first_name"), + "Creator last name": request.get(f"{creator}__last_name"), + "Creator email": request.get(f"{creator}__email"), + "Investigator": request.get(f"{investigator}__email"), + # Untouched fields + "Organization name": request.get("organization_name"), + "City": request.get("city"), + "State/territory": request.get("state_territory"), + "Request purpose": request.get("purpose"), + "CISA regional representative": request.get("cisa_representative_email"), + "Submitted at": request.get("submission_date"), + } + + row = [FIELDS.get(column, "") for column in columns] + return row + + @classmethod + def annotate_and_retrieve_fields(cls, requests, annotations, additional_values=None) -> QuerySet: + """ + Applies annotations to a queryset and retrieves specified fields, + including class-defined and annotation-defined. Parameters: - requests (QuerySet): The initial queryset to which annotations will be applied. - annotations (dict): A dictionary where keys are names of the new fields to create, and values are - expressions that describe how to calculate these fields. - values_to_return (list): A list of strings that specify which fields should be included in the final - output, in addition to the 'id' field which is included by default. - return_dict (bool): If True, the method returns a dictionary where each key is an 'id' of an item - and each value is a dictionary of the fields specified in `values_to_return`. - If False, the method returns a QuerySet containing dictionaries of the requested fields. + requests (QuerySet): Initial queryset. + annotations (dict, optional): Fields to compute {field_name: expression}. + additional_values (list, optional): Extra fields to retrieve; defaults to annotation keys if None. + Returns: - QuerySet | dict: Depending on the value of `return_dict`, returns either a QuerySet or a dictionary. - The QuerySet contains dictionaries for each record with the specified fields. - The dictionary contains keys as record ids and values as dictionaries of the specified fields. + QuerySet: Contains dictionaries with the specified fields for each record. """ - queryset = requests.annotate(**annotations).values("id", *values_to_return) - if return_dict: - requests_dict = {value["id"]: value for value in queryset} - return requests_dict + if annotations is None: + annotations = {} + if additional_values is None: + additional_values = [] + + # We can infer that if we're passing in annotations, + # we want to grab the result of said annotation. + if annotations: + additional_values.extend(annotations.keys()) + + queryset = requests.annotate(**annotations).values(*cls.domain_request_fields, *additional_values) return queryset # ============================================================= # # Helper functions for django ORM queries. # # We are using these rather than pure python for speed reasons. # # ============================================================= # - - # AXE THIS - @staticmethod - def get_additional_details_query(default_message=None, delimiter=" | "): - """ - A SQL case statement for DomainRequest.anything_else and DomainRequest.cisa_representative_email. - - When ran, returns a concatenation of anything_else and cisa_representative_email - seperated by " | ". - - Equivalent to: - - `f"{cisa_rep} | {anything_else}" If anything_else or cisa_rep else None` - """ - - # TODO this should be replaced with cisa_representative_first_name and cisa_representative_last_name - additional_details_query = Case( - When( - cisa_representative_email__isnull=False, - anything_else__isnull=False, - then=Concat(F("cisa_representative_email"), Value(delimiter), F("anything_else")), - ), - When( - cisa_representative_email__isnull=True, - anything_else__isnull=False, - then=F("anything_else"), - ), - When( - cisa_representative_email__isnull=False, - anything_else__isnull=True, - then=F("cisa_representative_email"), - ), - default=Value(default_message), - output_field=CharField(), - ) - - return additional_details_query @staticmethod def get_all_other_contacts_query(delimiter=" | "):