mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-08-15 14:04:10 +02:00
remove model annotations file
This commit is contained in:
parent
4eb54319b7
commit
859e8b3bfa
3 changed files with 394 additions and 480 deletions
|
@ -21,6 +21,7 @@ from django.db.models import (
|
||||||
Value,
|
Value,
|
||||||
When,
|
When,
|
||||||
)
|
)
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from django.db.models.functions import Concat, Coalesce
|
from django.db.models.functions import Concat, Coalesce
|
||||||
from django.contrib.postgres.aggregates import StringAgg
|
from django.contrib.postgres.aggregates import StringAgg
|
||||||
|
@ -30,13 +31,34 @@ from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
|
||||||
from registrar.templatetags.custom_filters import get_region
|
from registrar.templatetags.custom_filters import get_region
|
||||||
from registrar.utility.constants import BranchChoices
|
from registrar.utility.constants import BranchChoices
|
||||||
from registrar.utility.enums import DefaultEmail
|
from registrar.utility.enums import DefaultEmail
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
from registrar.utility.model_annotations import (
|
from registrar.models import (
|
||||||
BaseModelAnnotation,
|
DomainInvitation,
|
||||||
PortfolioInvitationModelAnnotation,
|
PortfolioInvitation,
|
||||||
UserPortfolioPermissionModelAnnotation,
|
|
||||||
)
|
)
|
||||||
|
from django.db.models import (
|
||||||
|
CharField,
|
||||||
|
F,
|
||||||
|
ManyToManyField,
|
||||||
|
Q,
|
||||||
|
QuerySet,
|
||||||
|
Value,
|
||||||
|
TextField,
|
||||||
|
OuterRef,
|
||||||
|
Subquery,
|
||||||
|
Func,
|
||||||
|
Case,
|
||||||
|
When,
|
||||||
|
Exists,
|
||||||
|
)
|
||||||
|
from django.db.models.functions import Concat, Coalesce, Cast
|
||||||
|
from registrar.models.user_group import UserGroup
|
||||||
|
from registrar.models.user_portfolio_permission import UserPortfolioPermission
|
||||||
|
from registrar.models.utility.generic_helper import convert_queryset_to_dict
|
||||||
|
from registrar.models.utility.orm_helper import ArrayRemoveNull
|
||||||
|
from django.contrib.postgres.aggregates import ArrayAgg
|
||||||
|
from django.contrib.admin.models import LogEntry, ADDITION
|
||||||
|
from django.contrib.contenttypes.models import ContentType
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -67,12 +89,162 @@ def format_end_date(end_date):
|
||||||
return timezone.make_aware(datetime.strptime(end_date, "%Y-%m-%d")) if end_date else get_default_end_date()
|
return timezone.make_aware(datetime.strptime(end_date, "%Y-%m-%d")) if end_date else get_default_end_date()
|
||||||
|
|
||||||
|
|
||||||
class BaseExport(BaseModelAnnotation):
|
class BaseExport(ABC):
|
||||||
"""
|
"""
|
||||||
A generic class for exporting data which returns a csv file for the given model.
|
A generic class for exporting data which returns a csv file for the given model.
|
||||||
2nd class in an inheritance tree of 4.
|
2nd class in an inheritance tree of 4.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@abstractmethod
|
||||||
|
def model(self):
|
||||||
|
"""
|
||||||
|
Property to specify the model that the export class will handle.
|
||||||
|
Must be implemented by subclasses.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_sort_fields(cls):
|
||||||
|
"""
|
||||||
|
Returns the sort fields for the CSV export. Override in subclasses as needed.
|
||||||
|
"""
|
||||||
|
return []
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_additional_args(cls):
|
||||||
|
"""
|
||||||
|
Returns additional keyword arguments as an empty dictionary.
|
||||||
|
Override in subclasses to provide specific arguments.
|
||||||
|
"""
|
||||||
|
return {}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_select_related(cls):
|
||||||
|
"""
|
||||||
|
Get a list of tables to pass to select_related when building queryset.
|
||||||
|
"""
|
||||||
|
return []
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_prefetch_related(cls):
|
||||||
|
"""
|
||||||
|
Get a list of tables to pass to prefetch_related when building queryset.
|
||||||
|
"""
|
||||||
|
return []
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_exclusions(cls):
|
||||||
|
"""
|
||||||
|
Get a Q object of exclusion conditions to pass to .exclude() when building queryset.
|
||||||
|
"""
|
||||||
|
return Q()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_filter_conditions(cls, **kwargs):
|
||||||
|
"""
|
||||||
|
Get a Q object of filter conditions to filter when building queryset.
|
||||||
|
"""
|
||||||
|
return Q()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_annotated_fields(cls, **kwargs):
|
||||||
|
"""
|
||||||
|
Get a dict of computed fields. These are fields that do not exist on the model normally
|
||||||
|
and will be passed to .annotate() when building a queryset.
|
||||||
|
"""
|
||||||
|
return {}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_annotations_for_sort(cls):
|
||||||
|
"""
|
||||||
|
Get a dict of annotations to make available for order_by clause.
|
||||||
|
"""
|
||||||
|
return {}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_related_table_fields(cls):
|
||||||
|
"""
|
||||||
|
Get a list of fields from related tables.
|
||||||
|
"""
|
||||||
|
return []
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def annotate_and_retrieve_fields(
|
||||||
|
cls, initial_queryset, annotated_fields, related_table_fields=None, include_many_to_many=False, **kwargs
|
||||||
|
) -> QuerySet:
|
||||||
|
"""
|
||||||
|
Applies annotations to a queryset and retrieves specified fields,
|
||||||
|
including class-defined and annotation-defined.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
initial_queryset (QuerySet): Initial queryset.
|
||||||
|
annotated_fields (dict, optional): Fields to compute {field_name: expression}.
|
||||||
|
related_table_fields (list, optional): Extra fields to retrieve; defaults to annotation keys if None.
|
||||||
|
include_many_to_many (bool, optional): Determines if we should include many to many fields or not
|
||||||
|
**kwargs: Additional keyword arguments for specific parameters (e.g., public_contacts, domain_invitations,
|
||||||
|
user_domain_roles).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
QuerySet: Contains dictionaries with the specified fields for each record.
|
||||||
|
"""
|
||||||
|
if related_table_fields is None:
|
||||||
|
related_table_fields = []
|
||||||
|
|
||||||
|
# We can infer that if we're passing in annotations,
|
||||||
|
# we want to grab the result of said annotation.
|
||||||
|
if annotated_fields:
|
||||||
|
related_table_fields.extend(annotated_fields.keys())
|
||||||
|
|
||||||
|
# Get prexisting fields on the model
|
||||||
|
model_fields = set()
|
||||||
|
for field in cls.model()._meta.get_fields():
|
||||||
|
# Exclude many to many fields unless we specify
|
||||||
|
many_to_many = isinstance(field, ManyToManyField) and include_many_to_many
|
||||||
|
if many_to_many or not isinstance(field, ManyToManyField):
|
||||||
|
model_fields.add(field.name)
|
||||||
|
|
||||||
|
queryset = initial_queryset.annotate(**annotated_fields).values(*model_fields, *related_table_fields)
|
||||||
|
|
||||||
|
return cls.update_queryset(queryset, **kwargs)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_annotated_queryset(cls, **kwargs):
|
||||||
|
sort_fields = cls.get_sort_fields()
|
||||||
|
# Get additional args and merge with incoming kwargs
|
||||||
|
additional_args = cls.get_additional_args()
|
||||||
|
kwargs.update(additional_args)
|
||||||
|
select_related = cls.get_select_related()
|
||||||
|
prefetch_related = cls.get_prefetch_related()
|
||||||
|
exclusions = cls.get_exclusions()
|
||||||
|
annotations_for_sort = cls.get_annotations_for_sort()
|
||||||
|
filter_conditions = cls.get_filter_conditions(**kwargs)
|
||||||
|
annotated_fields = cls.get_annotated_fields(**kwargs)
|
||||||
|
related_table_fields = cls.get_related_table_fields()
|
||||||
|
|
||||||
|
model_queryset = (
|
||||||
|
cls.model()
|
||||||
|
.objects.select_related(*select_related)
|
||||||
|
.prefetch_related(*prefetch_related)
|
||||||
|
.filter(filter_conditions)
|
||||||
|
.exclude(exclusions)
|
||||||
|
.annotate(**annotations_for_sort)
|
||||||
|
.order_by(*sort_fields)
|
||||||
|
.distinct()
|
||||||
|
)
|
||||||
|
return cls.annotate_and_retrieve_fields(model_queryset, annotated_fields, related_table_fields, **kwargs)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def update_queryset(cls, queryset, **kwargs):
|
||||||
|
"""
|
||||||
|
Returns an updated queryset. Override in subclass to update queryset.
|
||||||
|
"""
|
||||||
|
return queryset
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_model_annotation_dict(cls, **kwargs):
|
||||||
|
return convert_queryset_to_dict(cls.get_annotated_queryset(**kwargs), is_model=False)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_columns(cls):
|
def get_columns(cls):
|
||||||
"""
|
"""
|
||||||
|
@ -181,14 +353,128 @@ class MemberExport(BaseExport):
|
||||||
"joined_date",
|
"joined_date",
|
||||||
"invited_by",
|
"invited_by",
|
||||||
]
|
]
|
||||||
permissions = UserPortfolioPermissionModelAnnotation.get_annotated_queryset(portfolio, csv_report=True).values(
|
|
||||||
*shared_columns
|
# Permissions
|
||||||
)
|
permissions = UserPortfolioPermission.objects.filter(portfolio=portfolio).select_related("user").annotate(
|
||||||
invitations = PortfolioInvitationModelAnnotation.get_annotated_queryset(portfolio, csv_report=True).values(
|
first_name=F("user__first_name"),
|
||||||
*shared_columns
|
last_name=F("user__last_name"),
|
||||||
|
email_display=F("user__email"),
|
||||||
|
last_active=Coalesce(
|
||||||
|
Func(
|
||||||
|
F("user__last_login"), Value("YYYY-MM-DD"), function="to_char", output_field=TextField()
|
||||||
|
),
|
||||||
|
Value("Invalid date"),
|
||||||
|
output_field=CharField(),
|
||||||
|
),
|
||||||
|
additional_permissions_display=F("additional_permissions"),
|
||||||
|
member_display=Case(
|
||||||
|
# If email is present and not blank, use email
|
||||||
|
When(Q(user__email__isnull=False) & ~Q(user__email=""), then=F("user__email")),
|
||||||
|
# If first name or last name is present, use concatenation of first_name + " " + last_name
|
||||||
|
When(
|
||||||
|
Q(user__first_name__isnull=False) | Q(user__last_name__isnull=False),
|
||||||
|
then=Concat(
|
||||||
|
Coalesce(F("user__first_name"), Value("")),
|
||||||
|
Value(" "),
|
||||||
|
Coalesce(F("user__last_name"), Value("")),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
# If neither, use an empty string
|
||||||
|
default=Value(""),
|
||||||
|
output_field=CharField(),
|
||||||
|
),
|
||||||
|
domain_info=ArrayAgg(
|
||||||
|
F("user__permissions__domain__name"),
|
||||||
|
distinct=True,
|
||||||
|
# only include domains in portfolio
|
||||||
|
filter=Q(user__permissions__domain__isnull=False)
|
||||||
|
& Q(user__permissions__domain__domain_info__portfolio=portfolio),
|
||||||
|
),
|
||||||
|
type=Value("member", output_field=CharField()),
|
||||||
|
joined_date=Func(F("created_at"), Value("YYYY-MM-DD"), function="to_char", output_field=CharField()),
|
||||||
|
invited_by=cls.get_invited_by_query(
|
||||||
|
object_id_query=cls.get_portfolio_invitation_id_query()
|
||||||
|
),
|
||||||
|
).values(*shared_columns)
|
||||||
|
|
||||||
|
# Invitations
|
||||||
|
domain_invitations = DomainInvitation.objects.filter(
|
||||||
|
email=OuterRef("email"), # Check if email matches the OuterRef("email")
|
||||||
|
domain__domain_info__portfolio=portfolio, # Check if the domain's portfolio matches the given portfolio
|
||||||
|
).annotate(domain_info=Concat(F("domain__id"), Value(":"), F("domain__name"), output_field=CharField()))
|
||||||
|
invitations = PortfolioInvitation.objects.filter(portfolio=portfolio).annotate(
|
||||||
|
first_name=Value(None, output_field=CharField()),
|
||||||
|
last_name=Value(None, output_field=CharField()),
|
||||||
|
email_display=F("email"),
|
||||||
|
last_active=Value("Invited", output_field=CharField()),
|
||||||
|
additional_permissions_display=F("additional_permissions"),
|
||||||
|
member_display=F("email"),
|
||||||
|
# Use ArrayRemove to return an empty list when no domain invitations are found
|
||||||
|
domain_info=ArrayRemoveNull(
|
||||||
|
ArrayAgg(
|
||||||
|
Subquery(domain_invitations.values("domain_info")),
|
||||||
|
distinct=True,
|
||||||
)
|
)
|
||||||
|
),
|
||||||
|
type=Value("invitedmember", output_field=CharField()),
|
||||||
|
joined_date=Value("Unretrieved", output_field=CharField()),
|
||||||
|
invited_by=cls.get_invited_by_query(object_id_query=Cast(OuterRef("id"), output_field=CharField())),
|
||||||
|
).values(*shared_columns)
|
||||||
|
|
||||||
return convert_queryset_to_dict(permissions.union(invitations), is_model=False)
|
return convert_queryset_to_dict(permissions.union(invitations), is_model=False)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_invited_by_query(cls, object_id_query):
|
||||||
|
"""Returns the user that created the given portfolio invitation.
|
||||||
|
Grabs this data from the audit log, given that a portfolio invitation object
|
||||||
|
is specified via object_id_query."""
|
||||||
|
return Coalesce(
|
||||||
|
Subquery(
|
||||||
|
LogEntry.objects.filter(
|
||||||
|
content_type=ContentType.objects.get_for_model(PortfolioInvitation),
|
||||||
|
object_id=object_id_query,
|
||||||
|
action_flag=ADDITION,
|
||||||
|
)
|
||||||
|
.annotate(
|
||||||
|
display_email=Case(
|
||||||
|
When(
|
||||||
|
Exists(
|
||||||
|
UserGroup.objects.filter(
|
||||||
|
name__in=["cisa_analysts_group", "full_access_group"],
|
||||||
|
user=OuterRef("user"),
|
||||||
|
)
|
||||||
|
),
|
||||||
|
then=Value("help@get.gov"),
|
||||||
|
),
|
||||||
|
default=F("user__email"),
|
||||||
|
output_field=CharField(),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.order_by("action_time")
|
||||||
|
.values("display_email")[:1]
|
||||||
|
),
|
||||||
|
Value("System"),
|
||||||
|
output_field=CharField(),
|
||||||
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_portfolio_invitation_id_query(cls):
|
||||||
|
"""Gets the id of the portfolio invitation that created this UserPortfolioPermission.
|
||||||
|
This makes the assumption that if an invitation is retrieved, it must have created the given
|
||||||
|
UserPortfolioPermission object."""
|
||||||
|
return Cast(
|
||||||
|
Subquery(
|
||||||
|
PortfolioInvitation.objects.filter(
|
||||||
|
status=PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED,
|
||||||
|
# Double outer ref because we first go into the LogEntry query,
|
||||||
|
# then into the parent UserPortfolioPermission.
|
||||||
|
email=OuterRef(OuterRef("user__email")),
|
||||||
|
portfolio=OuterRef(OuterRef("portfolio")),
|
||||||
|
).values("id")[:1]
|
||||||
|
),
|
||||||
|
output_field=CharField(),
|
||||||
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_columns(cls):
|
def get_columns(cls):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -1,447 +0,0 @@
|
||||||
"""
|
|
||||||
Model annotation classes.
|
|
||||||
|
|
||||||
Intended to return django querysets with computed fields for api endpoints and our csv reports.
|
|
||||||
Used by both API endpoints (e.g. portfolio members JSON) and data exports (e.g. CSV reports).
|
|
||||||
|
|
||||||
Initially created to manage the complexity of the MembersTable and Members CSV report,
|
|
||||||
as they require complex but common annotations.
|
|
||||||
|
|
||||||
Example:
|
|
||||||
# For a JSON table endpoint
|
|
||||||
permissions = UserPortfolioPermissionAnnotation.get_annotated_queryset(portfolio)
|
|
||||||
# Returns queryset with standardized fields for member tables
|
|
||||||
|
|
||||||
# For a CSV export
|
|
||||||
permissions = UserPortfolioPermissionAnnotation.get_annotated_queryset(portfolio, csv_report=True)
|
|
||||||
# Returns same fields but formatted for CSV export
|
|
||||||
"""
|
|
||||||
|
|
||||||
from abc import ABC, abstractmethod
|
|
||||||
from registrar.models import (
|
|
||||||
DomainInvitation,
|
|
||||||
PortfolioInvitation,
|
|
||||||
)
|
|
||||||
from django.db.models import (
|
|
||||||
CharField,
|
|
||||||
F,
|
|
||||||
ManyToManyField,
|
|
||||||
Q,
|
|
||||||
QuerySet,
|
|
||||||
Value,
|
|
||||||
TextField,
|
|
||||||
OuterRef,
|
|
||||||
Subquery,
|
|
||||||
Func,
|
|
||||||
Case,
|
|
||||||
When,
|
|
||||||
Exists,
|
|
||||||
)
|
|
||||||
from django.db.models.functions import Concat, Coalesce, Cast
|
|
||||||
from registrar.models.user_group import UserGroup
|
|
||||||
from registrar.models.user_portfolio_permission import UserPortfolioPermission
|
|
||||||
from registrar.models.utility.generic_helper import convert_queryset_to_dict
|
|
||||||
from registrar.models.utility.orm_helper import ArrayRemoveNull
|
|
||||||
from django.contrib.postgres.aggregates import ArrayAgg
|
|
||||||
from django.contrib.admin.models import LogEntry, ADDITION
|
|
||||||
from django.contrib.contenttypes.models import ContentType
|
|
||||||
|
|
||||||
|
|
||||||
class BaseModelAnnotation(ABC):
|
|
||||||
"""
|
|
||||||
Abstract base class that standardizes how models are annotated for csv exports or complex annotation queries.
|
|
||||||
For example, the Members table / csv export.
|
|
||||||
|
|
||||||
Subclasses define model-specific annotations, filters, and field formatting while inheriting
|
|
||||||
common queryset building logic.
|
|
||||||
Intended ensure consistent data presentation across both table UI components and CSV exports.
|
|
||||||
|
|
||||||
Base class in an inheritance tree of 4.
|
|
||||||
"""
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
@abstractmethod
|
|
||||||
def model(self):
|
|
||||||
"""
|
|
||||||
Property to specify the model that the export class will handle.
|
|
||||||
Must be implemented by subclasses.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_sort_fields(cls):
|
|
||||||
"""
|
|
||||||
Returns the sort fields for the CSV export. Override in subclasses as needed.
|
|
||||||
"""
|
|
||||||
return []
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_additional_args(cls):
|
|
||||||
"""
|
|
||||||
Returns additional keyword arguments as an empty dictionary.
|
|
||||||
Override in subclasses to provide specific arguments.
|
|
||||||
"""
|
|
||||||
return {}
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_select_related(cls):
|
|
||||||
"""
|
|
||||||
Get a list of tables to pass to select_related when building queryset.
|
|
||||||
"""
|
|
||||||
return []
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_prefetch_related(cls):
|
|
||||||
"""
|
|
||||||
Get a list of tables to pass to prefetch_related when building queryset.
|
|
||||||
"""
|
|
||||||
return []
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_exclusions(cls):
|
|
||||||
"""
|
|
||||||
Get a Q object of exclusion conditions to pass to .exclude() when building queryset.
|
|
||||||
"""
|
|
||||||
return Q()
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_filter_conditions(cls, **kwargs):
|
|
||||||
"""
|
|
||||||
Get a Q object of filter conditions to filter when building queryset.
|
|
||||||
"""
|
|
||||||
return Q()
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_annotated_fields(cls, **kwargs):
|
|
||||||
"""
|
|
||||||
Get a dict of computed fields. These are fields that do not exist on the model normally
|
|
||||||
and will be passed to .annotate() when building a queryset.
|
|
||||||
"""
|
|
||||||
return {}
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_annotations_for_sort(cls):
|
|
||||||
"""
|
|
||||||
Get a dict of annotations to make available for order_by clause.
|
|
||||||
"""
|
|
||||||
return {}
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_related_table_fields(cls):
|
|
||||||
"""
|
|
||||||
Get a list of fields from related tables.
|
|
||||||
"""
|
|
||||||
return []
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def annotate_and_retrieve_fields(
|
|
||||||
cls, initial_queryset, annotated_fields, related_table_fields=None, include_many_to_many=False, **kwargs
|
|
||||||
) -> QuerySet:
|
|
||||||
"""
|
|
||||||
Applies annotations to a queryset and retrieves specified fields,
|
|
||||||
including class-defined and annotation-defined.
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
initial_queryset (QuerySet): Initial queryset.
|
|
||||||
annotated_fields (dict, optional): Fields to compute {field_name: expression}.
|
|
||||||
related_table_fields (list, optional): Extra fields to retrieve; defaults to annotation keys if None.
|
|
||||||
include_many_to_many (bool, optional): Determines if we should include many to many fields or not
|
|
||||||
**kwargs: Additional keyword arguments for specific parameters (e.g., public_contacts, domain_invitations,
|
|
||||||
user_domain_roles).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
QuerySet: Contains dictionaries with the specified fields for each record.
|
|
||||||
"""
|
|
||||||
if related_table_fields is None:
|
|
||||||
related_table_fields = []
|
|
||||||
|
|
||||||
# We can infer that if we're passing in annotations,
|
|
||||||
# we want to grab the result of said annotation.
|
|
||||||
if annotated_fields:
|
|
||||||
related_table_fields.extend(annotated_fields.keys())
|
|
||||||
|
|
||||||
# Get prexisting fields on the model
|
|
||||||
model_fields = set()
|
|
||||||
for field in cls.model()._meta.get_fields():
|
|
||||||
# Exclude many to many fields unless we specify
|
|
||||||
many_to_many = isinstance(field, ManyToManyField) and include_many_to_many
|
|
||||||
if many_to_many or not isinstance(field, ManyToManyField):
|
|
||||||
model_fields.add(field.name)
|
|
||||||
|
|
||||||
queryset = initial_queryset.annotate(**annotated_fields).values(*model_fields, *related_table_fields)
|
|
||||||
|
|
||||||
return cls.update_queryset(queryset, **kwargs)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_annotated_queryset(cls, **kwargs):
|
|
||||||
sort_fields = cls.get_sort_fields()
|
|
||||||
# Get additional args and merge with incoming kwargs
|
|
||||||
additional_args = cls.get_additional_args()
|
|
||||||
kwargs.update(additional_args)
|
|
||||||
select_related = cls.get_select_related()
|
|
||||||
prefetch_related = cls.get_prefetch_related()
|
|
||||||
exclusions = cls.get_exclusions()
|
|
||||||
annotations_for_sort = cls.get_annotations_for_sort()
|
|
||||||
filter_conditions = cls.get_filter_conditions(**kwargs)
|
|
||||||
annotated_fields = cls.get_annotated_fields(**kwargs)
|
|
||||||
related_table_fields = cls.get_related_table_fields()
|
|
||||||
|
|
||||||
model_queryset = (
|
|
||||||
cls.model()
|
|
||||||
.objects.select_related(*select_related)
|
|
||||||
.prefetch_related(*prefetch_related)
|
|
||||||
.filter(filter_conditions)
|
|
||||||
.exclude(exclusions)
|
|
||||||
.annotate(**annotations_for_sort)
|
|
||||||
.order_by(*sort_fields)
|
|
||||||
.distinct()
|
|
||||||
)
|
|
||||||
return cls.annotate_and_retrieve_fields(model_queryset, annotated_fields, related_table_fields, **kwargs)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def update_queryset(cls, queryset, **kwargs):
|
|
||||||
"""
|
|
||||||
Returns an updated queryset. Override in subclass to update queryset.
|
|
||||||
"""
|
|
||||||
return queryset
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_model_annotation_dict(cls, **kwargs):
|
|
||||||
return convert_queryset_to_dict(cls.get_annotated_queryset(**kwargs), is_model=False)
|
|
||||||
|
|
||||||
|
|
||||||
class UserPortfolioPermissionModelAnnotation(BaseModelAnnotation):
|
|
||||||
"""
|
|
||||||
Annotates UserPortfolioPermission querysets with computed fields for member tables.
|
|
||||||
Handles formatting of user details, permissions, and related domain information
|
|
||||||
for both UI display and CSV export.
|
|
||||||
"""
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def model(cls):
|
|
||||||
# Return the model class that this export handles
|
|
||||||
return UserPortfolioPermission
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_select_related(cls):
|
|
||||||
"""
|
|
||||||
Get a list of tables to pass to select_related when building queryset.
|
|
||||||
"""
|
|
||||||
return ["user"]
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_filter_conditions(cls, portfolio, **kwargs):
|
|
||||||
"""
|
|
||||||
Get a Q object of filter conditions to filter when building queryset.
|
|
||||||
"""
|
|
||||||
if not portfolio:
|
|
||||||
# Return nothing
|
|
||||||
return Q(id__in=[])
|
|
||||||
|
|
||||||
# Get all members on this portfolio
|
|
||||||
return Q(portfolio=portfolio)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_portfolio_invitation_id_query(cls):
|
|
||||||
"""Gets the id of the portfolio invitation that created this UserPortfolioPermission.
|
|
||||||
This makes the assumption that if an invitation is retrieved, it must have created the given
|
|
||||||
UserPortfolioPermission object."""
|
|
||||||
return Cast(
|
|
||||||
Subquery(
|
|
||||||
PortfolioInvitation.objects.filter(
|
|
||||||
status=PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED,
|
|
||||||
# Double outer ref because we first go into the LogEntry query,
|
|
||||||
# then into the parent UserPortfolioPermission.
|
|
||||||
email=OuterRef(OuterRef("user__email")),
|
|
||||||
portfolio=OuterRef(OuterRef("portfolio")),
|
|
||||||
).values("id")[:1]
|
|
||||||
),
|
|
||||||
output_field=CharField(),
|
|
||||||
)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_annotated_fields(cls, portfolio, csv_report=False):
|
|
||||||
"""
|
|
||||||
Get a dict of computed fields. These are fields that do not exist on the model normally
|
|
||||||
and will be passed to .annotate() when building a queryset.
|
|
||||||
"""
|
|
||||||
if not portfolio:
|
|
||||||
# Return nothing
|
|
||||||
return {}
|
|
||||||
|
|
||||||
# Tweak the queries slightly to only return the data we need.
|
|
||||||
# When returning data for the csv report we:
|
|
||||||
# 1. Only return the domain name for 'domain_info' rather than also add ':' seperated id
|
|
||||||
# 2. Return a formatted date for 'last_active'
|
|
||||||
# These are just optimizations that are better done in SQL as opposed to python.
|
|
||||||
if csv_report:
|
|
||||||
domain_query = F("user__permissions__domain__name")
|
|
||||||
last_active_query = Func(
|
|
||||||
F("user__last_login"), Value("YYYY-MM-DD"), function="to_char", output_field=TextField()
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# an array of domains, with id and name, colon separated
|
|
||||||
domain_query = Concat(
|
|
||||||
F("user__permissions__domain_id"),
|
|
||||||
Value(":"),
|
|
||||||
F("user__permissions__domain__name"),
|
|
||||||
# specify the output_field to ensure union has same column types
|
|
||||||
output_field=CharField(),
|
|
||||||
)
|
|
||||||
last_active_query = Cast(F("user__last_login"), output_field=TextField())
|
|
||||||
|
|
||||||
return {
|
|
||||||
"first_name": F("user__first_name"),
|
|
||||||
"last_name": F("user__last_name"),
|
|
||||||
"email_display": F("user__email"),
|
|
||||||
"last_active": Coalesce(
|
|
||||||
last_active_query,
|
|
||||||
Value("Invalid date"),
|
|
||||||
output_field=CharField(),
|
|
||||||
),
|
|
||||||
"additional_permissions_display": F("additional_permissions"),
|
|
||||||
"member_display": Case(
|
|
||||||
# If email is present and not blank, use email
|
|
||||||
When(Q(user__email__isnull=False) & ~Q(user__email=""), then=F("user__email")),
|
|
||||||
# If first name or last name is present, use concatenation of first_name + " " + last_name
|
|
||||||
When(
|
|
||||||
Q(user__first_name__isnull=False) | Q(user__last_name__isnull=False),
|
|
||||||
then=Concat(
|
|
||||||
Coalesce(F("user__first_name"), Value("")),
|
|
||||||
Value(" "),
|
|
||||||
Coalesce(F("user__last_name"), Value("")),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
# If neither, use an empty string
|
|
||||||
default=Value(""),
|
|
||||||
output_field=CharField(),
|
|
||||||
),
|
|
||||||
"domain_info": ArrayAgg(
|
|
||||||
domain_query,
|
|
||||||
distinct=True,
|
|
||||||
# only include domains in portfolio
|
|
||||||
filter=Q(user__permissions__domain__isnull=False)
|
|
||||||
& Q(user__permissions__domain__domain_info__portfolio=portfolio),
|
|
||||||
),
|
|
||||||
"type": Value("member", output_field=CharField()),
|
|
||||||
"joined_date": Func(F("created_at"), Value("YYYY-MM-DD"), function="to_char", output_field=CharField()),
|
|
||||||
"invited_by": PortfolioInvitationModelAnnotation.get_invited_by_query(
|
|
||||||
object_id_query=cls.get_portfolio_invitation_id_query()
|
|
||||||
),
|
|
||||||
}
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_annotated_queryset(cls, portfolio, csv_report=False):
|
|
||||||
"""Override of the base annotated queryset to pass in portfolio"""
|
|
||||||
return super().get_annotated_queryset(portfolio=portfolio, csv_report=csv_report)
|
|
||||||
|
|
||||||
|
|
||||||
class PortfolioInvitationModelAnnotation(BaseModelAnnotation):
|
|
||||||
"""
|
|
||||||
Annotates PortfolioInvitation querysets with computed fields for the member table.
|
|
||||||
Handles formatting of user details, permissions, and related domain information
|
|
||||||
for both UI display and CSV export.
|
|
||||||
"""
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def model(cls):
|
|
||||||
# Return the model class that this export handles
|
|
||||||
return PortfolioInvitation
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_exclusions(cls):
|
|
||||||
"""
|
|
||||||
Get a Q object of exclusion conditions to pass to .exclude() when building queryset.
|
|
||||||
"""
|
|
||||||
return Q(status=PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_filter_conditions(cls, portfolio, **kwargs):
|
|
||||||
"""
|
|
||||||
Get a Q object of filter conditions to filter when building queryset.
|
|
||||||
"""
|
|
||||||
if not portfolio:
|
|
||||||
# Return nothing
|
|
||||||
return Q(id__in=[])
|
|
||||||
|
|
||||||
# Get all members on this portfolio
|
|
||||||
return Q(portfolio=portfolio)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_invited_by_query(cls, object_id_query):
|
|
||||||
"""Returns the user that created the given portfolio invitation.
|
|
||||||
Grabs this data from the audit log, given that a portfolio invitation object
|
|
||||||
is specified via object_id_query."""
|
|
||||||
return Coalesce(
|
|
||||||
Subquery(
|
|
||||||
LogEntry.objects.filter(
|
|
||||||
content_type=ContentType.objects.get_for_model(PortfolioInvitation),
|
|
||||||
object_id=object_id_query,
|
|
||||||
action_flag=ADDITION,
|
|
||||||
)
|
|
||||||
.annotate(
|
|
||||||
display_email=Case(
|
|
||||||
When(
|
|
||||||
Exists(
|
|
||||||
UserGroup.objects.filter(
|
|
||||||
name__in=["cisa_analysts_group", "full_access_group"],
|
|
||||||
user=OuterRef("user"),
|
|
||||||
)
|
|
||||||
),
|
|
||||||
then=Value("help@get.gov"),
|
|
||||||
),
|
|
||||||
default=F("user__email"),
|
|
||||||
output_field=CharField(),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.order_by("action_time")
|
|
||||||
.values("display_email")[:1]
|
|
||||||
),
|
|
||||||
Value("System"),
|
|
||||||
output_field=CharField(),
|
|
||||||
)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_annotated_fields(cls, portfolio, csv_report=False):
|
|
||||||
"""
|
|
||||||
Get a dict of computed fields. These are fields that do not exist on the model normally
|
|
||||||
and will be passed to .annotate() when building a queryset.
|
|
||||||
"""
|
|
||||||
if not portfolio:
|
|
||||||
# Return nothing
|
|
||||||
return {}
|
|
||||||
|
|
||||||
# Tweak the queries slightly to only return the data we need
|
|
||||||
if csv_report:
|
|
||||||
domain_query = F("domain__name")
|
|
||||||
else:
|
|
||||||
domain_query = Concat(F("domain__id"), Value(":"), F("domain__name"), output_field=CharField())
|
|
||||||
|
|
||||||
# Get all existing domain invitations and search on that for domains the user exists on
|
|
||||||
domain_invitations = DomainInvitation.objects.filter(
|
|
||||||
email=OuterRef("email"), # Check if email matches the OuterRef("email")
|
|
||||||
domain__domain_info__portfolio=portfolio, # Check if the domain's portfolio matches the given portfolio
|
|
||||||
).annotate(domain_info=domain_query)
|
|
||||||
return {
|
|
||||||
"first_name": Value(None, output_field=CharField()),
|
|
||||||
"last_name": Value(None, output_field=CharField()),
|
|
||||||
"email_display": F("email"),
|
|
||||||
"last_active": Value("Invited", output_field=CharField()),
|
|
||||||
"additional_permissions_display": F("additional_permissions"),
|
|
||||||
"member_display": F("email"),
|
|
||||||
# Use ArrayRemove to return an empty list when no domain invitations are found
|
|
||||||
"domain_info": ArrayRemoveNull(
|
|
||||||
ArrayAgg(
|
|
||||||
Subquery(domain_invitations.values("domain_info")),
|
|
||||||
distinct=True,
|
|
||||||
)
|
|
||||||
),
|
|
||||||
"type": Value("invitedmember", output_field=CharField()),
|
|
||||||
"joined_date": Value("Unretrieved", output_field=CharField()),
|
|
||||||
"invited_by": cls.get_invited_by_query(object_id_query=Cast(OuterRef("id"), output_field=CharField())),
|
|
||||||
}
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_annotated_queryset(cls, portfolio, csv_report=False):
|
|
||||||
"""Override of the base annotated queryset to pass in portfolio"""
|
|
||||||
return super().get_annotated_queryset(portfolio=portfolio, csv_report=csv_report)
|
|
|
@ -3,15 +3,16 @@ from django.core.paginator import Paginator
|
||||||
from django.db.models import F, Q
|
from django.db.models import F, Q
|
||||||
from django.urls import reverse
|
from django.urls import reverse
|
||||||
from django.views import View
|
from django.views import View
|
||||||
|
from django.db.models.expressions import Func
|
||||||
|
from django.db.models import Value, F, CharField, TextField, Q, Case, When, OuterRef, Subquery
|
||||||
|
from django.db.models.functions import Cast, Coalesce, Concat
|
||||||
|
from django.contrib.postgres.aggregates import ArrayAgg
|
||||||
from registrar.models import UserPortfolioPermission
|
from registrar.models import UserPortfolioPermission
|
||||||
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
|
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
|
||||||
from registrar.utility.model_annotations import (
|
|
||||||
PortfolioInvitationModelAnnotation,
|
|
||||||
UserPortfolioPermissionModelAnnotation,
|
|
||||||
)
|
|
||||||
from registrar.views.utility.mixins import PortfolioMembersPermission
|
from registrar.views.utility.mixins import PortfolioMembersPermission
|
||||||
|
from registrar.models.domain_invitation import DomainInvitation
|
||||||
|
from registrar.models.portfolio_invitation import PortfolioInvitation
|
||||||
|
from registrar.models.user_portfolio_permission import UserPortfolioPermission
|
||||||
|
|
||||||
class PortfolioMembersJson(PortfolioMembersPermission, View):
|
class PortfolioMembersJson(PortfolioMembersPermission, View):
|
||||||
|
|
||||||
|
@ -55,8 +56,53 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
|
||||||
|
|
||||||
def initial_permissions_search(self, portfolio):
|
def initial_permissions_search(self, portfolio):
|
||||||
"""Perform initial search for permissions before applying any filters."""
|
"""Perform initial search for permissions before applying any filters."""
|
||||||
queryset = UserPortfolioPermissionModelAnnotation.get_annotated_queryset(portfolio)
|
permissions = UserPortfolioPermission.objects.filter(portfolio=portfolio)
|
||||||
return queryset.values(
|
permissions = (
|
||||||
|
permissions.select_related("user")
|
||||||
|
.annotate(
|
||||||
|
first_name=F("user__first_name"),
|
||||||
|
last_name=F("user__last_name"),
|
||||||
|
email_display=F("user__email"),
|
||||||
|
last_active=Coalesce(
|
||||||
|
Cast(F("user__last_login"), output_field=TextField()), # Cast last_login to text
|
||||||
|
Value("Invalid date"),
|
||||||
|
output_field=TextField(),
|
||||||
|
),
|
||||||
|
additional_permissions_display=F("additional_permissions"),
|
||||||
|
member_display=Case(
|
||||||
|
# If email is present and not blank, use email
|
||||||
|
When(Q(user__email__isnull=False) & ~Q(user__email=""), then=F("user__email")),
|
||||||
|
# If first name or last name is present, use concatenation of first_name + " " + last_name
|
||||||
|
When(
|
||||||
|
Q(user__first_name__isnull=False) | Q(user__last_name__isnull=False),
|
||||||
|
then=Concat(
|
||||||
|
Coalesce(F("user__first_name"), Value("")),
|
||||||
|
Value(" "),
|
||||||
|
Coalesce(F("user__last_name"), Value("")),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
# If neither, use an empty string
|
||||||
|
default=Value(""),
|
||||||
|
output_field=CharField(),
|
||||||
|
),
|
||||||
|
domain_info=ArrayAgg(
|
||||||
|
# an array of domains, with id and name, colon separated
|
||||||
|
Concat(
|
||||||
|
F("user__permissions__domain_id"),
|
||||||
|
Value(":"),
|
||||||
|
F("user__permissions__domain__name"),
|
||||||
|
# specify the output_field to ensure union has same column types
|
||||||
|
output_field=CharField(),
|
||||||
|
),
|
||||||
|
distinct=True,
|
||||||
|
filter=Q(user__permissions__domain__isnull=False) # filter out null values
|
||||||
|
& Q(
|
||||||
|
user__permissions__domain__domain_info__portfolio=portfolio
|
||||||
|
), # only include domains in portfolio
|
||||||
|
),
|
||||||
|
type=Value("member", output_field=CharField()),
|
||||||
|
)
|
||||||
|
.values(
|
||||||
"id",
|
"id",
|
||||||
"first_name",
|
"first_name",
|
||||||
"last_name",
|
"last_name",
|
||||||
|
@ -68,12 +114,34 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
|
||||||
"domain_info",
|
"domain_info",
|
||||||
"type",
|
"type",
|
||||||
)
|
)
|
||||||
|
)
|
||||||
|
return permissions
|
||||||
|
|
||||||
def initial_invitations_search(self, portfolio):
|
def initial_invitations_search(self, portfolio):
|
||||||
"""Perform initial invitations search and get related DomainInvitation data based on the email."""
|
"""Perform initial invitations search and get related DomainInvitation data based on the email."""
|
||||||
# Get DomainInvitation query for matching email and for the portfolio
|
# Get DomainInvitation query for matching email and for the portfolio
|
||||||
queryset = PortfolioInvitationModelAnnotation.get_annotated_queryset(portfolio)
|
domain_invitations = DomainInvitation.objects.filter(
|
||||||
return queryset.values(
|
email=OuterRef("email"), # Check if email matches the OuterRef("email")
|
||||||
|
domain__domain_info__portfolio=portfolio, # Check if the domain's portfolio matches the given portfolio
|
||||||
|
).annotate(domain_info=Concat(F("domain__id"), Value(":"), F("domain__name"), output_field=CharField()))
|
||||||
|
# PortfolioInvitation query
|
||||||
|
invitations = PortfolioInvitation.objects.filter(portfolio=portfolio)
|
||||||
|
invitations = invitations.annotate(
|
||||||
|
first_name=Value(None, output_field=CharField()),
|
||||||
|
last_name=Value(None, output_field=CharField()),
|
||||||
|
email_display=F("email"),
|
||||||
|
last_active=Value("Invited", output_field=TextField()),
|
||||||
|
additional_permissions_display=F("additional_permissions"),
|
||||||
|
member_display=F("email"),
|
||||||
|
# Use ArrayRemove to return an empty list when no domain invitations are found
|
||||||
|
domain_info=ArrayRemove(
|
||||||
|
ArrayAgg(
|
||||||
|
Subquery(domain_invitations.values("domain_info")),
|
||||||
|
distinct=True,
|
||||||
|
)
|
||||||
|
),
|
||||||
|
type=Value("invitedmember", output_field=CharField()),
|
||||||
|
).values(
|
||||||
"id",
|
"id",
|
||||||
"first_name",
|
"first_name",
|
||||||
"last_name",
|
"last_name",
|
||||||
|
@ -85,6 +153,7 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
|
||||||
"domain_info",
|
"domain_info",
|
||||||
"type",
|
"type",
|
||||||
)
|
)
|
||||||
|
return invitations
|
||||||
|
|
||||||
def apply_search_term(self, queryset, request):
|
def apply_search_term(self, queryset, request):
|
||||||
"""Apply search term to the queryset."""
|
"""Apply search term to the queryset."""
|
||||||
|
@ -144,3 +213,9 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
|
||||||
"svg_icon": ("visibility" if view_only else "settings"),
|
"svg_icon": ("visibility" if view_only else "settings"),
|
||||||
}
|
}
|
||||||
return member_json
|
return member_json
|
||||||
|
|
||||||
|
|
||||||
|
# Custom Func to use array_remove to remove null values
|
||||||
|
class ArrayRemove(Func):
|
||||||
|
function = "array_remove"
|
||||||
|
template = "%(function)s(%(expressions)s, NULL)"
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue