Add some data to report

This commit is contained in:
zandercymatics 2024-11-14 10:22:41 -07:00
parent 1e43974a11
commit 9ac8a3e8bc
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
3 changed files with 83 additions and 29 deletions

View file

@ -151,11 +151,23 @@ class MemberExport(BaseExport):
if not portfolio: if not portfolio:
return {} return {}
# Union the two querysets to combine UserPortfolioPermission + invites # Union the two querysets to combine UserPortfolioPermission + invites.
permissions = UserPortfolioPermissionModelDict.get_annotated_queryset(portfolio) # Unions cannot have a col mismatch, so we must clamp what is returned here.
invitations = PortfolioInvitationModelDict.get_annotated_queryset(portfolio) shared_columns = [
objects = permissions.union(invitations) "id",
return convert_queryset_to_dict(objects, is_model=False) "first_name",
"last_name",
"email_display",
"last_active",
"roles",
"additional_permissions_display",
"member_display",
"domain_info",
"source",
]
permissions = UserPortfolioPermissionModelDict.get_annotated_queryset(portfolio, csv_report=True).values(*shared_columns)
invitations = PortfolioInvitationModelDict.get_annotated_queryset(portfolio, csv_report=True).values(*shared_columns)
return convert_queryset_to_dict(permissions.union(invitations), is_model=False)
@classmethod @classmethod
def get_columns(cls): def get_columns(cls):
@ -183,18 +195,32 @@ class MemberExport(BaseExport):
""" """
is_admin = UserPortfolioRoleChoices.ORGANIZATION_ADMIN in (model.get("roles") or []) is_admin = UserPortfolioRoleChoices.ORGANIZATION_ADMIN in (model.get("roles") or [])
domains = ",".join(model.get("domain_info")) if model.get("domain_info") else ""
FIELDS = { FIELDS = {
"Email": model.get("email"), "Email": model.get("email_display"),
"Organization admin": is_admin, "Organization admin": is_admin,
"Invited by": "TODO", "Invited by": model.get("source"),
"Last active": "TODO", "Last active": model.get("last_active"),
"Domain requests": "TODO", "Domain requests": "TODO",
"Member management": "TODO", "Member management": "TODO",
"Domain management": "TODO", "Domain management": "TODO",
"Number of domains": "TODO", "Number of domains": "TODO",
"Domains": "TODO", # Quote enclose the domain list
# Note: this will only enclose when more than two items exist
"Domains": domains,
} }
# "id",
# "first_name",
# "last_name",
# "email_display",
# "last_active",
# "roles",
# "additional_permissions_display",
# "member_display",
# "domain_info",
# "source",
row = [FIELDS.get(column, "") for column in columns] row = [FIELDS.get(column, "") for column in columns]
return row return row

View file

@ -6,9 +6,8 @@ from registrar.models import (
DomainInvitation, DomainInvitation,
PortfolioInvitation, PortfolioInvitation,
) )
from django.db.models import Case, CharField, F, ManyToManyField, Q, QuerySet, Value, When, TextField, OuterRef, Subquery from django.db.models import CharField, F, ManyToManyField, Q, QuerySet, Value, TextField, OuterRef, Subquery, Func, Case, When
from django.db.models.functions import Cast from django.db.models.functions import Concat, Coalesce, Cast
from django.db.models.functions import Concat, Coalesce
from registrar.models.user_portfolio_permission import UserPortfolioPermission from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.generic_helper import convert_queryset_to_dict from registrar.models.utility.generic_helper import convert_queryset_to_dict
from registrar.models.utility.orm_helper import ArrayRemove from registrar.models.utility.orm_helper import ArrayRemove
@ -200,7 +199,7 @@ class UserPortfolioPermissionModelDict(BaseModelDict):
return Q(portfolio=portfolio) return Q(portfolio=portfolio)
@classmethod @classmethod
def get_annotated_fields(cls, portfolio): def get_annotated_fields(cls, portfolio, csv_report=False):
""" """
Get a dict of computed fields. These are fields that do not exist on the model normally Get a dict of computed fields. These are fields that do not exist on the model normally
and will be passed to .annotate() when building a queryset. and will be passed to .annotate() when building a queryset.
@ -209,12 +208,34 @@ class UserPortfolioPermissionModelDict(BaseModelDict):
# Return nothing # Return nothing
return {} return {}
# Tweak the queries slightly to only return the data we need.
# When returning data for the csv report we:
# 1. Only return the domain name for 'domain_info'
# 2. Return a formatted date for 'last_active'
# These are just optimizations that are better done in SQL as opposed to python.
if csv_report:
domain_query = F("user__permissions__domain__name")
last_active_query = Func(
F("user__last_login"),
Value("FMMonth DD, YYYY"),
function="to_char",
output_field=TextField()
)
else:
domain_query = Concat(
F("user__permissions__domain_id"),
Value(":"),
F("user__permissions__domain__name"),
output_field=CharField(),
)
last_active_query = Cast(F("user__last_login"), output_field=TextField())
return { return {
"first_name": F("user__first_name"), "first_name": F("user__first_name"),
"last_name": F("user__last_name"), "last_name": F("user__last_name"),
"email_display": F("user__email"), "email_display": F("user__email"),
"last_active": Coalesce( "last_active": Coalesce(
Cast(F("user__last_login"), output_field=TextField()), last_active_query,
Value("Invalid date"), Value("Invalid date"),
output_field=TextField(), output_field=TextField(),
), ),
@ -236,12 +257,7 @@ class UserPortfolioPermissionModelDict(BaseModelDict):
output_field=CharField(), output_field=CharField(),
), ),
"domain_info": ArrayAgg( "domain_info": ArrayAgg(
Concat( domain_query,
F("user__permissions__domain_id"),
Value(":"),
F("user__permissions__domain__name"),
output_field=CharField(),
),
distinct=True, distinct=True,
filter=Q(user__permissions__domain__isnull=False) filter=Q(user__permissions__domain__isnull=False)
& Q(user__permissions__domain__domain_info__portfolio=portfolio), & Q(user__permissions__domain__domain_info__portfolio=portfolio),
@ -250,7 +266,7 @@ class UserPortfolioPermissionModelDict(BaseModelDict):
} }
@classmethod @classmethod
def get_annotated_queryset(cls, portfolio): def get_annotated_queryset(cls, portfolio, csv_report=False):
"""Override of the base annotated queryset to pass in portfolio""" """Override of the base annotated queryset to pass in portfolio"""
model_queryset = ( model_queryset = (
cls.model() cls.model()
@ -264,7 +280,7 @@ class UserPortfolioPermissionModelDict(BaseModelDict):
.distinct() .distinct()
) )
annotated_fields = cls.get_annotated_fields(portfolio) annotated_fields = cls.get_annotated_fields(portfolio, csv_report)
related_table_fields = cls.get_related_table_fields() related_table_fields = cls.get_related_table_fields()
return cls.annotate_and_retrieve_fields( return cls.annotate_and_retrieve_fields(
model_queryset, annotated_fields, related_table_fields model_queryset, annotated_fields, related_table_fields
@ -291,7 +307,7 @@ class PortfolioInvitationModelDict(BaseModelDict):
return Q(portfolio=portfolio) return Q(portfolio=portfolio)
@classmethod @classmethod
def get_annotated_fields(cls, portfolio): def get_annotated_fields(cls, portfolio, csv_report=False):
""" """
Get a dict of computed fields. These are fields that do not exist on the model normally Get a dict of computed fields. These are fields that do not exist on the model normally
and will be passed to .annotate() when building a queryset. and will be passed to .annotate() when building a queryset.
@ -300,10 +316,18 @@ class PortfolioInvitationModelDict(BaseModelDict):
# Return nothing # Return nothing
return {} return {}
# Tweak the queries slightly to only return the data we need
if csv_report:
domain_query = F("domain__name")
else:
domain_query = Concat(F("domain__id"), Value(":"), F("domain__name"), output_field=CharField())
# Get all existing domain invitations and search on that
domain_invitations = DomainInvitation.objects.filter( domain_invitations = DomainInvitation.objects.filter(
email=OuterRef("email"), # Check if email matches the OuterRef("email") email=OuterRef("email"), # Check if email matches the OuterRef("email")
domain__domain_info__portfolio=portfolio, # Check if the domain's portfolio matches the given portfolio domain__domain_info__portfolio=portfolio, # Check if the domain's portfolio matches the given portfolio
).annotate(domain_info=Concat(F("domain__id"), Value(":"), F("domain__name"), output_field=CharField())) ).annotate(domain_info=domain_query)
return { return {
"first_name": Value(None, output_field=CharField()), "first_name": Value(None, output_field=CharField()),
"last_name": Value(None, output_field=CharField()), "last_name": Value(None, output_field=CharField()),
@ -321,7 +345,7 @@ class PortfolioInvitationModelDict(BaseModelDict):
} }
@classmethod @classmethod
def get_annotated_queryset(cls, portfolio): def get_annotated_queryset(cls, portfolio, csv_report=False):
"""Override of the base annotated queryset to pass in portfolio""" """Override of the base annotated queryset to pass in portfolio"""
model_queryset = ( model_queryset = (
cls.model() cls.model()
@ -335,7 +359,7 @@ class PortfolioInvitationModelDict(BaseModelDict):
.distinct() .distinct()
) )
annotated_fields = cls.get_annotated_fields(portfolio) annotated_fields = cls.get_annotated_fields(portfolio, csv_report)
related_table_fields = cls.get_related_table_fields() related_table_fields = cls.get_related_table_fields()
return cls.annotate_and_retrieve_fields( return cls.annotate_and_retrieve_fields(
model_queryset, annotated_fields, related_table_fields model_queryset, annotated_fields, related_table_fields

View file

@ -173,10 +173,14 @@ class ExportMembersPortfolio(View):
"""Returns a a members report for a given portfolio""" """Returns a a members report for a given portfolio"""
def get(self, request, *args, **kwargs): def get(self, request, *args, **kwargs):
portfolio = request.session.get("portfolio") """Returns the members report"""
# match the CSV example with all the fields
portfolio_display = "portfolio"
if request.session.get("portfolio"):
portfolio_display = str(request.session.get("portfolio")).replace(" ", "-")
response = HttpResponse(content_type="text/csv") response = HttpResponse(content_type="text/csv")
response["Content-Disposition"] = f'attachment; filename="members-for-{portfolio}.csv"' response["Content-Disposition"] = f'attachment; filename="members-for-{portfolio_display}.csv"'
csv_export.MemberExport.export_data_to_csv(response, request=request) csv_export.MemberExport.export_data_to_csv(response, request=request)
return response return response