add omb analyst group, add omb analyst permission, refine is_staff decorator as appropriate

This commit is contained in:
David Kennedy 2025-03-03 16:53:28 -05:00
parent 39bd73fc91
commit 4dd16ec370
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
7 changed files with 162 additions and 55 deletions

View file

@ -6,6 +6,9 @@ from registrar.models import Domain, DomainInformation, DomainInvitation, Domain
# Constants for clarity
ALL = "all"
IS_STAFF = "is_staff"
IS_CISA_ANALYST = "is_cisa_analyst"
IS_OMB_ANALYST = "is_omb_analyst"
IS_FULL_ACCESS = "is_full_access"
IS_DOMAIN_MANAGER = "is_domain_manager"
IS_DOMAIN_REQUEST_CREATOR = "is_domain_request_creator"
IS_STAFF_MANAGING_DOMAIN = "is_staff_managing_domain"
@ -101,6 +104,9 @@ def _user_has_permission(user, request, rules, **kwargs):
# Define permission checks
permission_checks = [
(IS_STAFF, lambda: user.is_staff),
(IS_CISA_ANALYST, lambda: user.has_perm("registrar.analyst_access_permission")),
(IS_OMB_ANALYST, lambda: user.has_perm("registrar.omb_analyst_access_permission")),
(IS_FULL_ACCESS, lambda: user.has_perm("registrar.full_access_permission")),
(IS_DOMAIN_MANAGER, lambda: _is_domain_manager(user, **kwargs)),
(IS_STAFF_MANAGING_DOMAIN, lambda: _is_staff_managing_domain(request, **kwargs)),
(IS_PORTFOLIO_MEMBER, lambda: user.is_org_user(request)),

View file

@ -0,0 +1,38 @@
# This migration creates the create_full_access_group and create_cisa_analyst_group groups
# It is dependent on 0079 (which populates federal agencies)
# If permissions on the groups need changing, edit CISA_ANALYST_GROUP_PERMISSIONS
# in the user_group model then:
# [NOT RECOMMENDED]
# step 1: docker-compose exec app ./manage.py migrate --fake registrar 0035_contenttypes_permissions
# step 2: docker-compose exec app ./manage.py migrate registrar 0036_create_groups
# step 3: fake run the latest migration in the migrations list
# [RECOMMENDED]
# Alternatively:
# step 1: duplicate the migration that loads data
# step 2: docker-compose exec app ./manage.py migrate
from django.db import migrations
from registrar.models import UserGroup
from typing import Any
# For linting: RunPython expects a function reference,
# so let's give it one
def create_groups(apps, schema_editor) -> Any:
UserGroup.create_cisa_analyst_group(apps, schema_editor)
UserGroup.create_omb_analyst_group(apps, schema_editor)
UserGroup.create_full_access_group(apps, schema_editor)
class Migration(migrations.Migration):
dependencies = [
("registrar", "0140_alter_portfolioinvitation_additional_permissions_and_more"),
]
operations = [
migrations.RunPython(
create_groups,
reverse_code=migrations.RunPython.noop,
atomic=True,
),
]

View file

@ -40,6 +40,7 @@ class User(AbstractUser):
permissions = [
("analyst_access_permission", "Analyst Access Permission"),
("omb_analyst_access_permission", "OMB Analyst Access Permission"),
("full_access_permission", "Full Access Permission"),
]

View file

@ -141,6 +141,104 @@ class UserGroup(Group):
except Exception as e:
logger.error(f"Error creating analyst permissions group: {e}")
def create_omb_analyst_group(apps, schema_editor):
"""This method gets run from a data migration."""
# Hard to pass self to these methods as the calls from migrations
# are only expecting apps and schema_editor, so we'll just define
# apps, schema_editor in the local scope instead
OMB_ANALYST_GROUP_PERMISSIONS = [
{
"app_label": "registrar",
"model": "domainrequest",
"permissions": ["change_domainrequest"],
},
{
"app_label": "registrar",
"model": "domain",
"permissions": ["view_domain"],
},
{
"app_label": "registrar",
"model": "user",
"permissions": ["omb_analyst_access_permission"],
},
{
"app_label": "registrar",
"model": "domaininvitation",
"permissions": ["view_domaininvitation"],
},
{
"app_label": "registrar",
"model": "federalagency",
"permissions": ["change_federalagency", "delete_federalagency"],
},
{
"app_label": "registrar",
"model": "portfolio",
"permissions": ["change_portfolio", "delete_portfolio"],
},
{
"app_label": "registrar",
"model": "suborganization",
"permissions": ["change_suborganization", "delete_suborganization"],
},
{
"app_label": "registrar",
"model": "seniorofficial",
"permissions": ["change_seniorofficial", "delete_seniorofficial"],
},
]
# Avoid error: You can't execute queries until the end
# of the 'atomic' block.
# From django docs:
# https://docs.djangoproject.com/en/4.2/topics/migrations/#data-migrations
# We cant import the Person model directly as it may be a newer
# version than this migration expects. We use the historical version.
ContentType = apps.get_model("contenttypes", "ContentType")
Permission = apps.get_model("auth", "Permission")
UserGroup = apps.get_model("registrar", "UserGroup")
logger.info("Going to create the OMB Analyst Group")
try:
omb_analysts_group, _ = UserGroup.objects.get_or_create(
name="omb_analysts_group",
)
omb_analysts_group.permissions.clear()
for permission in OMB_ANALYST_GROUP_PERMISSIONS:
app_label = permission["app_label"]
model_name = permission["model"]
permissions = permission["permissions"]
# Retrieve the content type for the app and model
content_type = ContentType.objects.get(app_label=app_label, model=model_name)
# Retrieve the permissions based on their codenames
permissions = Permission.objects.filter(content_type=content_type, codename__in=permissions)
# Assign the permissions to the group
omb_analysts_group.permissions.add(*permissions)
# Convert the permissions QuerySet to a list of codenames
permission_list = list(permissions.values_list("codename", flat=True))
logger.debug(
app_label
+ " | "
+ model_name
+ " | "
+ ", ".join(permission_list)
+ " added to group "
+ omb_analysts_group.name
)
logger.debug("OMB Analyst permissions added to group " + omb_analysts_group.name)
except Exception as e:
logger.error(f"Error creating analyst permissions group: {e}")
def create_full_access_group(apps, schema_editor):
"""This method gets run from a data migration."""

View file

@ -6,7 +6,7 @@ from django.shortcuts import render
from django.contrib import admin
from django.db.models import Avg, F
from registrar.decorators import ALL, HAS_PORTFOLIO_MEMBERS_VIEW, IS_STAFF, grant_access
from registrar.decorators import ALL, HAS_PORTFOLIO_MEMBERS_VIEW, IS_CISA_ANALYST, IS_FULL_ACCESS, grant_access
from .. import models
import datetime
from django.utils import timezone
@ -16,7 +16,7 @@ import logging
logger = logging.getLogger(__name__)
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS)
class AnalyticsView(View):
def get(self, request):
thirty_days_ago = datetime.datetime.today() - datetime.timedelta(days=30)
@ -176,7 +176,7 @@ class AnalyticsView(View):
return render(request, "admin/analytics.html", context)
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS)
class ExportDataType(View):
def get(self, request, *args, **kwargs):
# match the CSV example with all the fields
@ -227,7 +227,7 @@ class ExportMembersPortfolio(View):
return response
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS)
class ExportDataFull(View):
def get(self, request, *args, **kwargs):
# Smaller export based on 1
@ -237,7 +237,7 @@ class ExportDataFull(View):
return response
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS)
class ExportDataFederal(View):
def get(self, request, *args, **kwargs):
# Federal only
@ -247,7 +247,7 @@ class ExportDataFederal(View):
return response
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS)
class ExportDomainRequestDataFull(View):
"""Generates a downloaded report containing all Domain Requests (except started)"""
@ -259,7 +259,7 @@ class ExportDomainRequestDataFull(View):
return response
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS)
class ExportDataDomainsGrowth(View):
def get(self, request, *args, **kwargs):
start_date = request.GET.get("start_date", "")
@ -272,7 +272,7 @@ class ExportDataDomainsGrowth(View):
return response
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS)
class ExportDataRequestsGrowth(View):
def get(self, request, *args, **kwargs):
start_date = request.GET.get("start_date", "")
@ -285,7 +285,7 @@ class ExportDataRequestsGrowth(View):
return response
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS)
class ExportDataManagedDomains(View):
def get(self, request, *args, **kwargs):
start_date = request.GET.get("start_date", "")
@ -297,7 +297,7 @@ class ExportDataManagedDomains(View):
return response
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS)
class ExportDataUnmanagedDomains(View):
def get(self, request, *args, **kwargs):
start_date = request.GET.get("start_date", "")

