From 4dd16ec37058486a6b9f9f729eb77c8f815f75a0 Mon Sep 17 00:00:00 2001 From: David Kennedy Date: Mon, 3 Mar 2025 16:53:28 -0500 Subject: [PATCH] add omb analyst group, add omb analyst permission, refine is_staff decorator as appropriate --- src/registrar/decorators.py | 6 ++ .../migrations/0141_create_groups_v18.py | 38 +++++++ src/registrar/models/user.py | 1 + src/registrar/models/user_group.py | 98 +++++++++++++++++++ src/registrar/views/report_views.py | 20 ++-- src/registrar/views/transfer_user.py | 4 +- src/registrar/views/utility/api_views.py | 50 ++-------- 7 files changed, 162 insertions(+), 55 deletions(-) create mode 100644 src/registrar/migrations/0141_create_groups_v18.py diff --git a/src/registrar/decorators.py b/src/registrar/decorators.py index 7cb2792f4..8188b152e 100644 --- a/src/registrar/decorators.py +++ b/src/registrar/decorators.py @@ -6,6 +6,9 @@ from registrar.models import Domain, DomainInformation, DomainInvitation, Domain # Constants for clarity ALL = "all" IS_STAFF = "is_staff" +IS_CISA_ANALYST = "is_cisa_analyst" +IS_OMB_ANALYST = "is_omb_analyst" +IS_FULL_ACCESS = "is_full_access" IS_DOMAIN_MANAGER = "is_domain_manager" IS_DOMAIN_REQUEST_CREATOR = "is_domain_request_creator" IS_STAFF_MANAGING_DOMAIN = "is_staff_managing_domain" @@ -101,6 +104,9 @@ def _user_has_permission(user, request, rules, **kwargs): # Define permission checks permission_checks = [ (IS_STAFF, lambda: user.is_staff), + (IS_CISA_ANALYST, lambda: user.has_perm("registrar.analyst_access_permission")), + (IS_OMB_ANALYST, lambda: user.has_perm("registrar.omb_analyst_access_permission")), + (IS_FULL_ACCESS, lambda: user.has_perm("registrar.full_access_permission")), (IS_DOMAIN_MANAGER, lambda: _is_domain_manager(user, **kwargs)), (IS_STAFF_MANAGING_DOMAIN, lambda: _is_staff_managing_domain(request, **kwargs)), (IS_PORTFOLIO_MEMBER, lambda: user.is_org_user(request)), diff --git a/src/registrar/migrations/0141_create_groups_v18.py b/src/registrar/migrations/0141_create_groups_v18.py new file mode 100644 index 000000000..c416f0f1e --- /dev/null +++ b/src/registrar/migrations/0141_create_groups_v18.py @@ -0,0 +1,38 @@ +# This migration creates the create_full_access_group and create_cisa_analyst_group groups +# It is dependent on 0079 (which populates federal agencies) +# If permissions on the groups need changing, edit CISA_ANALYST_GROUP_PERMISSIONS +# in the user_group model then: +# [NOT RECOMMENDED] +# step 1: docker-compose exec app ./manage.py migrate --fake registrar 0035_contenttypes_permissions +# step 2: docker-compose exec app ./manage.py migrate registrar 0036_create_groups +# step 3: fake run the latest migration in the migrations list +# [RECOMMENDED] +# Alternatively: +# step 1: duplicate the migration that loads data +# step 2: docker-compose exec app ./manage.py migrate + +from django.db import migrations +from registrar.models import UserGroup +from typing import Any + + +# For linting: RunPython expects a function reference, +# so let's give it one +def create_groups(apps, schema_editor) -> Any: + UserGroup.create_cisa_analyst_group(apps, schema_editor) + UserGroup.create_omb_analyst_group(apps, schema_editor) + UserGroup.create_full_access_group(apps, schema_editor) + + +class Migration(migrations.Migration): + dependencies = [ + ("registrar", "0140_alter_portfolioinvitation_additional_permissions_and_more"), + ] + + operations = [ + migrations.RunPython( + create_groups, + reverse_code=migrations.RunPython.noop, + atomic=True, + ), + ] diff --git a/src/registrar/models/user.py b/src/registrar/models/user.py index d5476ab9a..7ee4fefdf 100644 --- a/src/registrar/models/user.py +++ b/src/registrar/models/user.py @@ -40,6 +40,7 @@ class User(AbstractUser): permissions = [ ("analyst_access_permission", "Analyst Access Permission"), + ("omb_analyst_access_permission", "OMB Analyst Access Permission"), ("full_access_permission", "Full Access Permission"), ] diff --git a/src/registrar/models/user_group.py b/src/registrar/models/user_group.py index 4770f34bc..b3bddee66 100644 --- a/src/registrar/models/user_group.py +++ b/src/registrar/models/user_group.py @@ -141,6 +141,104 @@ class UserGroup(Group): except Exception as e: logger.error(f"Error creating analyst permissions group: {e}") + def create_omb_analyst_group(apps, schema_editor): + """This method gets run from a data migration.""" + + # Hard to pass self to these methods as the calls from migrations + # are only expecting apps and schema_editor, so we'll just define + # apps, schema_editor in the local scope instead + OMB_ANALYST_GROUP_PERMISSIONS = [ + { + "app_label": "registrar", + "model": "domainrequest", + "permissions": ["change_domainrequest"], + }, + { + "app_label": "registrar", + "model": "domain", + "permissions": ["view_domain"], + }, + { + "app_label": "registrar", + "model": "user", + "permissions": ["omb_analyst_access_permission"], + }, + { + "app_label": "registrar", + "model": "domaininvitation", + "permissions": ["view_domaininvitation"], + }, + { + "app_label": "registrar", + "model": "federalagency", + "permissions": ["change_federalagency", "delete_federalagency"], + }, + { + "app_label": "registrar", + "model": "portfolio", + "permissions": ["change_portfolio", "delete_portfolio"], + }, + { + "app_label": "registrar", + "model": "suborganization", + "permissions": ["change_suborganization", "delete_suborganization"], + }, + { + "app_label": "registrar", + "model": "seniorofficial", + "permissions": ["change_seniorofficial", "delete_seniorofficial"], + }, + ] + + # Avoid error: You can't execute queries until the end + # of the 'atomic' block. + # From django docs: + # https://docs.djangoproject.com/en/4.2/topics/migrations/#data-migrations + # We can’t import the Person model directly as it may be a newer + # version than this migration expects. We use the historical version. + ContentType = apps.get_model("contenttypes", "ContentType") + Permission = apps.get_model("auth", "Permission") + UserGroup = apps.get_model("registrar", "UserGroup") + + logger.info("Going to create the OMB Analyst Group") + try: + omb_analysts_group, _ = UserGroup.objects.get_or_create( + name="omb_analysts_group", + ) + + omb_analysts_group.permissions.clear() + + for permission in OMB_ANALYST_GROUP_PERMISSIONS: + app_label = permission["app_label"] + model_name = permission["model"] + permissions = permission["permissions"] + + # Retrieve the content type for the app and model + content_type = ContentType.objects.get(app_label=app_label, model=model_name) + + # Retrieve the permissions based on their codenames + permissions = Permission.objects.filter(content_type=content_type, codename__in=permissions) + + # Assign the permissions to the group + omb_analysts_group.permissions.add(*permissions) + + # Convert the permissions QuerySet to a list of codenames + permission_list = list(permissions.values_list("codename", flat=True)) + + logger.debug( + app_label + + " | " + + model_name + + " | " + + ", ".join(permission_list) + + " added to group " + + omb_analysts_group.name + ) + + logger.debug("OMB Analyst permissions added to group " + omb_analysts_group.name) + except Exception as e: + logger.error(f"Error creating analyst permissions group: {e}") + def create_full_access_group(apps, schema_editor): """This method gets run from a data migration.""" diff --git a/src/registrar/views/report_views.py b/src/registrar/views/report_views.py index c07dcfc1b..7f1e63e32 100644 --- a/src/registrar/views/report_views.py +++ b/src/registrar/views/report_views.py @@ -6,7 +6,7 @@ from django.shortcuts import render from django.contrib import admin from django.db.models import Avg, F -from registrar.decorators import ALL, HAS_PORTFOLIO_MEMBERS_VIEW, IS_STAFF, grant_access +from registrar.decorators import ALL, HAS_PORTFOLIO_MEMBERS_VIEW, IS_CISA_ANALYST, IS_FULL_ACCESS, grant_access from .. import models import datetime from django.utils import timezone @@ -16,7 +16,7 @@ import logging logger = logging.getLogger(__name__) -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS) class AnalyticsView(View): def get(self, request): thirty_days_ago = datetime.datetime.today() - datetime.timedelta(days=30) @@ -176,7 +176,7 @@ class AnalyticsView(View): return render(request, "admin/analytics.html", context) -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS) class ExportDataType(View): def get(self, request, *args, **kwargs): # match the CSV example with all the fields @@ -227,7 +227,7 @@ class ExportMembersPortfolio(View): return response -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS) class ExportDataFull(View): def get(self, request, *args, **kwargs): # Smaller export based on 1 @@ -237,7 +237,7 @@ class ExportDataFull(View): return response -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS) class ExportDataFederal(View): def get(self, request, *args, **kwargs): # Federal only @@ -247,7 +247,7 @@ class ExportDataFederal(View): return response -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS) class ExportDomainRequestDataFull(View): """Generates a downloaded report containing all Domain Requests (except started)""" @@ -259,7 +259,7 @@ class ExportDomainRequestDataFull(View): return response -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS) class ExportDataDomainsGrowth(View): def get(self, request, *args, **kwargs): start_date = request.GET.get("start_date", "") @@ -272,7 +272,7 @@ class ExportDataDomainsGrowth(View): return response -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS) class ExportDataRequestsGrowth(View): def get(self, request, *args, **kwargs): start_date = request.GET.get("start_date", "") @@ -285,7 +285,7 @@ class ExportDataRequestsGrowth(View): return response -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS) class ExportDataManagedDomains(View): def get(self, request, *args, **kwargs): start_date = request.GET.get("start_date", "") @@ -297,7 +297,7 @@ class ExportDataManagedDomains(View): return response -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS) class ExportDataUnmanagedDomains(View): def get(self, request, *args, **kwargs): start_date = request.GET.get("start_date", "") diff --git a/src/registrar/views/transfer_user.py b/src/registrar/views/transfer_user.py index 62cd0a9d2..ee8ebad35 100644 --- a/src/registrar/views/transfer_user.py +++ b/src/registrar/views/transfer_user.py @@ -4,7 +4,7 @@ from django.db.models import ForeignKey, OneToOneField, ManyToManyField, ManyToO from django.shortcuts import render, get_object_or_404, redirect from django.views import View -from registrar.decorators import IS_STAFF, grant_access +from registrar.decorators import IS_CISA_ANALYST, IS_FULL_ACCESS, grant_access from registrar.models.domain import Domain from registrar.models.domain_request import DomainRequest from registrar.models.user import User @@ -19,7 +19,7 @@ from registrar.utility.db_helpers import ignore_unique_violation logger = logging.getLogger(__name__) -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS) class TransferUserView(View): """Transfer user methods that set up the transfer_user template and handle the forms on it.""" diff --git a/src/registrar/views/utility/api_views.py b/src/registrar/views/utility/api_views.py index 6d0a2b5ec..ea794e185 100644 --- a/src/registrar/views/utility/api_views.py +++ b/src/registrar/views/utility/api_views.py @@ -1,7 +1,7 @@ import logging from django.http import JsonResponse from django.forms.models import model_to_dict -from registrar.decorators import IS_STAFF, grant_access +from registrar.decorators import IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST, grant_access from registrar.models import FederalAgency, SeniorOfficial, DomainRequest from registrar.utility.admin_helpers import get_action_needed_reason_default_email, get_rejection_reason_default_email from registrar.models.portfolio import Portfolio @@ -10,16 +10,10 @@ from registrar.utility.constants import BranchChoices logger = logging.getLogger(__name__) -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_OMB_ANALYST, IS_FULL_ACCESS) def get_senior_official_from_federal_agency_json(request): """Returns federal_agency information as a JSON""" - # This API is only accessible to admins and analysts - superuser_perm = request.user.has_perm("registrar.full_access_permission") - analyst_perm = request.user.has_perm("registrar.analyst_access_permission") - if not request.user.is_authenticated or not any([analyst_perm, superuser_perm]): - return JsonResponse({"error": "You do not have access to this resource"}, status=403) - agency_name = request.GET.get("agency_name") agency = FederalAgency.objects.filter(agency=agency_name).first() senior_official = SeniorOfficial.objects.filter(federal_agency=agency).first() @@ -37,16 +31,10 @@ def get_senior_official_from_federal_agency_json(request): return JsonResponse({"error": "Senior Official not found"}, status=404) -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_OMB_ANALYST, IS_FULL_ACCESS) def get_portfolio_json(request): """Returns portfolio information as a JSON""" - # This API is only accessible to admins and analysts - superuser_perm = request.user.has_perm("registrar.full_access_permission") - analyst_perm = request.user.has_perm("registrar.analyst_access_permission") - if not request.user.is_authenticated or not any([analyst_perm, superuser_perm]): - return JsonResponse({"error": "You do not have access to this resource"}, status=403) - portfolio_id = request.GET.get("id") try: portfolio = Portfolio.objects.get(id=portfolio_id) @@ -93,16 +81,10 @@ def get_portfolio_json(request): return JsonResponse(portfolio_dict) -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_OMB_ANALYST, IS_FULL_ACCESS) def get_suborganization_list_json(request): """Returns suborganization list information for a portfolio as a JSON""" - # This API is only accessible to admins and analysts - superuser_perm = request.user.has_perm("registrar.full_access_permission") - analyst_perm = request.user.has_perm("registrar.analyst_access_permission") - if not request.user.is_authenticated or not any([analyst_perm, superuser_perm]): - return JsonResponse({"error": "You do not have access to this resource"}, status=403) - portfolio_id = request.GET.get("portfolio_id") try: portfolio = Portfolio.objects.get(id=portfolio_id) @@ -115,17 +97,11 @@ def get_suborganization_list_json(request): return JsonResponse({"results": results, "pagination": {"more": False}}) -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_OMB_ANALYST, IS_FULL_ACCESS) def get_federal_and_portfolio_types_from_federal_agency_json(request): """Returns specific portfolio information as a JSON. Request must have both agency_name and organization_type.""" - # This API is only accessible to admins and analysts - superuser_perm = request.user.has_perm("registrar.full_access_permission") - analyst_perm = request.user.has_perm("registrar.analyst_access_permission") - if not request.user.is_authenticated or not any([analyst_perm, superuser_perm]): - return JsonResponse({"error": "You do not have access to this resource"}, status=403) - federal_type = None portfolio_type = None @@ -143,16 +119,10 @@ def get_federal_and_portfolio_types_from_federal_agency_json(request): return JsonResponse(response_data) -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_OMB_ANALYST, IS_FULL_ACCESS) def get_action_needed_email_for_user_json(request): """Returns a default action needed email for a given user""" - # This API is only accessible to admins and analysts - superuser_perm = request.user.has_perm("registrar.full_access_permission") - analyst_perm = request.user.has_perm("registrar.analyst_access_permission") - if not request.user.is_authenticated or not any([analyst_perm, superuser_perm]): - return JsonResponse({"error": "You do not have access to this resource"}, status=403) - reason = request.GET.get("reason") domain_request_id = request.GET.get("domain_request_id") if not reason: @@ -167,16 +137,10 @@ def get_action_needed_email_for_user_json(request): return JsonResponse({"email": email}, status=200) -@grant_access(IS_STAFF) +@grant_access(IS_CISA_ANALYST, IS_OMB_ANALYST, IS_FULL_ACCESS) def get_rejection_email_for_user_json(request): """Returns a default rejection email for a given user""" - # This API is only accessible to admins and analysts - superuser_perm = request.user.has_perm("registrar.full_access_permission") - analyst_perm = request.user.has_perm("registrar.analyst_access_permission") - if not request.user.is_authenticated or not any([analyst_perm, superuser_perm]): - return JsonResponse({"error": "You do not have access to this resource"}, status=403) - reason = request.GET.get("reason") domain_request_id = request.GET.get("domain_request_id") if not reason: