Merge branch 'za/2737-take-two-members-csv-export' into za/2737-members-csv-report

This commit is contained in:
zandercymatics 2024-11-26 10:16:17 -07:00
commit f05148f2c3
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
3 changed files with 405 additions and 496 deletions

View file

@ -1,4 +1,4 @@
from abc import abstractmethod
from abc import ABC, abstractmethod
from collections import defaultdict
import csv
import logging
@ -10,6 +10,9 @@ from registrar.models import (
DomainInformation,
PublicContact,
UserDomainRole,
PortfolioInvitation,
UserGroup,
UserPortfolioPermission,
)
from django.db.models import (
Case,
@ -17,27 +20,29 @@ from django.db.models import (
Count,
DateField,
F,
ManyToManyField,
Q,
QuerySet,
TextField,
Value,
When,
OuterRef,
Subquery,
Exists,
Func,
)
from django.utils import timezone
from django.db.models.functions import Concat, Coalesce
from django.contrib.postgres.aggregates import StringAgg
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from django.db.models.functions import Concat, Coalesce, Cast
from django.contrib.postgres.aggregates import ArrayAgg, StringAgg
from django.contrib.admin.models import LogEntry, ADDITION
from django.contrib.contenttypes.models import ContentType
from registrar.models.utility.generic_helper import convert_queryset_to_dict
from registrar.models.utility.orm_helper import ArrayRemoveNull
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
from registrar.templatetags.custom_filters import get_region
from registrar.utility.constants import BranchChoices
from registrar.utility.enums import DefaultEmail
from registrar.utility.model_annotations import (
BaseModelAnnotation,
PortfolioInvitationModelAnnotation,
UserPortfolioPermissionModelAnnotation,
)
logger = logging.getLogger(__name__)
@ -67,12 +72,21 @@ def format_end_date(end_date):
return timezone.make_aware(datetime.strptime(end_date, "%Y-%m-%d")) if end_date else get_default_end_date()
class BaseExport(BaseModelAnnotation):
class BaseExport(ABC):
"""
A generic class for exporting data which returns a csv file for the given model.
2nd class in an inheritance tree of 4.
Base class in an inheritance tree of 3.
"""
@classmethod
@abstractmethod
def model(self):
"""
Property to specify the model that the export class will handle.
Must be implemented by subclasses.
"""
pass
@classmethod
def get_columns(cls):
"""
@ -80,6 +94,78 @@ class BaseExport(BaseModelAnnotation):
"""
return []
@classmethod
def get_sort_fields(cls):
"""
Returns the sort fields for the CSV export. Override in subclasses as needed.
"""
return []
@classmethod
def get_additional_args(cls):
"""
Returns additional keyword arguments as an empty dictionary.
Override in subclasses to provide specific arguments.
"""
return {}
@classmethod
def get_select_related(cls):
"""
Get a list of tables to pass to select_related when building queryset.
"""
return []
@classmethod
def get_prefetch_related(cls):
"""
Get a list of tables to pass to prefetch_related when building queryset.
"""
return []
@classmethod
def get_exclusions(cls):
"""
Get a Q object of exclusion conditions to pass to .exclude() when building queryset.
"""
return Q()
@classmethod
def get_filter_conditions(cls, **kwargs):
"""
Get a Q object of filter conditions to filter when building queryset.
"""
return Q()
@classmethod
def get_computed_fields(cls, **kwargs):
"""
Get a dict of computed fields. These are fields that do not exist on the model normally
and will be passed to .annotate() when building a queryset.
"""
return {}
@classmethod
def get_annotations_for_sort(cls):
"""
Get a dict of annotations to make available for order_by clause.
"""
return {}
@classmethod
def get_related_table_fields(cls):
"""
Get a list of fields from related tables.
"""
return []
@classmethod
def update_queryset(cls, queryset, **kwargs):
"""
Returns an updated queryset. Override in subclass to update queryset.
"""
return queryset
@classmethod
def write_csv_before(cls, csv_writer, **kwargs):
"""
@ -88,6 +174,94 @@ class BaseExport(BaseModelAnnotation):
"""
pass
@classmethod
def annotate_and_retrieve_fields(
cls, initial_queryset, annotated_fields, related_table_fields=None, include_many_to_many=False, **kwargs
) -> QuerySet:
"""
Applies annotations to a queryset and retrieves specified fields,
including class-defined and annotation-defined.
Parameters:
initial_queryset (QuerySet): Initial queryset.
annotated_fields (dict, optional): Fields to compute {field_name: expression}.
related_table_fields (list, optional): Extra fields to retrieve; defaults to annotation keys if None.
include_many_to_many (bool, optional): Determines if we should include many to many fields or not
**kwargs: Additional keyword arguments for specific parameters (e.g., public_contacts, domain_invitations,
user_domain_roles).
Returns:
QuerySet: Contains dictionaries with the specified fields for each record.
"""
if related_table_fields is None:
related_table_fields = []
# We can infer that if we're passing in annotations,
# we want to grab the result of said annotation.
if annotated_fields:
related_table_fields.extend(annotated_fields.keys())
# Get prexisting fields on the model
model_fields = set()
for field in cls.model()._meta.get_fields():
# Exclude many to many fields unless we specify
many_to_many = isinstance(field, ManyToManyField) and include_many_to_many
if many_to_many or not isinstance(field, ManyToManyField):
model_fields.add(field.name)
queryset = initial_queryset.annotate(**annotated_fields).values(*model_fields, *related_table_fields)
return cls.update_queryset(queryset, **kwargs)
@classmethod
def export_data_to_csv(cls, csv_file, **kwargs):
"""
All domain metadata:
Exports domains of all statuses plus domain managers.
"""
writer = csv.writer(csv_file)
columns = cls.get_columns()
models_dict = cls.get_model_annotation_dict(**kwargs)
# Write to csv file before the write_csv
cls.write_csv_before(writer, **kwargs)
# Write the csv file
rows = cls.write_csv(writer, columns, models_dict)
# Return rows that for easier parsing and testing
return rows
@classmethod
def get_annotated_queryset(cls, **kwargs):
sort_fields = cls.get_sort_fields()
# Get additional args and merge with incoming kwargs
additional_args = cls.get_additional_args()
kwargs.update(additional_args)
select_related = cls.get_select_related()
prefetch_related = cls.get_prefetch_related()
exclusions = cls.get_exclusions()
annotations_for_sort = cls.get_annotations_for_sort()
filter_conditions = cls.get_filter_conditions(**kwargs)
annotated_fields = cls.get_computed_fields(**kwargs)
related_table_fields = cls.get_related_table_fields()
model_queryset = (
cls.model()
.objects.select_related(*select_related)
.prefetch_related(*prefetch_related)
.filter(filter_conditions)
.exclude(exclusions)
.annotate(**annotations_for_sort)
.order_by(*sort_fields)
.distinct()
)
return cls.annotate_and_retrieve_fields(model_queryset, annotated_fields, related_table_fields, **kwargs)
@classmethod
def get_model_annotation_dict(cls, **kwargs):
return convert_queryset_to_dict(cls.get_annotated_queryset(**kwargs), is_model=False)
@classmethod
def export_data_to_csv(cls, csv_file, **kwargs):
"""
@ -185,14 +359,128 @@ class MemberExport(BaseExport):
"joined_date",
"invited_by",
]
permissions = UserPortfolioPermissionModelAnnotation.get_annotated_queryset(portfolio, csv_report=True).values(
*shared_columns
)
invitations = PortfolioInvitationModelAnnotation.get_annotated_queryset(portfolio, csv_report=True).values(
*shared_columns
)
# Permissions
permissions = UserPortfolioPermission.objects.filter(portfolio=portfolio).select_related("user").annotate(
first_name=F("user__first_name"),
last_name=F("user__last_name"),
email_display=F("user__email"),
last_active=Coalesce(
Func(
F("user__last_login"), Value("YYYY-MM-DD"), function="to_char", output_field=TextField()
),
Value("Invalid date"),
output_field=CharField(),
),
additional_permissions_display=F("additional_permissions"),
member_display=Case(
# If email is present and not blank, use email
When(Q(user__email__isnull=False) & ~Q(user__email=""), then=F("user__email")),
# If first name or last name is present, use concatenation of first_name + " " + last_name
When(
Q(user__first_name__isnull=False) | Q(user__last_name__isnull=False),
then=Concat(
Coalesce(F("user__first_name"), Value("")),
Value(" "),
Coalesce(F("user__last_name"), Value("")),
),
),
# If neither, use an empty string
default=Value(""),
output_field=CharField(),
),
domain_info=ArrayAgg(
F("user__permissions__domain__name"),
distinct=True,
# only include domains in portfolio
filter=Q(user__permissions__domain__isnull=False)
& Q(user__permissions__domain__domain_info__portfolio=portfolio),
),
type=Value("member", output_field=CharField()),
joined_date=Func(F("created_at"), Value("YYYY-MM-DD"), function="to_char", output_field=CharField()),
invited_by=cls.get_invited_by_query(
object_id_query=cls.get_portfolio_invitation_id_query()
),
).values(*shared_columns)
# Invitations
domain_invitations = DomainInvitation.objects.filter(
email=OuterRef("email"), # Check if email matches the OuterRef("email")
domain__domain_info__portfolio=portfolio, # Check if the domain's portfolio matches the given portfolio
).annotate(domain_info=Concat(F("domain__id"), Value(":"), F("domain__name"), output_field=CharField()))
invitations = PortfolioInvitation.objects.filter(portfolio=portfolio).annotate(
first_name=Value(None, output_field=CharField()),
last_name=Value(None, output_field=CharField()),
email_display=F("email"),
last_active=Value("Invited", output_field=CharField()),
additional_permissions_display=F("additional_permissions"),
member_display=F("email"),
# Use ArrayRemove to return an empty list when no domain invitations are found
domain_info=ArrayRemoveNull(
ArrayAgg(
Subquery(domain_invitations.values("domain_info")),
distinct=True,
)
),
type=Value("invitedmember", output_field=CharField()),
joined_date=Value("Unretrieved", output_field=CharField()),
invited_by=cls.get_invited_by_query(object_id_query=Cast(OuterRef("id"), output_field=CharField())),
).values(*shared_columns)
return convert_queryset_to_dict(permissions.union(invitations), is_model=False)
@classmethod
def get_invited_by_query(cls, object_id_query):
"""Returns the user that created the given portfolio invitation.
Grabs this data from the audit log, given that a portfolio invitation object
is specified via object_id_query."""
return Coalesce(
Subquery(
LogEntry.objects.filter(
content_type=ContentType.objects.get_for_model(PortfolioInvitation),
object_id=object_id_query,
action_flag=ADDITION,
)
.annotate(
display_email=Case(
When(
Exists(
UserGroup.objects.filter(
name__in=["cisa_analysts_group", "full_access_group"],
user=OuterRef("user"),
)
),
then=Value("help@get.gov"),
),
default=F("user__email"),
output_field=CharField(),
)
)
.order_by("action_time")
.values("display_email")[:1]
),
Value("System"),
output_field=CharField(),
)
@classmethod
def get_portfolio_invitation_id_query(cls):
"""Gets the id of the portfolio invitation that created this UserPortfolioPermission.
This makes the assumption that if an invitation is retrieved, it must have created the given
UserPortfolioPermission object."""
return Cast(
Subquery(
PortfolioInvitation.objects.filter(
status=PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED,
# Double outer ref because we first go into the LogEntry query,
# then into the parent UserPortfolioPermission.
email=OuterRef(OuterRef("user__email")),
portfolio=OuterRef(OuterRef("portfolio")),
).values("id")[:1]
),
output_field=CharField(),
)
@classmethod
def get_columns(cls):
"""
@ -498,7 +786,7 @@ class DomainDataType(DomainExport):
return ["domain__permissions"]
@classmethod
def get_annotated_fields(cls, delimiter=", ", **kwargs):
def get_computed_fields(cls, delimiter=", ", **kwargs):
"""
Get a dict of computed fields.
"""
@ -715,7 +1003,7 @@ class DomainDataFull(DomainExport):
)
@classmethod
def get_annotated_fields(cls, delimiter=", ", **kwargs):
def get_computed_fields(cls, delimiter=", ", **kwargs):
"""
Get a dict of computed fields.
"""
@ -810,7 +1098,7 @@ class DomainDataFederal(DomainExport):
)
@classmethod
def get_annotated_fields(cls, delimiter=", ", **kwargs):
def get_computed_fields(cls, delimiter=", ", **kwargs):
"""
Get a dict of computed fields.
"""
@ -1444,7 +1732,7 @@ class DomainRequestDataFull(DomainRequestExport):
]
@classmethod
def get_annotated_fields(cls, delimiter=", ", **kwargs):
def get_computed_fields(cls, delimiter=", ", **kwargs):
"""
Get a dict of computed fields.
"""

View file

@ -1,449 +0,0 @@
"""
Model annotation classes.
Intended to return django querysets with computed fields for api endpoints and our csv reports.
Used by both API endpoints (e.g. portfolio members JSON) and data exports (e.g. CSV reports).
Initially created to manage the complexity of the MembersTable and Members CSV report,
as they require complex but common annotations.
Example:
# For a JSON table endpoint
permissions = UserPortfolioPermissionAnnotation.get_annotated_queryset(portfolio)
# Returns queryset with standardized fields for member tables
# For a CSV export
permissions = UserPortfolioPermissionAnnotation.get_annotated_queryset(portfolio, csv_report=True)
# Returns same fields but formatted for CSV export
"""
from abc import ABC, abstractmethod
from registrar.models import (
DomainInvitation,
PortfolioInvitation,
)
from django.db.models import (
CharField,
F,
ManyToManyField,
Q,
QuerySet,
Value,
TextField,
OuterRef,
Subquery,
Func,
Case,
When,
Exists,
)
from django.db.models.functions import Concat, Coalesce, Cast
from registrar.models.user_group import UserGroup
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.generic_helper import convert_queryset_to_dict
from registrar.models.utility.orm_helper import ArrayRemoveNull
from django.contrib.postgres.aggregates import ArrayAgg
from django.contrib.admin.models import LogEntry, ADDITION
from django.contrib.contenttypes.models import ContentType
from registrar.utility.enums import DefaultEmail, DefaultUser
class BaseModelAnnotation(ABC):
"""
Abstract base class that standardizes how models are annotated for csv exports or complex annotation queries.
For example, the Members table / csv export.
Subclasses define model-specific annotations, filters, and field formatting while inheriting
common queryset building logic.
Intended ensure consistent data presentation across both table UI components and CSV exports.
Base class in an inheritance tree of 4.
"""
@classmethod
@abstractmethod
def model(self):
"""
Property to specify the model that the export class will handle.
Must be implemented by subclasses.
"""
pass
@classmethod
def get_sort_fields(cls):
"""
Returns the sort fields for the CSV export. Override in subclasses as needed.
"""
return []
@classmethod
def get_additional_args(cls):
"""
Returns additional keyword arguments as an empty dictionary.
Override in subclasses to provide specific arguments.
"""
return {}
@classmethod
def get_select_related(cls):
"""
Get a list of tables to pass to select_related when building queryset.
"""
return []
@classmethod
def get_prefetch_related(cls):
"""
Get a list of tables to pass to prefetch_related when building queryset.
"""
return []
@classmethod
def get_exclusions(cls):
"""
Get a Q object of exclusion conditions to pass to .exclude() when building queryset.
"""
return Q()
@classmethod
def get_filter_conditions(cls, **kwargs):
"""
Get a Q object of filter conditions to filter when building queryset.
"""
return Q()
@classmethod
def get_annotated_fields(cls, **kwargs):
"""
Get a dict of computed fields. These are fields that do not exist on the model normally
and will be passed to .annotate() when building a queryset.
"""
return {}
@classmethod
def get_annotations_for_sort(cls):
"""
Get a dict of annotations to make available for order_by clause.
"""
return {}
@classmethod
def get_related_table_fields(cls):
"""
Get a list of fields from related tables.
"""
return []
@classmethod
def annotate_and_retrieve_fields(
cls, initial_queryset, annotated_fields, related_table_fields=None, include_many_to_many=False, **kwargs
) -> QuerySet:
"""
Applies annotations to a queryset and retrieves specified fields,
including class-defined and annotation-defined.
Parameters:
initial_queryset (QuerySet): Initial queryset.
annotated_fields (dict, optional): Fields to compute {field_name: expression}.
related_table_fields (list, optional): Extra fields to retrieve; defaults to annotation keys if None.
include_many_to_many (bool, optional): Determines if we should include many to many fields or not
**kwargs: Additional keyword arguments for specific parameters (e.g., public_contacts, domain_invitations,
user_domain_roles).
Returns:
QuerySet: Contains dictionaries with the specified fields for each record.
"""
if related_table_fields is None:
related_table_fields = []
# We can infer that if we're passing in annotations,
# we want to grab the result of said annotation.
if annotated_fields:
related_table_fields.extend(annotated_fields.keys())
# Get prexisting fields on the model
model_fields = set()
for field in cls.model()._meta.get_fields():
# Exclude many to many fields unless we specify
many_to_many = isinstance(field, ManyToManyField) and include_many_to_many
if many_to_many or not isinstance(field, ManyToManyField):
model_fields.add(field.name)
queryset = initial_queryset.annotate(**annotated_fields).values(*model_fields, *related_table_fields)
return cls.update_queryset(queryset, **kwargs)
@classmethod
def get_annotated_queryset(cls, **kwargs):
sort_fields = cls.get_sort_fields()
# Get additional args and merge with incoming kwargs
additional_args = cls.get_additional_args()
kwargs.update(additional_args)
select_related = cls.get_select_related()
prefetch_related = cls.get_prefetch_related()
exclusions = cls.get_exclusions()
annotations_for_sort = cls.get_annotations_for_sort()
filter_conditions = cls.get_filter_conditions(**kwargs)
annotated_fields = cls.get_annotated_fields(**kwargs)
related_table_fields = cls.get_related_table_fields()
model_queryset = (
cls.model()
.objects.select_related(*select_related)
.prefetch_related(*prefetch_related)
.filter(filter_conditions)
.exclude(exclusions)
.annotate(**annotations_for_sort)
.order_by(*sort_fields)
.distinct()
)
return cls.annotate_and_retrieve_fields(model_queryset, annotated_fields, related_table_fields, **kwargs)
@classmethod
def update_queryset(cls, queryset, **kwargs):
"""
Returns an updated queryset. Override in subclass to update queryset.
"""
return queryset
@classmethod
def get_model_annotation_dict(cls, **kwargs):
return convert_queryset_to_dict(cls.get_annotated_queryset(**kwargs), is_model=False)
class UserPortfolioPermissionModelAnnotation(BaseModelAnnotation):
"""
Annotates UserPortfolioPermission querysets with computed fields for member tables.
Handles formatting of user details, permissions, and related domain information
for both UI display and CSV export.
"""
@classmethod
def model(cls):
# Return the model class that this export handles
return UserPortfolioPermission
@classmethod
def get_select_related(cls):
"""
Get a list of tables to pass to select_related when building queryset.
"""
return ["user"]
@classmethod
def get_filter_conditions(cls, portfolio, **kwargs):
"""
Get a Q object of filter conditions to filter when building queryset.
"""
if not portfolio:
# Return nothing
return Q(id__in=[])
# Get all members on this portfolio
return Q(portfolio=portfolio)
@classmethod
def get_portfolio_invitation_id_query(cls):
"""Gets the id of the portfolio invitation that created this UserPortfolioPermission.
This makes the assumption that if an invitation is retrieved, it must have created the given
UserPortfolioPermission object."""
return Cast(
Subquery(
PortfolioInvitation.objects.filter(
status=PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED,
# Double outer ref because we first go into the LogEntry query,
# then into the parent UserPortfolioPermission.
email=OuterRef(OuterRef("user__email")),
portfolio=OuterRef(OuterRef("portfolio")),
).values("id")[:1]
),
output_field=CharField(),
)
@classmethod
def get_annotated_fields(cls, portfolio, csv_report=False):
"""
Get a dict of computed fields. These are fields that do not exist on the model normally
and will be passed to .annotate() when building a queryset.
"""
if not portfolio:
# Return nothing
return {}
# Tweak the queries slightly to only return the data we need.
# When returning data for the csv report we:
# 1. Only return the domain name for 'domain_info' rather than also add ':' seperated id
# 2. Return a formatted date for 'last_active'
# These are just optimizations that are better done in SQL as opposed to python.
if csv_report:
domain_query = F("user__permissions__domain__name")
last_active_query = Func(
F("user__last_login"), Value("YYYY-MM-DD"), function="to_char", output_field=TextField()
)
else:
# an array of domains, with id and name, colon separated
domain_query = Concat(
F("user__permissions__domain_id"),
Value(":"),
F("user__permissions__domain__name"),
# specify the output_field to ensure union has same column types
output_field=CharField(),
)
last_active_query = Cast(F("user__last_login"), output_field=TextField())
return {
"first_name": F("user__first_name"),
"last_name": F("user__last_name"),
"email_display": F("user__email"),
"last_active": Coalesce(
last_active_query,
Value("Invalid date"),
output_field=CharField(),
),
"additional_permissions_display": F("additional_permissions"),
"member_display": Case(
# If email is present and not blank, use email
When(Q(user__email__isnull=False) & ~Q(user__email=""), then=F("user__email")),
# If first name or last name is present, use concatenation of first_name + " " + last_name
When(
Q(user__first_name__isnull=False) | Q(user__last_name__isnull=False),
then=Concat(
Coalesce(F("user__first_name"), Value("")),
Value(" "),
Coalesce(F("user__last_name"), Value("")),
),
),
# If neither, use an empty string
default=Value(""),
output_field=CharField(),
),
"domain_info": ArrayAgg(
domain_query,
distinct=True,
# only include domains in portfolio
filter=Q(user__permissions__domain__isnull=False)
& Q(user__permissions__domain__domain_info__portfolio=portfolio),
),
"type": Value("member", output_field=CharField()),
"joined_date": Func(F("created_at"), Value("YYYY-MM-DD"), function="to_char", output_field=CharField()),
"invited_by": PortfolioInvitationModelAnnotation.get_invited_by_query(
object_id_query=cls.get_portfolio_invitation_id_query()
),
}
@classmethod
def get_annotated_queryset(cls, portfolio, csv_report=False):
"""Override of the base annotated queryset to pass in portfolio"""
return super().get_annotated_queryset(portfolio=portfolio, csv_report=csv_report)
class PortfolioInvitationModelAnnotation(BaseModelAnnotation):
"""
Annotates PortfolioInvitation querysets with computed fields for the member table.
Handles formatting of user details, permissions, and related domain information
for both UI display and CSV export.
"""
@classmethod
def model(cls):
# Return the model class that this export handles
return PortfolioInvitation
@classmethod
def get_exclusions(cls):
"""
Get a Q object of exclusion conditions to pass to .exclude() when building queryset.
"""
return Q(status=PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED)
@classmethod
def get_filter_conditions(cls, portfolio, **kwargs):
"""
Get a Q object of filter conditions to filter when building queryset.
"""
if not portfolio:
# Return nothing
return Q(id__in=[])
# Get all members on this portfolio
return Q(portfolio=portfolio)
@classmethod
def get_invited_by_query(cls, object_id_query):
"""Returns the user that created the given portfolio invitation.
Grabs this data from the audit log, given that a portfolio invitation object
is specified via object_id_query."""
return Coalesce(
Subquery(
LogEntry.objects.filter(
content_type=ContentType.objects.get_for_model(PortfolioInvitation),
object_id=object_id_query,
action_flag=ADDITION,
)
.annotate(
display_email=Case(
When(
Exists(
UserGroup.objects.filter(
name__in=["cisa_analysts_group", "full_access_group"],
user=OuterRef("user"),
)
),
then=Value(DefaultEmail.HELP_EMAIL),
),
default=F("user__email"),
output_field=CharField(),
)
)
.order_by("action_time")
.values("display_email")[:1]
),
Value(DefaultUser.SYSTEM),
output_field=CharField(),
)
@classmethod
def get_annotated_fields(cls, portfolio, csv_report=False):
"""
Get a dict of computed fields. These are fields that do not exist on the model normally
and will be passed to .annotate() when building a queryset.
"""
if not portfolio:
# Return nothing
return {}
# Tweak the queries slightly to only return the data we need
if csv_report:
domain_query = F("domain__name")
else:
domain_query = Concat(F("domain__id"), Value(":"), F("domain__name"), output_field=CharField())
# Get all existing domain invitations and search on that for domains the user exists on
domain_invitations = DomainInvitation.objects.filter(
email=OuterRef("email"), # Check if email matches the OuterRef("email")
domain__domain_info__portfolio=portfolio, # Check if the domain's portfolio matches the given portfolio
).annotate(domain_info=domain_query)
return {
"first_name": Value(None, output_field=CharField()),
"last_name": Value(None, output_field=CharField()),
"email_display": F("email"),
"last_active": Value("Invited", output_field=CharField()),
"additional_permissions_display": F("additional_permissions"),
"member_display": F("email"),
# Use ArrayRemove to return an empty list when no domain invitations are found
"domain_info": ArrayRemoveNull(
ArrayAgg(
Subquery(domain_invitations.values("domain_info")),
distinct=True,
)
),
"type": Value("invitedmember", output_field=CharField()),
"joined_date": Value("Unretrieved", output_field=CharField()),
"invited_by": cls.get_invited_by_query(object_id_query=Cast(OuterRef("id"), output_field=CharField())),
}
@classmethod
def get_annotated_queryset(cls, portfolio, csv_report=False):
"""Override of the base annotated queryset to pass in portfolio"""
return super().get_annotated_queryset(portfolio=portfolio, csv_report=csv_report)

View file

@ -1,16 +1,18 @@
from django.http import JsonResponse
from django.core.paginator import Paginator
from django.db.models import F, Q
from django.db.models import Value, F, CharField, TextField, Q, Case, When, OuterRef, Subquery
from django.db.models.expressions import Func
from django.db.models.functions import Cast, Coalesce, Concat
from django.contrib.postgres.aggregates import ArrayAgg
from django.urls import reverse
from django.views import View
from registrar.models import UserPortfolioPermission
from registrar.models.domain_invitation import DomainInvitation
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from registrar.utility.model_annotations import (
PortfolioInvitationModelAnnotation,
UserPortfolioPermissionModelAnnotation,
)
from registrar.views.utility.mixins import PortfolioMembersPermission
from registrar.models.utility.orm_helper import ArrayRemoveNull
class PortfolioMembersJson(PortfolioMembersPermission, View):
@ -38,7 +40,7 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
page_number = request.GET.get("page", 1)
page_obj = paginator.get_page(page_number)
members = [self.serialize_members(portfolio, item, request.user) for item in page_obj.object_list]
members = [self.serialize_members(request, portfolio, item, request.user) for item in page_obj.object_list]
return JsonResponse(
{
@ -55,26 +57,92 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
def initial_permissions_search(self, portfolio):
"""Perform initial search for permissions before applying any filters."""
# Get UserPortfolioPermission query for getting active members on the portfolio
permissions = UserPortfolioPermissionModelAnnotation.get_annotated_queryset(portfolio)
return permissions.values(
"id",
"first_name",
"last_name",
"email_display",
"last_active",
"roles",
"additional_permissions_display",
"member_display",
"domain_info",
"type",
permissions = UserPortfolioPermission.objects.filter(portfolio=portfolio)
permissions = (
permissions.select_related("user")
.annotate(
first_name=F("user__first_name"),
last_name=F("user__last_name"),
email_display=F("user__email"),
last_active=Coalesce(
Cast(F("user__last_login"), output_field=TextField()), # Cast last_login to text
Value("Invalid date"),
output_field=TextField(),
),
additional_permissions_display=F("additional_permissions"),
member_display=Case(
# If email is present and not blank, use email
When(Q(user__email__isnull=False) & ~Q(user__email=""), then=F("user__email")),
# If first name or last name is present, use concatenation of first_name + " " + last_name
When(
Q(user__first_name__isnull=False) | Q(user__last_name__isnull=False),
then=Concat(
Coalesce(F("user__first_name"), Value("")),
Value(" "),
Coalesce(F("user__last_name"), Value("")),
),
),
# If neither, use an empty string
default=Value(""),
output_field=CharField(),
),
domain_info=ArrayAgg(
# an array of domains, with id and name, colon separated
Concat(
F("user__permissions__domain_id"),
Value(":"),
F("user__permissions__domain__name"),
# specify the output_field to ensure union has same column types
output_field=CharField(),
),
distinct=True,
filter=Q(user__permissions__domain__isnull=False) # filter out null values
& Q(
user__permissions__domain__domain_info__portfolio=portfolio
), # only include domains in portfolio
),
type=Value("member", output_field=CharField()),
)
.values(
"id",
"first_name",
"last_name",
"email_display",
"last_active",
"roles",
"additional_permissions_display",
"member_display",
"domain_info",
"type",
)
)
return permissions
def initial_invitations_search(self, portfolio):
"""Perform initial invitations search and get related DomainInvitation data based on the email."""
# Get PortfolioInvitation query for getting active invitations on the portfolio
invitations = PortfolioInvitationModelAnnotation.get_annotated_queryset(portfolio)
return invitations.values(
# Get DomainInvitation query for matching email and for the portfolio
domain_invitations = DomainInvitation.objects.filter(
email=OuterRef("email"), # Check if email matches the OuterRef("email")
domain__domain_info__portfolio=portfolio, # Check if the domain's portfolio matches the given portfolio
).annotate(domain_info=Concat(F("domain__id"), Value(":"), F("domain__name"), output_field=CharField()))
# PortfolioInvitation query
invitations = PortfolioInvitation.objects.filter(portfolio=portfolio)
invitations = invitations.annotate(
first_name=Value(None, output_field=CharField()),
last_name=Value(None, output_field=CharField()),
email_display=F("email"),
last_active=Value("Invited", output_field=TextField()),
additional_permissions_display=F("additional_permissions"),
member_display=F("email"),
# Use ArrayRemove to return an empty list when no domain invitations are found
domain_info=ArrayRemoveNull(
ArrayAgg(
Subquery(domain_invitations.values("domain_info")),
distinct=True,
)
),
type=Value("invitedmember", output_field=CharField()),
).values(
"id",
"first_name",
"last_name",
@ -86,6 +154,7 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
"domain_info",
"type",
)
return invitations
def apply_search_term(self, queryset, request):
"""Apply search term to the queryset."""
@ -111,7 +180,7 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
queryset = queryset.order_by(sort_by)
return queryset
def serialize_members(self, portfolio, item, user):
def serialize_members(self, request, portfolio, item, user):
# Check if the user can edit other users
user_can_edit_other_users = any(
user.has_perm(perm) for perm in ["registrar.full_access_permission", "registrar.change_user"]
@ -145,3 +214,4 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
"svg_icon": ("visibility" if view_only else "settings"),
}
return member_json