View file

@ -4,7 +4,7 @@ from django.db.models import ForeignKey, OneToOneField, ManyToManyField, ManyToO
from django.shortcuts import render, get_object_or_404, redirect
from django.views import View
from registrar.decorators import IS_STAFF, grant_access
from registrar.decorators import IS_CISA_ANALYST, IS_FULL_ACCESS, grant_access
from registrar.models.domain import Domain
from registrar.models.domain_request import DomainRequest
from registrar.models.user import User
@ -19,7 +19,7 @@ from registrar.utility.db_helpers import ignore_unique_violation
logger = logging.getLogger(__name__)
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_FULL_ACCESS)
class TransferUserView(View):
"""Transfer user methods that set up the transfer_user template and handle the forms on it."""

View file

@ -1,7 +1,7 @@
import logging
from django.http import JsonResponse
from django.forms.models import model_to_dict
from registrar.decorators import IS_STAFF, grant_access
from registrar.decorators import IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST, grant_access
from registrar.models import FederalAgency, SeniorOfficial, DomainRequest
from registrar.utility.admin_helpers import get_action_needed_reason_default_email, get_rejection_reason_default_email
from registrar.models.portfolio import Portfolio
@ -10,16 +10,10 @@ from registrar.utility.constants import BranchChoices
logger = logging.getLogger(__name__)
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_OMB_ANALYST, IS_FULL_ACCESS)
def get_senior_official_from_federal_agency_json(request):
"""Returns federal_agency information as a JSON"""
# This API is only accessible to admins and analysts
superuser_perm = request.user.has_perm("registrar.full_access_permission")
analyst_perm = request.user.has_perm("registrar.analyst_access_permission")
if not request.user.is_authenticated or not any([analyst_perm, superuser_perm]):
return JsonResponse({"error": "You do not have access to this resource"}, status=403)
agency_name = request.GET.get("agency_name")
agency = FederalAgency.objects.filter(agency=agency_name).first()
senior_official = SeniorOfficial.objects.filter(federal_agency=agency).first()
@ -37,16 +31,10 @@ def get_senior_official_from_federal_agency_json(request):
return JsonResponse({"error": "Senior Official not found"}, status=404)
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_OMB_ANALYST, IS_FULL_ACCESS)
def get_portfolio_json(request):
"""Returns portfolio information as a JSON"""
# This API is only accessible to admins and analysts
superuser_perm = request.user.has_perm("registrar.full_access_permission")
analyst_perm = request.user.has_perm("registrar.analyst_access_permission")
if not request.user.is_authenticated or not any([analyst_perm, superuser_perm]):
return JsonResponse({"error": "You do not have access to this resource"}, status=403)
portfolio_id = request.GET.get("id")
try:
portfolio = Portfolio.objects.get(id=portfolio_id)
@ -93,16 +81,10 @@ def get_portfolio_json(request):
return JsonResponse(portfolio_dict)
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_OMB_ANALYST, IS_FULL_ACCESS)
def get_suborganization_list_json(request):
"""Returns suborganization list information for a portfolio as a JSON"""
# This API is only accessible to admins and analysts
superuser_perm = request.user.has_perm("registrar.full_access_permission")
analyst_perm = request.user.has_perm("registrar.analyst_access_permission")
if not request.user.is_authenticated or not any([analyst_perm, superuser_perm]):
return JsonResponse({"error": "You do not have access to this resource"}, status=403)
portfolio_id = request.GET.get("portfolio_id")
try:
portfolio = Portfolio.objects.get(id=portfolio_id)
@ -115,17 +97,11 @@ def get_suborganization_list_json(request):
return JsonResponse({"results": results, "pagination": {"more": False}})
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_OMB_ANALYST, IS_FULL_ACCESS)
def get_federal_and_portfolio_types_from_federal_agency_json(request):
"""Returns specific portfolio information as a JSON. Request must have
both agency_name and organization_type."""
# This API is only accessible to admins and analysts
superuser_perm = request.user.has_perm("registrar.full_access_permission")
analyst_perm = request.user.has_perm("registrar.analyst_access_permission")
if not request.user.is_authenticated or not any([analyst_perm, superuser_perm]):
return JsonResponse({"error": "You do not have access to this resource"}, status=403)
federal_type = None
portfolio_type = None
@ -143,16 +119,10 @@ def get_federal_and_portfolio_types_from_federal_agency_json(request):
return JsonResponse(response_data)
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_OMB_ANALYST, IS_FULL_ACCESS)
def get_action_needed_email_for_user_json(request):
"""Returns a default action needed email for a given user"""
# This API is only accessible to admins and analysts
superuser_perm = request.user.has_perm("registrar.full_access_permission")
analyst_perm = request.user.has_perm("registrar.analyst_access_permission")
if not request.user.is_authenticated or not any([analyst_perm, superuser_perm]):
return JsonResponse({"error": "You do not have access to this resource"}, status=403)
reason = request.GET.get("reason")
domain_request_id = request.GET.get("domain_request_id")
if not reason:
@ -167,16 +137,10 @@ def get_action_needed_email_for_user_json(request):
return JsonResponse({"email": email}, status=200)
@grant_access(IS_STAFF)
@grant_access(IS_CISA_ANALYST, IS_OMB_ANALYST, IS_FULL_ACCESS)
def get_rejection_email_for_user_json(request):
"""Returns a default rejection email for a given user"""
# This API is only accessible to admins and analysts
superuser_perm = request.user.has_perm("registrar.full_access_permission")
analyst_perm = request.user.has_perm("registrar.analyst_access_permission")
if not request.user.is_authenticated or not any([analyst_perm, superuser_perm]):
return JsonResponse({"error": "You do not have access to this resource"}, status=403)
reason = request.GET.get("reason")
domain_request_id = request.GET.get("domain_request_id")
if not reason: