Merge branch 'main' into hotgov/2355-rejection-reason-emails

This commit is contained in:
zandercymatics 2024-10-07 09:17:12 -06:00
commit 5fc8d0d7e1
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
69 changed files with 2626 additions and 759 deletions

View file

@ -14,6 +14,7 @@ on:
options:
- ab
- backup
- el
- cb
- dk
- es

View file

@ -30,6 +30,7 @@ jobs:
|| startsWith(github.head_ref, 'ag/')
|| startsWith(github.head_ref, 'ms/')
|| startsWith(github.head_ref, 'ad/')
|| startsWith(github.head_ref, 'el/')
outputs:
environment: ${{ steps.var.outputs.environment}}
runs-on: "ubuntu-latest"

View file

@ -16,6 +16,7 @@ on:
- stable
- staging
- development
- el
- ad
- ms
- ag

View file

@ -16,6 +16,7 @@ on:
options:
- staging
- development
- el
- ad
- ms
- ag

View file

@ -173,7 +173,7 @@ You can change the logging verbosity, if needed. Do a web search for "django log
## Mock data
[load.py](../../src/registrar/management/commands/load.py) called from docker-compose (locally) and reset-db.yml (upper) loads the fixtures from [fixtures_user.py](../../src/registrar/fixtures_users.py) and [fixtures_domain_requests.py](../../src/registrar/fixtures_domain_requests.py), giving you some test data to play with while developing.
[load.py](../../src/registrar/management/commands/load.py) called from docker-compose (locally) and reset-db.yml (upper) loads the fixtures from [fixtures_user.py](../../src/registrar/fixtures/fixtures_users.py) and the rest of the data-loading fixtures in that fixtures folder, giving you some test data to play with while developing.
See the [database-access README](./database-access.md) for information on how to pull data to update these fixtures.

View file

@ -754,7 +754,7 @@ Example: `cf ssh getgov-za`
| 1 | **emailTo** | Specifies where the email will be emailed. Defaults to help@get.gov on production. |
## Populate federal agency initials and FCEB
This script adds to the "is_fceb" and "initials" fields on the FederalAgency model. This script expects a CSV of federal CIOs to pull from, which can be sourced from [here](https://docs.google.com/spreadsheets/d/14oXHFpKyUXS5_mDWARPusghGdHCrP67jCleOknaSx38/edit?gid=479328070#gid=479328070).
This script adds to the "is_fceb" and "acronym" fields on the FederalAgency model. This script expects a CSV of federal CIOs to pull from, which can be sourced from [here](https://docs.google.com/spreadsheets/d/14oXHFpKyUXS5_mDWARPusghGdHCrP67jCleOknaSx38/edit?gid=479328070#gid=479328070).
### Running on sandboxes

View file

@ -9,17 +9,16 @@ Simple scripts are provided as detailed below.
### Export
To export from the source environment, run the following command from src directory:
manage.py export_tables
Connect to the source sandbox and run the command:
cf ssh {source-app}
/tmp/lifecycle/shell
./manage.py export_tables
`cf ssh {source-app}`
`/tmp/lifecycle/shell`
`./manage.py export_tables`
example exporting from getgov-stable:
cf ssh getgov-stable
/tmp/lifecycle/shell
./manage.py export_tables
`cf ssh getgov-stable`
`/tmp/lifecycle/shell`
`./manage.py export_tables`
This exports a file, exported_tables.zip, to the tmp directory
@ -42,14 +41,16 @@ After exporting the file from the target environment, scp the exported_tables.zi
file from the target environment to local. Run the below commands from local.
Get passcode by running:
cf ssh-code
`cf ssh-code`
scp file from source app to local file:
scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {source-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip {local_file_path}
`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {source-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip {local_file_path}`
when prompted, supply the passcode retrieved in the 'cf ssh-code' command
example copying from stable to local cwd:
scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-stable --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip .
`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-stable --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip .`
`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-stable --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip .`
### Import
@ -63,14 +64,14 @@ that there are no database conflicts on import.
In order to delete all rows from the appropriate tables, run the following
command:
cf ssh {target-app}
/tmp/lifecycle/shell
./manage.py clean_tables
`cf ssh {target-app}`
`/tmp/lifecycle/shell`
`./manage.py clean_tables`
example cleaning getgov-backup:
cf ssh getgov-backup
/tmp/lifecycle/backup
./manage.py clean_tables
`cf ssh getgov-backup`
`/tmp/lifecycle/shell`
`./manage.py clean_tables`
For reference, this deletes all rows from the following tables:
@ -96,28 +97,30 @@ with --skipEppSave option set to False. If you set to False, it will attempt to
records to the registry on load. If this is unset, or set to True, it will load the database and not
attempt to update the registry on load.
Please note that there is currently a bug (missing batch importing, see #2862) so this may not work
smoothly right now currently.
To scp the exported_tables.zip file from local to the sandbox, run the following:
Get passcode by running:
cf ssh-code
`cf ssh-code`
scp file from local to target app:
scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {target-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 {local_file_path} ssh.fr.cloud.gov:app/tmp/exported_tables.zip
`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {target-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 {local_file_path} ssh.fr.cloud.gov:app/tmp/exported_tables.zip`
when prompted, supply the passcode retrieved in the 'cf ssh-code' command
example copy of local file in tmp to getgov-backup:
scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-backup --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 tmp/exported_tables.zip ssh.fr.cloud.gov:app/tmp/exported_tables.zip
`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-backup --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 exported_tables.zip ssh.fr.cloud.gov:app/tmp/exported_tables.zip`
Then connect to a shell in the target environment, and run the following import command:
cf ssh {target-app}
/tmp/lifecycle/shell
./manage.py import_tables
`cf ssh {target-app}`
`/tmp/lifecycle/shell`
`./manage.py import_tables`
example cleaning getgov-backup:
cf ssh getgov-backup
/tmp/lifecycle/backup
./manage.py import_tables --no-skipEppSave
`cf ssh getgov-backup`
`/tmp/lifecycle/shell`
`./manage.py import_tables --no-skipEppSave`
For reference, this imports tables in the following order:

View file

@ -0,0 +1,32 @@
---
applications:
- name: getgov-el
buildpacks:
- python_buildpack
path: ../../src
instances: 1
memory: 512M
stack: cflinuxfs4
timeout: 180
command: ./run.sh
health-check-type: http
health-check-http-endpoint: /health
health-check-invocation-timeout: 40
env:
# Send stdout and stderr straight to the terminal without buffering
PYTHONUNBUFFERED: yup
# Tell Django where to find its configuration
DJANGO_SETTINGS_MODULE: registrar.config.settings
# Tell Django where it is being hosted
DJANGO_BASE_URL: https://getgov-el.app.cloud.gov
# Tell Django how much stuff to log
DJANGO_LOG_LEVEL: INFO
# default public site location
GETGOV_PUBLIC_SITE_URL: https://get.gov
# Flag to disable/enable features in prod environments
IS_PRODUCTION: False
routes:
- route: getgov-el.app.cloud.gov
services:
- getgov-credentials
- getgov-el-database

View file

@ -9,9 +9,7 @@ from registrar.utility.admin_helpers import get_action_needed_reason_default_ema
from django.conf import settings
from django.shortcuts import redirect
from django_fsm import get_available_FIELD_transitions, FSMField
from registrar.models.domain_information import DomainInformation
from registrar.models.domain_invitation import DomainInvitation
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models import DomainInformation, Portfolio, UserPortfolioPermission, DomainInvitation
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from waffle.decorators import flag_is_active
from django.contrib import admin, messages
@ -23,6 +21,11 @@ from epplibwrapper.errors import ErrorCode, RegistryError
from registrar.models.user_domain_role import UserDomainRole
from waffle.admin import FlagAdmin
from waffle.models import Sample, Switch
from registrar.utility.admin_helpers import (
get_all_action_needed_reason_emails,
get_action_needed_reason_default_email,
get_field_links_as_list,
)
from registrar.models import Contact, Domain, DomainRequest, DraftDomain, User, Website, SeniorOfficial
from registrar.utility.constants import BranchChoices
from registrar.utility.errors import FSMDomainRequestError, FSMErrorCodes
@ -758,9 +761,10 @@ class MyUserAdmin(BaseUserAdmin, ImportExportModelAdmin):
},
),
("Important dates", {"fields": ("last_login", "date_joined")}),
("Associated portfolios", {"fields": ("portfolios",)}),
)
readonly_fields = ("verification_type",)
readonly_fields = ("verification_type", "portfolios")
analyst_fieldsets = (
(
@ -783,6 +787,7 @@ class MyUserAdmin(BaseUserAdmin, ImportExportModelAdmin):
},
),
("Important dates", {"fields": ("last_login", "date_joined")}),
("Associated portfolios", {"fields": ("portfolios",)}),
)
# TODO: delete after we merge organization feature
@ -862,6 +867,14 @@ class MyUserAdmin(BaseUserAdmin, ImportExportModelAdmin):
ordering = ["first_name", "last_name", "email"]
search_help_text = "Search by first name, last name, or email."
def portfolios(self, obj: models.User):
"""Returns a list of links for each related suborg"""
portfolio_ids = obj.get_portfolios().values_list("portfolio", flat=True)
queryset = models.Portfolio.objects.filter(id__in=portfolio_ids)
return get_field_links_as_list(queryset, "portfolio", msg_for_none="No portfolios.")
portfolios.short_description = "Portfolios" # type: ignore
def get_search_results(self, request, queryset, search_term):
"""
Override for get_search_results. This affects any upstream model using autocomplete_fields,
@ -1258,9 +1271,18 @@ class UserPortfolioPermissionAdmin(ListHeaderAdmin):
list_display = [
"user",
"portfolio",
"get_roles",
]
autocomplete_fields = ["user", "portfolio"]
search_fields = ["user__first_name", "user__last_name", "user__email", "portfolio__organization_name"]
search_help_text = "Search by first name, last name, email, or portfolio."
def get_roles(self, obj):
readable_roles = obj.get_readable_roles()
return ", ".join(readable_roles)
get_roles.short_description = "Roles" # type: ignore
class UserDomainRoleAdmin(ListHeaderAdmin, ImportExportModelAdmin):
@ -1544,33 +1566,6 @@ class DomainInformationAdmin(ListHeaderAdmin, ImportExportModelAdmin):
change_form_template = "django/admin/domain_information_change_form.html"
superuser_only_fields = [
"portfolio",
"sub_organization",
]
# DEVELOPER's NOTE:
# Normally, to exclude a field from an Admin form, we could simply utilize
# Django's "exclude" feature. However, it causes a "missing key" error if we
# go that route for this particular form. The error gets thrown by our
# custom fieldset.html code and is due to the fact that "exclude" removes
# fields from base_fields but not fieldsets. Rather than reworking our
# custom frontend, it seems more straightforward (and easier to read) to simply
# modify the fieldsets list so that it excludes any fields we want to remove
# based on permissions (eg. superuser_only_fields) or other conditions.
def get_fieldsets(self, request, obj=None):
fieldsets = self.fieldsets
# Create a modified version of fieldsets to exclude certain fields
if not request.user.has_perm("registrar.full_access_permission"):
modified_fieldsets = []
for name, data in fieldsets:
fields = data.get("fields", [])
fields = [field for field in fields if field not in DomainInformationAdmin.superuser_only_fields]
modified_fieldsets.append((name, {**data, "fields": fields}))
return modified_fieldsets
return fieldsets
def get_readonly_fields(self, request, obj=None):
"""Set the read-only state on form elements.
We have 1 conditions that determine which fields are read-only:
@ -1867,33 +1862,6 @@ class DomainRequestAdmin(ListHeaderAdmin, ImportExportModelAdmin):
]
filter_horizontal = ("current_websites", "alternative_domains", "other_contacts")
superuser_only_fields = [
"portfolio",
"sub_organization",
]
# DEVELOPER's NOTE:
# Normally, to exclude a field from an Admin form, we could simply utilize
# Django's "exclude" feature. However, it causes a "missing key" error if we
# go that route for this particular form. The error gets thrown by our
# custom fieldset.html code and is due to the fact that "exclude" removes
# fields from base_fields but not fieldsets. Rather than reworking our
# custom frontend, it seems more straightforward (and easier to read) to simply
# modify the fieldsets list so that it excludes any fields we want to remove
# based on permissions (eg. superuser_only_fields) or other conditions.
def get_fieldsets(self, request, obj=None):
fieldsets = super().get_fieldsets(request, obj)
# Create a modified version of fieldsets to exclude certain fields
if not request.user.has_perm("registrar.full_access_permission"):
modified_fieldsets = []
for name, data in fieldsets:
fields = data.get("fields", [])
fields = tuple(field for field in fields if field not in self.superuser_only_fields)
modified_fieldsets.append((name, {**data, "fields": fields}))
return modified_fieldsets
return fieldsets
# Table ordering
# NOTE: This impacts the select2 dropdowns (combobox)
# Currentl, there's only one for requests on DomainInfo
@ -3003,39 +2971,59 @@ class VerifiedByStaffAdmin(ListHeaderAdmin):
class PortfolioAdmin(ListHeaderAdmin):
class Meta:
"""Contains meta information about this class"""
model = models.Portfolio
fields = "__all__"
_meta = Meta()
change_form_template = "django/admin/portfolio_change_form.html"
fieldsets = [
# created_on is the created_at field, and portfolio_type is f"{organization_type} - {federal_type}"
(None, {"fields": ["portfolio_type", "organization_name", "creator", "created_on", "notes"]}),
("Portfolio members", {"fields": ["display_admins", "display_members"]}),
("Portfolio domains", {"fields": ["domains", "domain_requests"]}),
# created_on is the created_at field
(None, {"fields": ["creator", "created_on", "notes"]}),
("Type of organization", {"fields": ["organization_type", "federal_type"]}),
(
"Organization name and mailing address",
{
"fields": [
"organization_name",
"federal_agency",
]
},
),
(
"Show details",
{
"classes": ["collapse--dgfieldset"],
"description": "Extends organization name and mailing address",
"fields": [
"state_territory",
"address_line1",
"address_line2",
"city",
"zipcode",
"urbanization",
]
],
},
),
("Portfolio members", {"fields": ["display_admins", "display_members"]}),
("Domains and requests", {"fields": ["domains", "domain_requests"]}),
("Suborganizations", {"fields": ["suborganizations"]}),
("Senior official", {"fields": ["senior_official"]}),
]
# This is the fieldset display when adding a new model
add_fieldsets = [
(None, {"fields": ["organization_name", "creator", "notes"]}),
(None, {"fields": ["creator", "notes"]}),
("Type of organization", {"fields": ["organization_type"]}),
(
"Organization name and mailing address",
{
"fields": [
"organization_name",
"federal_agency",
"state_territory",
"address_line1",
@ -3049,7 +3037,7 @@ class PortfolioAdmin(ListHeaderAdmin):
("Senior official", {"fields": ["senior_official"]}),
]
list_display = ("organization_name", "federal_agency", "creator")
list_display = ("organization_name", "organization_type", "federal_type", "creator")
search_fields = ["organization_name"]
search_help_text = "Search by organization name."
readonly_fields = [
@ -3062,23 +3050,35 @@ class PortfolioAdmin(ListHeaderAdmin):
"domains",
"domain_requests",
"suborganizations",
"portfolio_type",
"display_admins",
"display_members",
"creator",
# As of now this means that only federal agency can update this, but this will change.
"senior_official",
]
analyst_readonly_fields = [
"organization_name",
]
def get_admin_users(self, obj):
# Filter UserPortfolioPermission objects related to the portfolio
admin_permissions = UserPortfolioPermission.objects.filter(
portfolio=obj, roles__contains=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
admin_permissions = self.get_user_portfolio_permission_admins(obj)
# Get the user objects associated with these permissions
admin_users = User.objects.filter(portfolio_permissions__in=admin_permissions)
return admin_users
def get_user_portfolio_permission_admins(self, obj):
"""Returns each admin on UserPortfolioPermission for a given portfolio."""
if obj:
return obj.portfolio_users.filter(
portfolio=obj, roles__contains=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
else:
return []
def get_non_admin_users(self, obj):
# Filter UserPortfolioPermission objects related to the portfolio that do NOT have the "Admin" role
non_admin_permissions = UserPortfolioPermission.objects.filter(portfolio=obj).exclude(
@ -3090,82 +3090,12 @@ class PortfolioAdmin(ListHeaderAdmin):
return non_admin_users
def display_admins(self, obj):
"""Get joined users who are Admin, unpack and return an HTML block.
'DJA readonly can't handle querysets, so we need to unpack and return html here.
Alternatively, we could return querysets in context but that would limit where this
data would display in a custom change form without extensive template customization.
Will be used in the field_readonly block"""
admins = self.get_admin_users(obj)
if not admins:
return format_html("<p>No admins found.</p>")
admin_details = ""
for portfolio_admin in admins:
change_url = reverse("admin:registrar_user_change", args=[portfolio_admin.pk])
admin_details += "<address class='margin-bottom-2 dja-address-contact-list'>"
admin_details += f'<a href="{change_url}">{escape(portfolio_admin)}</a><br>'
admin_details += f"{escape(portfolio_admin.title)}<br>"
admin_details += f"{escape(portfolio_admin.email)}"
admin_details += "<div class='admin-icon-group admin-icon-group__clipboard-link'>"
admin_details += f"<input aria-hidden='true' class='display-none' value='{escape(portfolio_admin.email)}'>"
admin_details += (
"<button class='usa-button usa-button--unstyled padding-right-1 usa-button--icon padding-left-05"
+ "button--clipboard copy-to-clipboard text-no-underline' type='button'>"
)
admin_details += "<svg class='usa-icon'>"
admin_details += "<use aria-hidden='true' xlink:href='/public/img/sprite.svg#content_copy'></use>"
admin_details += "</svg>"
admin_details += "Copy"
admin_details += "</button>"
admin_details += "</div><br>"
admin_details += f"{escape(portfolio_admin.phone)}"
admin_details += "</address>"
return format_html(admin_details)
display_admins.short_description = "Administrators" # type: ignore
def display_members(self, obj):
"""Get joined users who have roles/perms that are not Admin, unpack and return an HTML block.
DJA readonly can't handle querysets, so we need to unpack and return html here.
Alternatively, we could return querysets in context but that would limit where this
data would display in a custom change form without extensive template customization.
Will be used in the after_help_text block."""
members = self.get_non_admin_users(obj)
if not members:
return ""
member_details = (
"<table><thead><tr><th>Name</th><th>Title</th><th>Email</th>"
+ "<th>Phone</th><th>Roles</th></tr></thead><tbody>"
)
for member in members:
full_name = member.get_formatted_name()
member_details += "<tr>"
member_details += f"<td>{escape(full_name)}</td>"
member_details += f"<td>{escape(member.title)}</td>"
member_details += f"<td>{escape(member.email)}</td>"
member_details += f"<td>{escape(member.phone)}</td>"
member_details += "<td>"
for role in member.portfolio_role_summary(obj):
member_details += f"<span class='usa-tag'>{escape(role)}</span> "
member_details += "</td></tr>"
member_details += "</tbody></table>"
return format_html(member_details)
display_members.short_description = "Members" # type: ignore
def display_members_summary(self, obj):
"""Will be passed as context and used in the field_readonly block."""
members = self.get_non_admin_users(obj)
if not members:
return {}
return self.get_field_links_as_list(members, "user", separator=", ")
def get_user_portfolio_permission_non_admins(self, obj):
"""Returns each admin on UserPortfolioPermission for a given portfolio."""
if obj:
return obj.portfolio_users.exclude(roles__contains=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN])
else:
return []
def federal_type(self, obj: models.Portfolio):
"""Returns the federal_type field"""
@ -3180,16 +3110,10 @@ class PortfolioAdmin(ListHeaderAdmin):
created_on.short_description = "Created on" # type: ignore
def portfolio_type(self, obj: models.Portfolio):
"""Returns the portfolio type, or "-" if the result is empty"""
return obj.portfolio_type if obj.portfolio_type else "-"
portfolio_type.short_description = "Portfolio type" # type: ignore
def suborganizations(self, obj: models.Portfolio):
"""Returns a list of links for each related suborg"""
queryset = obj.get_suborganizations()
return self.get_field_links_as_list(queryset, "suborganization")
return get_field_links_as_list(queryset, "suborganization")
suborganizations.short_description = "Suborganizations" # type: ignore
@ -3218,6 +3142,28 @@ class PortfolioAdmin(ListHeaderAdmin):
domain_requests.short_description = "Domain requests" # type: ignore
def display_admins(self, obj):
"""Returns the number of administrators for this portfolio"""
admin_count = len(self.get_user_portfolio_permission_admins(obj))
if admin_count > 0:
url = reverse("admin:registrar_userportfoliopermission_changelist") + f"?portfolio={obj.id}"
# Create a clickable link with the count
return format_html(f'<a href="{url}">{admin_count} administrators</a>')
return "No administrators found."
display_admins.short_description = "Administrators" # type: ignore
def display_members(self, obj):
"""Returns the number of members for this portfolio"""
member_count = len(self.get_user_portfolio_permission_non_admins(obj))
if member_count > 0:
url = reverse("admin:registrar_userportfoliopermission_changelist") + f"?portfolio={obj.id}"
# Create a clickable link with the count
return format_html(f'<a href="{url}">{member_count} members</a>')
return "No additional members found."
display_members.short_description = "Members" # type: ignore
# Creates select2 fields (with search bars)
autocomplete_fields = [
"creator",
@ -3225,59 +3171,6 @@ class PortfolioAdmin(ListHeaderAdmin):
"senior_official",
]
def get_field_links_as_list(
self, queryset, model_name, attribute_name=None, link_info_attribute=None, separator=None
):
"""
Generate HTML links for items in a queryset, using a specified attribute for link text.
Args:
queryset: The queryset of items to generate links for.
model_name: The model name used to construct the admin change URL.
attribute_name: The attribute or method name to use for link text. If None, the item itself is used.
link_info_attribute: Appends f"({value_of_attribute})" to the end of the link.
separator: The separator to use between links in the resulting HTML.
If none, an unordered list is returned.
Returns:
A formatted HTML string with links to the admin change pages for each item.
"""
links = []
for item in queryset:
# This allows you to pass in attribute_name="get_full_name" for instance.
if attribute_name:
item_display_value = self.value_of_attribute(item, attribute_name)
else:
item_display_value = item
if item_display_value:
change_url = reverse(f"admin:registrar_{model_name}_change", args=[item.pk])
link = f'<a href="{change_url}">{escape(item_display_value)}</a>'
if link_info_attribute:
link += f" ({self.value_of_attribute(item, link_info_attribute)})"
if separator:
links.append(link)
else:
links.append(f"<li>{link}</li>")
# If no separator is specified, just return an unordered list.
if separator:
return format_html(separator.join(links)) if links else "-"
else:
links = "".join(links)
return format_html(f'<ul class="add-list-reset">{links}</ul>') if links else "-"
def value_of_attribute(self, obj, attribute_name: str):
"""Returns the value of getattr if the attribute isn't callable.
If it is, execute the underlying function and return that result instead."""
value = getattr(obj, attribute_name)
if callable(value):
value = value()
return value
def get_fieldsets(self, request, obj=None):
"""Override of the default get_fieldsets definition to add an add_fieldsets view"""
# This is the add view if no obj exists
@ -3310,10 +3203,15 @@ class PortfolioAdmin(ListHeaderAdmin):
def change_view(self, request, object_id, form_url="", extra_context=None):
"""Add related suborganizations and domain groups.
Add the summary for the portfolio members field (list of members that link to change_forms)."""
obj = self.get_object(request, object_id)
obj: Portfolio = self.get_object(request, object_id)
extra_context = extra_context or {}
extra_context["skip_additional_contact_info"] = True
extra_context["display_members_summary"] = self.display_members_summary(obj)
if obj:
extra_context["members"] = self.get_user_portfolio_permission_non_admins(obj)
extra_context["admins"] = self.get_user_portfolio_permission_admins(obj)
extra_context["domains"] = obj.get_domains(order_by=["domain__name"])
extra_context["domain_requests"] = obj.get_domain_requests(order_by=["requested_domain__name"])
return super().change_view(request, object_id, form_url, extra_context)
def save_model(self, request, obj, form, change):
@ -3330,6 +3228,14 @@ class PortfolioAdmin(ListHeaderAdmin):
is_federal = obj.organization_type == DomainRequest.OrganizationChoices.FEDERAL
if is_federal and obj.organization_name is None:
obj.organization_name = obj.federal_agency.agency
# Remove this line when senior_official is no longer readonly in /admin.
if obj.federal_agency:
if obj.federal_agency.so_federal_agency.exists():
obj.senior_official = obj.federal_agency.so_federal_agency.first()
else:
obj.senior_official = None
super().save_model(request, obj, form, change)
@ -3344,7 +3250,7 @@ class FederalAgencyResource(resources.ModelResource):
class FederalAgencyAdmin(ListHeaderAdmin, ImportExportModelAdmin):
list_display = ["agency"]
search_fields = ["agency"]
search_help_text = "Search by agency name."
search_help_text = "Search by federal agency."
ordering = ["agency"]
resource_classes = [FederalAgencyResource]
@ -3401,6 +3307,7 @@ class SuborganizationAdmin(ListHeaderAdmin, ImportExportModelAdmin):
"portfolio",
]
search_fields = ["name"]
search_help_text = "Search by suborganization."
change_form_template = "django/admin/suborg_change_form.html"

View file

@ -944,10 +944,15 @@ document.addEventListener('DOMContentLoaded', function() {
// $ symbolically denotes that this is using jQuery
let $federalAgency = django.jQuery("#id_federal_agency");
let organizationType = document.getElementById("id_organization_type");
if ($federalAgency && organizationType) {
let readonlyOrganizationType = document.querySelector(".field-organization_type .readonly");
let organizationNameContainer = document.querySelector(".field-organization_name");
let federalType = document.querySelector(".field-federal_type");
if ($federalAgency && (organizationType || readonlyOrganizationType)) {
// Attach the change event listener
$federalAgency.on("change", function() {
handleFederalAgencyChange($federalAgency, organizationType);
handleFederalAgencyChange($federalAgency, organizationType, readonlyOrganizationType, organizationNameContainer, federalType);
});
}
@ -963,9 +968,33 @@ document.addEventListener('DOMContentLoaded', function() {
handleStateTerritoryChange(stateTerritory, urbanizationField);
});
}
// Handle hiding the organization name field when the organization_type is federal.
// Run this first one page load, then secondly on a change event.
handleOrganizationTypeChange(organizationType, organizationNameContainer, federalType);
organizationType.addEventListener("change", function() {
handleOrganizationTypeChange(organizationType, organizationNameContainer, federalType);
});
});
function handleFederalAgencyChange(federalAgency, organizationType) {
function handleOrganizationTypeChange(organizationType, organizationNameContainer, federalType) {
if (organizationType && organizationNameContainer) {
let selectedValue = organizationType.value;
if (selectedValue === "federal") {
hideElement(organizationNameContainer);
if (federalType) {
showElement(federalType);
}
} else {
showElement(organizationNameContainer);
if (federalType) {
hideElement(federalType);
}
}
}
}
function handleFederalAgencyChange(federalAgency, organizationType, readonlyOrganizationType, organizationNameContainer, federalType) {
// Don't do anything on page load
if (isInitialPageLoad) {
isInitialPageLoad = false;
@ -980,27 +1009,31 @@ document.addEventListener('DOMContentLoaded', function() {
return;
}
let organizationTypeValue = organizationType ? organizationType.value : readonlyOrganizationType.innerText.toLowerCase();
if (selectedText !== "Non-Federal Agency") {
if (organizationType.value !== "federal") {
if (organizationTypeValue !== "federal") {
if (organizationType){
organizationType.value = "federal";
}else {
readonlyOrganizationType.innerText = "Federal"
}
}
}else {
if (organizationType.value === "federal") {
if (organizationTypeValue === "federal") {
if (organizationType){
organizationType.value = "";
}else {
readonlyOrganizationType.innerText = "-"
}
}
}
// Get the associated senior official with this federal agency
let $seniorOfficial = django.jQuery("#id_senior_official");
if (!$seniorOfficial) {
console.log("Could not find the senior official field");
return;
}
handleOrganizationTypeChange(organizationType, organizationNameContainer, federalType);
// Determine if any changes are necessary to the display of portfolio type or federal type
// based on changes to the Federal Agency
let federalPortfolioApi = document.getElementById("federal_and_portfolio_types_from_agency_json_url").value;
fetch(`${federalPortfolioApi}?organization_type=${organizationType.value}&agency_name=${selectedText}`)
fetch(`${federalPortfolioApi}?&agency_name=${selectedText}`)
.then(response => {
const statusCode = response.status;
return response.json().then(data => ({ statusCode, data }));
@ -1011,7 +1044,6 @@ document.addEventListener('DOMContentLoaded', function() {
return;
}
updateReadOnly(data.federal_type, '.field-federal_type');
updateReadOnly(data.portfolio_type, '.field-portfolio_type');
})
.catch(error => console.error("Error fetching federal and portfolio types: ", error));
@ -1019,6 +1051,9 @@ document.addEventListener('DOMContentLoaded', function() {
// If we can update the contact information, it'll be shown again.
hideElement(contactList.parentElement);
let seniorOfficialAddUrl = document.getElementById("senior-official-add-url").value;
let $seniorOfficial = django.jQuery("#id_senior_official");
let readonlySeniorOfficial = document.querySelector(".field-senior_official .readonly");
let seniorOfficialApi = document.getElementById("senior_official_from_agency_json_url").value;
fetch(`${seniorOfficialApi}?agency_name=${selectedText}`)
.then(response => {
@ -1029,7 +1064,12 @@ document.addEventListener('DOMContentLoaded', function() {
if (data.error) {
// Clear the field if the SO doesn't exist.
if (statusCode === 404) {
if ($seniorOfficial && $seniorOfficial.length > 0) {
$seniorOfficial.val("").trigger("change");
}else {
// Show the "create one now" text if this field is none in readonly mode.
readonlySeniorOfficial.innerHTML = `<a href="${seniorOfficialAddUrl}">No senior official found. Create one now.</a>`;
}
console.warn("Record not found: " + data.error);
}else {
console.error("Error in AJAX call: " + data.error);
@ -1041,27 +1081,40 @@ document.addEventListener('DOMContentLoaded', function() {
updateContactInfo(data);
showElement(contactList.parentElement);
// Get the associated senior official with this federal agency
let seniorOfficialId = data.id;
let seniorOfficialName = [data.first_name, data.last_name].join(" ");
if ($seniorOfficial && $seniorOfficial.length > 0) {
// If the senior official is a dropdown field, edit that
updateSeniorOfficialDropdown($seniorOfficial, seniorOfficialId, seniorOfficialName);
}else {
if (readonlySeniorOfficial) {
let seniorOfficialLink = `<a href=/admin/registrar/seniorofficial/${seniorOfficialId}/change/>${seniorOfficialName}</a>`
readonlySeniorOfficial.innerHTML = seniorOfficialName ? seniorOfficialLink : "-";
}
}
})
.catch(error => console.error("Error fetching senior official: ", error));
}
function updateSeniorOfficialDropdown(dropdown, seniorOfficialId, seniorOfficialName) {
if (!seniorOfficialId || !seniorOfficialName || !seniorOfficialName.trim()){
// Clear the field if the SO doesn't exist
$seniorOfficial.val("").trigger("change");
dropdown.val("").trigger("change");
return;
}
// Add the senior official to the dropdown.
// This format supports select2 - if we decide to convert this field in the future.
if ($seniorOfficial.find(`option[value='${seniorOfficialId}']`).length) {
if (dropdown.find(`option[value='${seniorOfficialId}']`).length) {
// Select the value that is associated with the current Senior Official.
$seniorOfficial.val(seniorOfficialId).trigger("change");
dropdown.val(seniorOfficialId).trigger("change");
} else {
// Create a DOM Option that matches the desired Senior Official. Then append it and select it.
let userOption = new Option(seniorOfficialName, seniorOfficialId, true, true);
$seniorOfficial.append(userOption).trigger("change");
dropdown.append(userOption).trigger("change");
}
})
.catch(error => console.error("Error fetching senior official: ", error));
}
function handleStateTerritoryChange(stateTerritory, urbanizationField) {

View file

@ -1853,6 +1853,125 @@ class DomainRequestsTable extends LoadTableBase {
}
}
class MembersTable extends LoadTableBase {
constructor() {
super('.members__table', '.members__table-wrapper', '#members__search-field', '#members__search-field-submit', '.members__reset-search', '.members__reset-filters', '.members__no-data', '.members__no-search-results');
}
/**
* Loads rows in the members list, as well as updates pagination around the members list
* based on the supplied attributes.
* @param {*} page - the page number of the results (starts with 1)
* @param {*} sortBy - the sort column option
* @param {*} order - the sort order {asc, desc}
* @param {*} scroll - control for the scrollToElement functionality
* @param {*} status - control for the status filter
* @param {*} searchTerm - the search term
* @param {*} portfolio - the portfolio id
*/
loadTable(page, sortBy = this.currentSortBy, order = this.currentOrder, scroll = this.scrollToTable, status = this.currentStatus, searchTerm =this.currentSearchTerm, portfolio = this.portfolioValue) {
// --------- SEARCH
let searchParams = new URLSearchParams(
{
"page": page,
"sort_by": sortBy,
"order": order,
"status": status,
"search_term": searchTerm
}
);
if (portfolio)
searchParams.append("portfolio", portfolio)
// --------- FETCH DATA
// fetch json of page of domais, given params
let baseUrl = document.getElementById("get_members_json_url");
if (!baseUrl) {
return;
}
let baseUrlValue = baseUrl.innerHTML;
if (!baseUrlValue) {
return;
}
let url = `${baseUrlValue}?${searchParams.toString()}` //TODO: uncomment for search function
fetch(url)
.then(response => response.json())
.then(data => {
if (data.error) {
console.error('Error in AJAX call: ' + data.error);
return;
}
// handle the display of proper messaging in the event that no members exist in the list or search returns no results
this.updateDisplay(data, this.tableWrapper, this.noTableWrapper, this.noSearchResultsWrapper, this.currentSearchTerm);
// identify the DOM element where the domain list will be inserted into the DOM
const memberList = document.querySelector('.members__table tbody');
memberList.innerHTML = '';
data.members.forEach(member => {
// const actionUrl = domain.action_url;
const member_name = member.name;
const member_email = member.email;
const last_active = member.last_active;
const action_url = member.action_url;
const action_label = member.action_label;
const svg_icon = member.svg_icon;
const row = document.createElement('tr');
let admin_tagHTML = ``;
if (member.is_admin)
admin_tagHTML = `<span class="usa-tag margin-left-1 bg-primary">Admin</span>`
row.innerHTML = `
<th scope="row" role="rowheader" data-label="member email">
${member_email ? member_email : member_name} ${admin_tagHTML}
</th>
<td data-sort-value="${last_active}" data-label="last_active">
${last_active}
</td>
<td>
<a href="${action_url}">
<svg class="usa-icon" aria-hidden="true" focusable="false" role="img" width="24">
<use xlink:href="/public/img/sprite.svg#${svg_icon}"></use>
</svg>
${action_label} <span class="usa-sr-only">${member_name}</span>
</a>
</td>
`;
memberList.appendChild(row);
});
// Do not scroll on first page load
if (scroll)
ScrollToElement('class', 'members');
this.scrollToTable = true;
// update pagination
this.updatePagination(
'member',
'#members-pagination',
'#members-pagination .usa-pagination__counter',
'#members',
data.page,
data.num_pages,
data.has_previous,
data.has_next,
data.total,
);
this.currentSortBy = sortBy;
this.currentOrder = order;
this.currentSearchTerm = searchTerm;
})
.catch(error => console.error('Error fetching members:', error));
}
}
/**
* An IIFE that listens for DOM Content to be loaded, then executes. This function
@ -1926,6 +2045,23 @@ const utcDateString = (dateString) => {
};
/**
* An IIFE that listens for DOM Content to be loaded, then executes. This function
* initializes the domains list and associated functionality on the home page of the app.
*
*/
document.addEventListener('DOMContentLoaded', function() {
const isMembersPage = document.querySelector("#members")
if (isMembersPage){
const membersTable = new MembersTable();
if (membersTable.tableWrapper) {
// Initial load
membersTable.loadTable(1);
}
}
});
/**
* An IIFE that displays confirmation modal on the user profile page
*/

View file

@ -126,7 +126,8 @@ html[data-theme="light"] {
body.dashboard,
body.change-list,
body.change-form,
.custom-admin-template, dt {
.custom-admin-template,
.dl-dja dt {
color: var(--body-fg);
}
.usa-table td {
@ -155,7 +156,8 @@ html[data-theme="dark"] {
body.dashboard,
body.change-list,
body.change-form,
.custom-admin-template, dt {
.custom-admin-template,
.dl-dja dt {
color: var(--body-fg);
}
.usa-table td {
@ -453,7 +455,8 @@ details.dja-detail-table {
background-color: var(--body-bg);
.dja-details-summary {
cursor: pointer;
color: var(--body-quiet-color);
color: var(--link-fg);
text-decoration: underline;
}
@media (max-width: 1024px){
@ -921,3 +924,8 @@ ul.add-list-reset {
word-break: break-all;
max-width: 100%;
}
.organization-admin-label {
font-weight: 600;
font-size: .8125rem;
}

View file

@ -68,21 +68,6 @@ legend.float-left-tablet + button.float-right-tablet {
}
}
// Custom style for disabled inputs
@media (prefers-color-scheme: light) {
.usa-input:disabled, .usa-select:disabled, .usa-textarea:disabled {
background-color: #eeeeee;
color: #666666;
}
}
@media (prefers-color-scheme: dark) {
.usa-input:disabled, .usa-select:disabled, .usa-textarea:disabled {
background-color: var(--body-fg);
color: var(--close-button-hover-bg);
}
}
.read-only-label {
font-size: size('body', 'sm');
color: color('primary-dark');

View file

@ -476,6 +476,8 @@ class JsonServerFormatter(ServerFormatter):
def format(self, record):
formatted_record = super().format(record)
if not hasattr(record, "server_time"):
record.server_time = self.formatTime(record, self.datefmt)
log_entry = {"server_time": record.server_time, "level": record.levelname, "message": formatted_record}
return json.dumps(log_entry)
@ -721,6 +723,7 @@ ALLOWED_HOSTS = [
"getgov-stable.app.cloud.gov",
"getgov-staging.app.cloud.gov",
"getgov-development.app.cloud.gov",
"getgov-el.app.cloud.gov",
"getgov-ad.app.cloud.gov",
"getgov-ms.app.cloud.gov",
"getgov-ag.app.cloud.gov",

View file

@ -22,20 +22,22 @@ from registrar.views.report_views import (
ExportDataTypeUser,
)
from registrar.views.domain_request import Step
# --jsons
from registrar.views.domain_requests_json import get_domain_requests_json
from registrar.views.transfer_user import TransferUserView
from registrar.views.domains_json import get_domains_json
from registrar.views.portfolio_members_json import get_portfolio_members_json
from registrar.views.utility.api_views import (
get_senior_official_from_federal_agency_json,
get_federal_and_portfolio_types_from_federal_agency_json,
get_action_needed_email_for_user_json,
get_rejection_email_for_user_json,
)
from registrar.views.domains_json import get_domains_json
from registrar.views.domain_request import Step
from registrar.views.transfer_user import TransferUserView
from registrar.views.utility import always_404
from api.views import available, rdap, get_current_federal, get_current_full
DOMAIN_REQUEST_NAMESPACE = views.DomainRequestWizard.URL_NAMESPACE
domain_request_urls = [
path("", views.DomainRequestWizard.as_view(), name=""),
@ -75,6 +77,16 @@ urlpatterns = [
views.PortfolioNoDomainsView.as_view(),
name="no-portfolio-domains",
),
path(
"members/",
views.PortfolioMembersView.as_view(),
name="members",
),
# path(
# "no-organization-members/",
# views.PortfolioNoMembersView.as_view(),
# name="no-portfolio-members",
# ),
path(
"requests/",
views.PortfolioDomainRequestsView.as_view(),
@ -282,6 +294,7 @@ urlpatterns = [
),
path("get-domains-json/", get_domains_json, name="get_domains_json"),
path("get-domain-requests-json/", get_domain_requests_json, name="get_domain_requests_json"),
path("get-portfolio-members-json/", get_portfolio_members_json, name="get_portfolio_members_json"),
]
# Djangooidc strips out context data from that context, so we define a custom error

View file

@ -97,5 +97,5 @@ def portfolio_permissions(request):
def is_widescreen_mode(request):
widescreen_paths = ["/domains/", "/requests/"]
widescreen_paths = ["/domains/", "/requests/", "/members/"]
return {"is_widescreen_mode": any(path in request.path for path in widescreen_paths) or request.path == "/"}

View file

@ -0,0 +1,138 @@
from datetime import timedelta
from django.utils import timezone
import logging
import random
from faker import Faker
from django.db import transaction
from registrar.fixtures.fixtures_requests import DomainRequestFixture
from registrar.fixtures.fixtures_users import UserFixture
from registrar.models import User, DomainRequest
from registrar.models.domain import Domain
fake = Faker()
logger = logging.getLogger(__name__)
class DomainFixture(DomainRequestFixture):
"""Create two domains and permissions on them for each user.
One domain will have a past expiration date.
Depends on fixtures_requests.
Make sure this class' `load` method is called from `handle`
in management/commands/load.py, then use `./manage.py load`
to run this code.
"""
@classmethod
def load(cls):
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
try:
# Get the usernames of users created in the UserFixture
created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF]
# Filter users to only include those created by the fixture
users = list(User.objects.filter(username__in=created_usernames))
except Exception as e:
logger.warning(e)
return
# Approve each user associated with `in review` status domains
cls._approve_domain_requests(users)
@staticmethod
def _generate_fake_expiration_date(days_in_future=365):
"""Generates a fake expiration date between 1 and 365 days in the future."""
current_date = timezone.now().date() # nosec
return current_date + timedelta(days=random.randint(1, days_in_future)) # nosec
@staticmethod
def _generate_fake_expiration_date_in_past():
"""Generates a fake expiration date up to 365 days in the past."""
current_date = timezone.now().date() # nosec
return current_date + timedelta(days=random.randint(-365, -1)) # nosec
@classmethod
def _approve_request(cls, domain_request, users):
"""Helper function to approve a domain request."""
if not domain_request:
return None
if domain_request.investigator is None:
# Assign random investigator if not already assigned
domain_request.investigator = random.choice(users) # nosec
# Approve the domain request
domain_request.approve(send_email=False)
return domain_request
@classmethod
def _approve_domain_requests(cls, users):
"""Approves one current and one expired request per user."""
domain_requests_to_update = []
expired_requests = []
for user in users:
# Get the latest and second-to-last domain requests
domain_requests = DomainRequest.objects.filter(
creator=user, status=DomainRequest.DomainRequestStatus.IN_REVIEW
).order_by("-id")[:2]
# Latest domain request
domain_request = domain_requests[0] if domain_requests else None
# Second-to-last domain request (expired)
domain_request_expired = domain_requests[1] if len(domain_requests) > 1 else None
# Approve the current domain request
if domain_request:
cls._approve_request(domain_request, users)
domain_requests_to_update.append(domain_request)
# Approve the expired domain request
if domain_request_expired:
cls._approve_request(domain_request_expired, users)
domain_requests_to_update.append(domain_request_expired)
expired_requests.append(domain_request_expired)
# Perform bulk update for the domain requests
cls._bulk_update_requests(domain_requests_to_update)
# Retrieve all domains associated with the domain requests
domains_to_update = Domain.objects.filter(domain_info__domain_request__in=domain_requests_to_update)
# Loop through and update expiration dates for domains
for domain in domains_to_update:
domain_request = domain.domain_info.domain_request
# Set the expiration date based on whether the request is expired
if domain_request in expired_requests:
domain.expiration_date = cls._generate_fake_expiration_date_in_past()
else:
domain.expiration_date = cls._generate_fake_expiration_date()
# Perform bulk update for the domains
cls._bulk_update_domains(domains_to_update)
@classmethod
def _bulk_update_requests(cls, domain_requests_to_update):
"""Bulk update domain requests."""
if domain_requests_to_update:
try:
DomainRequest.objects.bulk_update(domain_requests_to_update, ["status", "investigator"])
logger.info(f"Successfully updated {len(domain_requests_to_update)} requests.")
except Exception as e:
logger.error(f"Unexpected error during requests bulk update: {e}")
@classmethod
def _bulk_update_domains(cls, domains_to_update):
"""Bulk update domains with expiration dates."""
if domains_to_update:
try:
Domain.objects.bulk_update(domains_to_update, ["expiration_date"])
logger.info(f"Successfully updated {len(domains_to_update)} domains.")
except Exception as e:
logger.error(f"Unexpected error during domains bulk update: {e}")

View file

@ -0,0 +1,125 @@
import logging
import random
from faker import Faker
from django.db import transaction
from registrar.models import User, DomainRequest, FederalAgency
from registrar.models.portfolio import Portfolio
from registrar.models.senior_official import SeniorOfficial
fake = Faker()
logger = logging.getLogger(__name__)
class PortfolioFixture:
"""
Creates 2 pre-defined portfolios with the infrastructure to add more.
Make sure this class' `load` method is called from `handle`
in management/commands/load.py, then use `./manage.py load`
to run this code.
"""
PORTFOLIOS = [
{
"organization_name": "Hotel California",
},
{
"organization_name": "Wish You Were Here",
},
]
@classmethod
def fake_so(cls):
return {
"first_name": fake.first_name(),
"last_name": fake.last_name(),
"title": fake.job(),
"email": fake.ascii_safe_email(),
"phone": "201-555-5555",
}
@classmethod
def _set_non_foreign_key_fields(cls, portfolio: Portfolio, portfolio_dict: dict):
"""Helper method used by `load`."""
portfolio.organization_type = (
portfolio_dict["organization_type"]
if "organization_type" in portfolio_dict
else DomainRequest.OrganizationChoices.FEDERAL
)
portfolio.notes = portfolio_dict["notes"] if "notes" in portfolio_dict else None
portfolio.address_line1 = (
portfolio_dict["address_line1"] if "address_line1" in portfolio_dict else fake.street_address()
)
portfolio.address_line2 = portfolio_dict["address_line2"] if "address_line2" in portfolio_dict else None
portfolio.city = portfolio_dict["city"] if "city" in portfolio_dict else fake.city()
portfolio.state_territory = (
portfolio_dict["state_territory"] if "state_territory" in portfolio_dict else fake.state_abbr()
)
portfolio.zipcode = portfolio_dict["zipcode"] if "zipcode" in portfolio_dict else fake.postalcode()
portfolio.urbanization = portfolio_dict["urbanization"] if "urbanization" in portfolio_dict else None
portfolio.security_contact_email = (
portfolio_dict["security_contact_email"] if "security_contact_email" in portfolio_dict else fake.email()
)
@classmethod
def _set_foreign_key_fields(cls, portfolio: Portfolio, portfolio_dict: dict, user: User):
"""Helper method used by `load`."""
if not portfolio.senior_official:
if portfolio_dict.get("senior_official") is not None:
portfolio.senior_official, _ = SeniorOfficial.objects.get_or_create(**portfolio_dict["senior_official"])
else:
portfolio.senior_official = SeniorOfficial.objects.create(**cls.fake_so())
if not portfolio.federal_agency:
if portfolio_dict.get("federal_agency") is not None:
portfolio.federal_agency, _ = FederalAgency.objects.get_or_create(name=portfolio_dict["federal_agency"])
else:
federal_agencies = FederalAgency.objects.all()
# Random choice of agency for selects, used as placeholders for testing.
portfolio.federal_agency = random.choice(federal_agencies) # nosec
@classmethod
def load(cls):
"""Creates portfolios."""
logger.info("Going to load %s portfolios" % len(cls.PORTFOLIOS))
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
try:
user = User.objects.all().last()
except Exception as e:
logger.warning(e)
return
portfolios_to_create = []
for portfolio_data in cls.PORTFOLIOS:
organization_name = portfolio_data["organization_name"]
# Check if portfolio with the organization name already exists
if Portfolio.objects.filter(organization_name=organization_name).exists():
logger.info(
f"Portfolio with organization name '{organization_name}' already exists, skipping creation."
)
continue
try:
portfolio = Portfolio(
creator=user,
organization_name=portfolio_data["organization_name"],
)
cls._set_non_foreign_key_fields(portfolio, portfolio_data)
cls._set_foreign_key_fields(portfolio, portfolio_data, user)
portfolios_to_create.append(portfolio)
except Exception as e:
logger.warning(e)
# Bulk create domain requests
if len(portfolios_to_create) > 0:
try:
Portfolio.objects.bulk_create(portfolios_to_create)
logger.info(f"Successfully created {len(portfolios_to_create)} portfolios")
except Exception as e:
logger.warning(f"Error bulk creating portfolios: {e}")

View file

@ -0,0 +1,325 @@
from datetime import timedelta
from django.utils import timezone
import logging
import random
from faker import Faker
from django.db import transaction
from registrar.fixtures.fixtures_portfolios import PortfolioFixture
from registrar.fixtures.fixtures_users import UserFixture
from registrar.models import User, DomainRequest, DraftDomain, Contact, Website, FederalAgency
from registrar.models.portfolio import Portfolio
from registrar.models.suborganization import Suborganization
fake = Faker()
logger = logging.getLogger(__name__)
class DomainRequestFixture:
"""
Creates domain requests for each user in the database,
assign portfolios and suborgs.
Creates 3 in_review requests, one for approving with an expired domain,
one for approving with a non-expired domain, and one for leaving in in_review.
Depends on fixtures_portfolios and fixtures_suborganizations.
Make sure this class' `load` method is called from `handle`
in management/commands/load.py, then use `./manage.py load`
to run this code.
"""
# any fields not specified here will be filled in with fake data or defaults
# NOTE BENE: each fixture must have `organization_name` for uniqueness!
# Here is a more complete example as a template:
# {
# "status": "started",
# "organization_name": "Example - Just started",
# "generic_org_type": "federal",
# "federal_agency": None,
# "federal_type": None,
# "address_line1": None,
# "address_line2": None,
# "city": None,
# "state_territory": None,
# "zipcode": None,
# "urbanization": None,
# "purpose": None,
# "anything_else": None,
# "is_policy_acknowledged": None,
# "senior_official": None,
# "other_contacts": [],
# "current_websites": [],
# "alternative_domains": [],
# },
DOMAINREQUESTS = [
{
"status": DomainRequest.DomainRequestStatus.STARTED,
"organization_name": "Example - Finished but not submitted",
},
{
"status": DomainRequest.DomainRequestStatus.SUBMITTED,
"organization_name": "Example - Submitted but pending investigation",
},
{
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
"organization_name": "Example - In investigation",
},
{
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
"organization_name": "Example - Approved",
},
{
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
"organization_name": "Example - Approved, domain expired",
},
{
"status": DomainRequest.DomainRequestStatus.WITHDRAWN,
"organization_name": "Example - Withdrawn",
},
{
"status": DomainRequest.DomainRequestStatus.ACTION_NEEDED,
"organization_name": "Example - Action needed",
},
{
"status": "rejected",
"organization_name": "Example - Rejected",
},
]
@classmethod
def fake_contact(cls):
return {
"first_name": fake.first_name(),
"middle_name": None,
"last_name": fake.last_name(),
"title": fake.job(),
"email": fake.ascii_safe_email(),
"phone": "201-555-5555",
}
@classmethod
def fake_dot_gov(cls):
return f"{fake.slug()}.gov"
@classmethod
def fake_expiration_date(cls):
"""Generates a fake expiration date between 0 and 1 year in the future."""
current_date = timezone.now().date()
days_in_future = random.randint(0, 365) # nosec
return current_date + timedelta(days=days_in_future)
@classmethod
def _set_non_foreign_key_fields(cls, request: DomainRequest, request_dict: dict):
"""Helper method used by `load`."""
request.status = request_dict["status"] if "status" in request_dict else "started"
# TODO for a future ticket: Allow for more than just "federal" here
request.generic_org_type = request_dict["generic_org_type"] if "generic_org_type" in request_dict else "federal"
if request.status != "started":
request.last_submitted_date = fake.date()
request.federal_type = (
request_dict["federal_type"]
if "federal_type" in request_dict
else random.choice(["executive", "judicial", "legislative"]) # nosec
)
request.address_line1 = (
request_dict["address_line1"] if "address_line1" in request_dict else fake.street_address()
)
request.address_line2 = request_dict["address_line2"] if "address_line2" in request_dict else None
request.city = request_dict["city"] if "city" in request_dict else fake.city()
request.state_territory = (
request_dict["state_territory"] if "state_territory" in request_dict else fake.state_abbr()
)
request.zipcode = request_dict["zipcode"] if "zipcode" in request_dict else fake.postalcode()
request.urbanization = request_dict["urbanization"] if "urbanization" in request_dict else None
request.purpose = request_dict["purpose"] if "purpose" in request_dict else fake.paragraph()
request.has_cisa_representative = (
request_dict["has_cisa_representative"] if "has_cisa_representative" in request_dict else True
)
request.cisa_representative_email = (
request_dict["cisa_representative_email"] if "cisa_representative_email" in request_dict else fake.email()
)
request.cisa_representative_first_name = (
request_dict["cisa_representative_first_name"]
if "cisa_representative_first_name" in request_dict
else fake.first_name()
)
request.cisa_representative_last_name = (
request_dict["cisa_representative_last_name"]
if "cisa_representative_last_name" in request_dict
else fake.last_name()
)
request.has_anything_else_text = (
request_dict["has_anything_else_text"] if "has_anything_else_text" in request_dict else True
)
request.anything_else = request_dict["anything_else"] if "anything_else" in request_dict else fake.paragraph()
request.is_policy_acknowledged = (
request_dict["is_policy_acknowledged"] if "is_policy_acknowledged" in request_dict else True
)
@classmethod
def _set_foreign_key_fields(cls, request: DomainRequest, request_dict: dict, user: User):
"""Helper method used by `load`."""
request.investigator = cls._get_investigator(request, request_dict, user)
request.senior_official = cls._get_senior_official(request, request_dict)
request.requested_domain = cls._get_requested_domain(request, request_dict)
request.federal_agency = cls._get_federal_agency(request, request_dict)
request.portfolio = cls._get_portfolio(request, request_dict)
request.sub_organization = cls._get_sub_organization(request, request_dict)
@classmethod
def _get_investigator(cls, request: DomainRequest, request_dict: dict, user: User):
if not request.investigator:
return User.objects.get(username=user.username) if "investigator" in request_dict else None
return request.investigator
@classmethod
def _get_senior_official(cls, request: DomainRequest, request_dict: dict):
if not request.senior_official:
if "senior_official" in request_dict and request_dict["senior_official"] is not None:
return Contact.objects.get_or_create(**request_dict["senior_official"])[0]
return Contact.objects.create(**cls.fake_contact())
return request.senior_official
@classmethod
def _get_requested_domain(cls, request: DomainRequest, request_dict: dict):
if not request.requested_domain:
if "requested_domain" in request_dict and request_dict["requested_domain"] is not None:
return DraftDomain.objects.get_or_create(name=request_dict["requested_domain"])[0]
return DraftDomain.objects.create(name=cls.fake_dot_gov())
return request.requested_domain
@classmethod
def _get_federal_agency(cls, request: DomainRequest, request_dict: dict):
if not request.federal_agency:
if "federal_agency" in request_dict and request_dict["federal_agency"] is not None:
return FederalAgency.objects.get_or_create(name=request_dict["federal_agency"])[0]
return random.choice(FederalAgency.objects.all()) # nosec
return request.federal_agency
@classmethod
def _get_portfolio(cls, request: DomainRequest, request_dict: dict):
if not request.portfolio:
if "portfolio" in request_dict and request_dict["portfolio"] is not None:
return Portfolio.objects.get_or_create(name=request_dict["portfolio"])[0]
return cls._get_random_portfolio()
return request.portfolio
@classmethod
def _get_sub_organization(cls, request: DomainRequest, request_dict: dict):
if not request.sub_organization:
if "sub_organization" in request_dict and request_dict["sub_organization"] is not None:
return Suborganization.objects.get_or_create(name=request_dict["sub_organization"])[0]
return cls._get_random_sub_organization()
return request.sub_organization
@classmethod
def _get_random_portfolio(cls):
try:
organization_names = [portfolio["organization_name"] for portfolio in PortfolioFixture.PORTFOLIOS]
portfolio_options = Portfolio.objects.filter(organization_name__in=organization_names)
return random.choice(portfolio_options) if portfolio_options.exists() else None # nosec
except Exception as e:
logger.warning(f"Expected fixture portfolio, did not find it: {e}")
return None
@classmethod
def _get_random_sub_organization(cls):
try:
suborg_options = [Suborganization.objects.first(), Suborganization.objects.last()]
return random.choice(suborg_options) # nosec
except Exception as e:
logger.warning(f"Expected fixture sub_organization, did not find it: {e}")
return None
@classmethod
def _set_many_to_many_relations(cls, request: DomainRequest, request_dict: dict):
"""Helper method used by `load`."""
if "other_contacts" in request_dict:
for contact in request_dict["other_contacts"]:
request.other_contacts.add(Contact.objects.get_or_create(**contact)[0])
elif not request.other_contacts.exists():
other_contacts = [
Contact.objects.create(**cls.fake_contact()) for _ in range(random.randint(1, 3)) # nosec
]
request.other_contacts.add(*other_contacts)
if "current_websites" in request_dict:
for website in request_dict["current_websites"]:
request.current_websites.add(Website.objects.get_or_create(website=website)[0])
elif not request.current_websites.exists():
current_websites = [
Website.objects.create(website=fake.uri()) for _ in range(random.randint(0, 3)) # nosec
]
request.current_websites.add(*current_websites)
if "alternative_domains" in request_dict:
for domain in request_dict["alternative_domains"]:
request.alternative_domains.add(Website.objects.get_or_create(website=domain)[0])
elif not request.alternative_domains.exists():
alternative_domains = [
Website.objects.create(website=cls.fake_dot_gov()) for _ in range(random.randint(0, 3)) # nosec
]
request.alternative_domains.add(*alternative_domains)
@classmethod
def load(cls):
"""Creates domain requests for each user in the database."""
logger.info("Going to load %s domain requests" % len(cls.DOMAINREQUESTS))
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
try:
# Get the usernames of users created in the UserFixture
created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF]
# Filter users to only include those created by the fixture
users = list(User.objects.filter(username__in=created_usernames))
except Exception as e:
logger.warning(e)
return
cls._create_domain_requests(users)
@classmethod
def _create_domain_requests(cls, users):
"""Creates DomainRequests given a list of users."""
domain_requests_to_create = []
for user in users:
for request_data in cls.DOMAINREQUESTS:
# Prepare DomainRequest objects
try:
domain_request = DomainRequest(
creator=user,
organization_name=request_data["organization_name"],
)
cls._set_non_foreign_key_fields(domain_request, request_data)
cls._set_foreign_key_fields(domain_request, request_data, user)
domain_requests_to_create.append(domain_request)
except Exception as e:
logger.warning(e)
# Bulk create domain requests
cls._bulk_create_requests(domain_requests_to_create)
# Now many-to-many relationships
for domain_request in domain_requests_to_create:
try:
cls._set_many_to_many_relations(domain_request, request_data)
except Exception as e:
logger.warning(e)
@classmethod
def _bulk_create_requests(cls, domain_requests_to_create):
"""Bulk create domain requests."""
if len(domain_requests_to_create) > 0:
try:
DomainRequest.objects.bulk_create(domain_requests_to_create)
logger.info(f"Successfully created {len(domain_requests_to_create)} requests.")
except Exception as e:
logger.error(f"Unexpected error during requests bulk creation: {e}")

View file

@ -0,0 +1,87 @@
import logging
from faker import Faker
from django.db import transaction
from registrar.models.portfolio import Portfolio
from registrar.models.suborganization import Suborganization
fake = Faker()
logger = logging.getLogger(__name__)
class SuborganizationFixture:
"""
Creates 2 pre-defined suborg with the infrastructure to add more.
Depends on fixtures_portfolios.
Make sure this class' `load` method is called from `handle`
in management/commands/load.py, then use `./manage.py load`
to run this code.
"""
SUBORGS = [
{
"name": "Take it Easy",
},
{
"name": "Welcome to the Machine",
},
]
@classmethod
def load(cls):
"""Creates suborganizations."""
logger.info(f"Going to load {len(cls.SUBORGS)} suborgs")
with transaction.atomic():
portfolios = cls._get_portfolios()
if not portfolios:
return
suborgs_to_create = cls._prepare_suborgs_to_create(portfolios)
cls._bulk_create_suborgs(suborgs_to_create)
@classmethod
def _get_portfolios(cls):
"""Fetches portfolios with organization_name 'Hotel California' and 'Wish You Were Here'
and logs warnings if not found."""
try:
portfolio1 = Portfolio.objects.filter(organization_name="Hotel California").first()
portfolio2 = Portfolio.objects.filter(organization_name="Wish You Were Here").first()
if not portfolio1 or not portfolio2:
logger.warning("One or both portfolios not found.")
return None
return portfolio1, portfolio2
except Exception as e:
logger.warning(f"Error fetching portfolios: {e}")
return None
@classmethod
def _prepare_suborgs_to_create(cls, portfolios):
"""Prepares a list of suborganizations to create, avoiding duplicates."""
portfolio1, portfolio2 = portfolios
suborgs_to_create = []
try:
if not Suborganization.objects.filter(name=cls.SUBORGS[0]["name"]).exists():
suborgs_to_create.append(Suborganization(portfolio=portfolio1, name=cls.SUBORGS[0]["name"]))
if not Suborganization.objects.filter(name=cls.SUBORGS[1]["name"]).exists():
suborgs_to_create.append(Suborganization(portfolio=portfolio2, name=cls.SUBORGS[1]["name"]))
except Exception as e:
logger.warning(f"Error creating suborg objects: {e}")
return suborgs_to_create
@classmethod
def _bulk_create_suborgs(cls, suborgs_to_create):
"""Bulk creates suborganizations and logs success or errors."""
if suborgs_to_create:
try:
Suborganization.objects.bulk_create(suborgs_to_create)
logger.info(f"Successfully created {len(suborgs_to_create)} suborgs")
except Exception as e:
logger.warning(f"Error bulk creating suborgs: {e}")

View file

@ -0,0 +1,86 @@
import logging
from faker import Faker
from django.db import transaction
from registrar.fixtures.fixtures_portfolios import PortfolioFixture
from registrar.fixtures.fixtures_users import UserFixture
from registrar.models import User
from registrar.models.portfolio import Portfolio
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
fake = Faker()
logger = logging.getLogger(__name__)
class UserPortfolioPermissionFixture:
"""Create user portfolio permissions for each user.
Each user will be admin on 2 portfolios.
Depends on fixture_portfolios"""
@classmethod
def load(cls):
logger.info("Going to set user portfolio permissions")
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
try:
# Get the usernames of users created in the UserFixture
created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF]
# Filter users to only include those created by the fixture
users = list(User.objects.filter(username__in=created_usernames))
organization_names = [portfolio["organization_name"] for portfolio in PortfolioFixture.PORTFOLIOS]
portfolios = list(Portfolio.objects.filter(organization_name__in=organization_names))
if not users:
logger.warning("User fixtures missing.")
return
if not portfolios:
logger.warning("Portfolio fixtures missing.")
return
except Exception as e:
logger.warning(e)
return
user_portfolio_permissions_to_create = []
for user in users:
for portfolio in portfolios:
try:
if not UserPortfolioPermission.objects.filter(user=user, portfolio=portfolio).exists():
user_portfolio_permission = UserPortfolioPermission(
user=user,
portfolio=portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
)
user_portfolio_permissions_to_create.append(user_portfolio_permission)
else:
logger.info(
f"Permission exists for user '{user.username}' "
f"on portfolio '{portfolio.organization_name}'."
)
except Exception as e:
logger.warning(e)
# Bulk create permissions
cls._bulk_create_permissions(user_portfolio_permissions_to_create)
@classmethod
def _bulk_create_permissions(cls, user_portfolio_permissions_to_create):
"""Bulk creates permissions and logs success or errors."""
if user_portfolio_permissions_to_create:
try:
UserPortfolioPermission.objects.bulk_create(user_portfolio_permissions_to_create)
logger.info(
f"Successfully created {len(user_portfolio_permissions_to_create)} user portfolio permissions."
)
except Exception as e:
logger.error(f"Unexpected error during portfolio permission bulk creation: {e}")
else:
logger.info("No new user portfolio permissions to create.")

View file

@ -23,129 +23,123 @@ class UserFixture:
"""
ADMINS = [
{
"username": "43a7fa8d-0550-4494-a6fe-81500324d590",
"first_name": "Jyoti",
"last_name": "Bock",
"email": "jyotibock@truss.works",
},
{
"username": "aad084c3-66cc-4632-80eb-41cdf5c5bcbf",
"first_name": "Aditi",
"last_name": "Green",
"email": "aditidevelops+01@gmail.com",
"title": "Positive vibes",
},
{
"username": "be17c826-e200-4999-9389-2ded48c43691",
"first_name": "Matthew",
"last_name": "Spence",
"title": "Hollywood hair",
},
{
"username": "5f283494-31bd-49b5-b024-a7e7cae00848",
"first_name": "Rachid",
"last_name": "Mrad",
"email": "rachid.mrad@associates.cisa.dhs.gov",
"title": "Common pirate",
},
{
"username": "eb2214cd-fc0c-48c0-9dbd-bc4cd6820c74",
"first_name": "Alysia",
"last_name": "Broddrick",
"email": "abroddrick@truss.works",
"title": "I drink coffee and know things",
},
{
"username": "8f8e7293-17f7-4716-889b-1990241cbd39",
"first_name": "Katherine",
"last_name": "Osos",
"email": "kosos@truss.works",
"title": "Grove keeper",
},
{
"username": "70488e0a-e937-4894-a28c-16f5949effd4",
"first_name": "Gaby",
"last_name": "DiSarli",
"email": "gaby@truss.works",
"title": "De Stijl",
},
{
"username": "83c2b6dd-20a2-4cac-bb40-e22a72d2955c",
"first_name": "Cameron",
"last_name": "Dixon",
"email": "cameron.dixon@cisa.dhs.gov",
},
{
"username": "0353607a-cbba-47d2-98d7-e83dcd5b90ea",
"first_name": "Ryan",
"last_name": "Brooks",
"title": "Product owner",
},
{
"username": "30001ee7-0467-4df2-8db2-786e79606060",
"first_name": "Zander",
"last_name": "Adkinson",
"title": "ACME specialist",
},
{
"username": "2bf518c2-485a-4c42-ab1a-f5a8b0a08484",
"first_name": "Paul",
"last_name": "Kuykendall",
"title": "Dr. Silvertongue",
},
{
"username": "2a88a97b-be96-4aad-b99e-0b605b492c78",
"first_name": "Rebecca",
"last_name": "Hsieh",
"email": "rebecca.hsieh@truss.works",
"title": "Catlady",
},
{
"username": "fa69c8e8-da83-4798-a4f2-263c9ce93f52",
"first_name": "David",
"last_name": "Kennedy",
"email": "david.kennedy@ecstech.com",
"title": "Mean lean coding machine",
},
{
"username": "f14433d8-f0e9-41bf-9c72-b99b110e665d",
"first_name": "Nicolle",
"last_name": "LeClair",
"email": "nicolle.leclair@ecstech.com",
"title": "Nightowl",
},
{
"username": "24840450-bf47-4d89-8aa9-c612fe68f9da",
"first_name": "Erin",
"last_name": "Song",
"title": "Catlady 2",
},
{
"username": "e0ea8b94-6e53-4430-814a-849a7ca45f21",
"first_name": "Kristina",
"last_name": "Yin",
"title": "Hufflepuff prefect",
},
{
"username": "ac49d7c1-368a-4e6b-8f1d-60250e20a16f",
"first_name": "Vicky",
"last_name": "Chin",
"email": "szu.chin@associates.cisa.dhs.gov",
"title": "Ze whip",
},
{
"username": "66bb1a5a-a091-4d7f-a6cf-4d772b4711c7",
"first_name": "Christina",
"last_name": "Burnett",
"email": "christina.burnett@cisa.dhs.gov",
},
{
"username": "012f844d-8a0f-4225-9d82-cbf87bff1d3e",
"first_name": "Riley",
"last_name": "Orr",
"email": "riley+320@truss.works",
"title": "Groovy",
},
{
"username": "76612d84-66b0-4ae9-9870-81e98b9858b6",
"first_name": "Anna",
"last_name": "Gingle",
"email": "annagingle@truss.works",
"title": "Sweetwater sailor",
},
]
STAFF = [
{
"username": "a5906815-dd80-4c64-aebe-2da6a4c9d7a4",
"first_name": "Jyoti-Analyst",
"last_name": "Bock-Analyst",
"email": "jyotibock+1@truss.works",
},
{
"username": "ffec5987-aa84-411b-a05a-a7ee5cbcde54",
"first_name": "Aditi-Analyst",
@ -231,18 +225,6 @@ class UserFixture:
"last_name": "Burnett-Analyst",
"email": "christina.burnett@gwe.cisa.dhs.gov",
},
{
"username": "d9839768-0c17-4fa2-9c8e-36291eef5c11",
"first_name": "Alex-Analyst",
"last_name": "Mcelya-Analyst",
"email": "ALEXANDER.MCELYA@cisa.dhs.gov",
},
{
"username": "082a066f-e0a4-45f6-8672-4343a1208a36",
"first_name": "Riley-Analyst",
"last_name": "Orr-Analyst",
"email": "riley+321@truss.works",
},
{
"username": "e1e350b1-cfc1-4753-a6cb-3ae6d912f99c",
"first_name": "Anna-Analyst",
@ -254,29 +236,61 @@ class UserFixture:
# Additional emails to add to the AllowedEmail whitelist.
ADDITIONAL_ALLOWED_EMAILS: list[str] = ["davekenn4242@gmail.com", "rachid_mrad@hotmail.com"]
@classmethod
def load_users(cls, users, group_name, are_superusers=False):
logger.info(f"Going to load {len(users)} users in group {group_name}")
for user_data in users:
try:
user, _ = User.objects.get_or_create(username=user_data["username"])
user.is_superuser = are_superusers
user.first_name = user_data["first_name"]
user.last_name = user_data["last_name"]
if "email" in user_data:
user.email = user_data["email"]
user.is_staff = True
user.is_active = True
# This verification type will get reverted to "regular" (or whichever is applicables)
# once the user logs in for the first time (as they then got verified through different means).
# In the meantime, we can still describe how the user got here in the first place.
user.verification_type = User.VerificationTypeChoices.FIXTURE_USER
"""Loads the users into the database and assigns them to the specified group."""
logger.info(f"Going to load {len(users)} users for group {group_name}")
group = UserGroup.objects.get(name=group_name)
user.groups.add(group)
user.save()
logger.debug(f"User object created for {user_data['first_name']}")
# Prepare sets of existing usernames and IDs in one query
user_identifiers = [(user.get("username"), user.get("id")) for user in users]
existing_users = User.objects.filter(
username__in=[user[0] for user in user_identifiers] + [user[1] for user in user_identifiers]
).values_list("username", "id")
existing_usernames = set(user[0] for user in existing_users)
existing_user_ids = set(user[1] for user in existing_users)
# Filter out users with existing IDs or usernames
new_users = [
User(
id=user_data.get("id"),
first_name=user_data.get("first_name"),
last_name=user_data.get("last_name"),
username=user_data.get("username"),
email=user_data.get("email", ""),
title=user_data.get("title", "Peon"),
phone=user_data.get("phone", "2022222222"),
is_active=user_data.get("is_active", True),
is_staff=True,
is_superuser=are_superusers,
)
for user_data in users
if user_data.get("username") not in existing_usernames and user_data.get("id") not in existing_user_ids
]
# Perform bulk creation for new users
if new_users:
try:
User.objects.bulk_create(new_users)
logger.info(f"Created {len(new_users)} new users.")
except Exception as e:
logger.warning(e)
logger.info(f"All users in group {group_name} loaded.")
logger.error(f"Unexpected error during user bulk creation: {e}")
else:
logger.info("No new users to create.")
# Get all users to be updated (both new and existing)
created_or_existing_users = User.objects.filter(username__in=[user.get("username") for user in users])
# Filter out users who are already in the group
users_not_in_group = created_or_existing_users.exclude(groups__id=group.id)
# Add only users who are not already in the group
if users_not_in_group.exists():
group.user_set.add(*users_not_in_group)
logger.info(f"Users loaded for group {group_name}.")
def load_allowed_emails(cls, users, additional_emails):
"""Populates a whitelist of allowed emails (as defined in this list)"""
@ -284,37 +298,33 @@ class UserFixture:
if additional_emails:
logger.info(f"Going to load {len(additional_emails)} additional allowed emails")
# Load user emails
allowed_emails = []
existing_emails = set(AllowedEmail.objects.values_list("email", flat=True))
new_allowed_emails = []
for user_data in users:
user_email = user_data.get("email")
if user_email and user_email not in allowed_emails:
allowed_emails.append(AllowedEmail(email=user_email))
else:
first_name = user_data.get("first_name")
last_name = user_data.get("last_name")
logger.warning(f"Could not add email to whitelist for {first_name} {last_name}.")
if user_email and user_email not in existing_emails:
new_allowed_emails.append(AllowedEmail(email=user_email))
# Load additional emails
allowed_emails.extend([AllowedEmail(email=email) for email in additional_emails])
# Load additional emails, only if they don't exist already
for email in additional_emails:
if email not in existing_emails:
new_allowed_emails.append(AllowedEmail(email=email))
if allowed_emails:
AllowedEmail.objects.bulk_create(allowed_emails)
logger.info(f"Loaded {len(allowed_emails)} allowed emails")
if new_allowed_emails:
try:
AllowedEmail.objects.bulk_create(new_allowed_emails)
logger.info(f"Loaded {len(new_allowed_emails)} allowed emails")
except Exception as e:
logger.error(f"Unexpected error during allowed emails bulk creation: {e}")
else:
logger.info("No allowed emails to load")
@classmethod
def load(cls):
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
# This is slightly different then bulk_create or bulk_update, in that
# you still get the same behaviour of .save(), but those incremental
# steps now do not need to close/reopen a db connection,
# instead they share one.
with transaction.atomic():
cls.load_users(cls, cls.ADMINS, "full_access_group", are_superusers=True)
cls.load_users(cls, cls.STAFF, "cisa_analysts_group")
cls.load_users(cls.ADMINS, "full_access_group", are_superusers=True)
cls.load_users(cls.STAFF, "cisa_analysts_group")
# Combine ADMINS and STAFF lists
all_users = cls.ADMINS + cls.STAFF

View file

@ -1,236 +0,0 @@
import logging
import random
from faker import Faker
from django.db import transaction
from registrar.models import User, DomainRequest, DraftDomain, Contact, Website, FederalAgency
fake = Faker()
logger = logging.getLogger(__name__)
class DomainRequestFixture:
"""
Load domain requests into the database.
Make sure this class' `load` method is called from `handle`
in management/commands/load.py, then use `./manage.py load`
to run this code.
"""
# any fields not specified here will be filled in with fake data or defaults
# NOTE BENE: each fixture must have `organization_name` for uniqueness!
# Here is a more complete example as a template:
# {
# "status": "started",
# "organization_name": "Example - Just started",
# "generic_org_type": "federal",
# "federal_agency": None,
# "federal_type": None,
# "address_line1": None,
# "address_line2": None,
# "city": None,
# "state_territory": None,
# "zipcode": None,
# "urbanization": None,
# "purpose": None,
# "anything_else": None,
# "is_policy_acknowledged": None,
# "senior_official": None,
# "other_contacts": [],
# "current_websites": [],
# "alternative_domains": [],
# },
DA = [
{
"status": DomainRequest.DomainRequestStatus.STARTED,
"organization_name": "Example - Finished but not submitted",
},
{
"status": DomainRequest.DomainRequestStatus.SUBMITTED,
"organization_name": "Example - Submitted but pending investigation",
},
{
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
"organization_name": "Example - In investigation",
},
{
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
"organization_name": "Example - Approved",
},
{
"status": DomainRequest.DomainRequestStatus.WITHDRAWN,
"organization_name": "Example - Withdrawn",
},
{
"status": DomainRequest.DomainRequestStatus.ACTION_NEEDED,
"organization_name": "Example - Action needed",
},
{
"status": "rejected",
"organization_name": "Example - Rejected",
},
]
@classmethod
def fake_contact(cls):
return {
"first_name": fake.first_name(),
"middle_name": None,
"last_name": fake.last_name(),
"title": fake.job(),
"email": fake.ascii_safe_email(),
"phone": "201-555-5555",
}
@classmethod
def fake_dot_gov(cls):
return f"{fake.slug()}.gov"
@classmethod
def _set_non_foreign_key_fields(cls, da: DomainRequest, app: dict):
"""Helper method used by `load`."""
da.status = app["status"] if "status" in app else "started"
# TODO for a future ticket: Allow for more than just "federal" here
da.generic_org_type = app["generic_org_type"] if "generic_org_type" in app else "federal"
da.last_submitted_date = fake.date()
da.federal_type = (
app["federal_type"]
if "federal_type" in app
else random.choice(["executive", "judicial", "legislative"]) # nosec
)
da.address_line1 = app["address_line1"] if "address_line1" in app else fake.street_address()
da.address_line2 = app["address_line2"] if "address_line2" in app else None
da.city = app["city"] if "city" in app else fake.city()
da.state_territory = app["state_territory"] if "state_territory" in app else fake.state_abbr()
da.zipcode = app["zipcode"] if "zipcode" in app else fake.postalcode()
da.urbanization = app["urbanization"] if "urbanization" in app else None
da.purpose = app["purpose"] if "purpose" in app else fake.paragraph()
da.anything_else = app["anything_else"] if "anything_else" in app else None
da.is_policy_acknowledged = app["is_policy_acknowledged"] if "is_policy_acknowledged" in app else True
@classmethod
def _set_foreign_key_fields(cls, da: DomainRequest, app: dict, user: User):
"""Helper method used by `load`."""
if not da.investigator:
da.investigator = User.objects.get(username=user.username) if "investigator" in app else None
if not da.senior_official:
if "senior_official" in app and app["senior_official"] is not None:
da.senior_official, _ = Contact.objects.get_or_create(**app["senior_official"])
else:
da.senior_official = Contact.objects.create(**cls.fake_contact())
if not da.requested_domain:
if "requested_domain" in app and app["requested_domain"] is not None:
da.requested_domain, _ = DraftDomain.objects.get_or_create(name=app["requested_domain"])
else:
da.requested_domain = DraftDomain.objects.create(name=cls.fake_dot_gov())
if not da.federal_agency:
if "federal_agency" in app and app["federal_agency"] is not None:
da.federal_agency, _ = FederalAgency.objects.get_or_create(name=app["federal_agency"])
else:
federal_agencies = FederalAgency.objects.all()
# Random choice of agency for selects, used as placeholders for testing.
da.federal_agency = random.choice(federal_agencies) # nosec
@classmethod
def _set_many_to_many_relations(cls, da: DomainRequest, app: dict):
"""Helper method used by `load`."""
if "other_contacts" in app:
for contact in app["other_contacts"]:
da.other_contacts.add(Contact.objects.get_or_create(**contact)[0])
elif not da.other_contacts.exists():
other_contacts = [
Contact.objects.create(**cls.fake_contact()) for _ in range(random.randint(0, 3)) # nosec
]
da.other_contacts.add(*other_contacts)
if "current_websites" in app:
for website in app["current_websites"]:
da.current_websites.add(Website.objects.get_or_create(website=website)[0])
elif not da.current_websites.exists():
current_websites = [
Website.objects.create(website=fake.uri()) for _ in range(random.randint(0, 3)) # nosec
]
da.current_websites.add(*current_websites)
if "alternative_domains" in app:
for domain in app["alternative_domains"]:
da.alternative_domains.add(Website.objects.get_or_create(website=domain)[0])
elif not da.alternative_domains.exists():
alternative_domains = [
Website.objects.create(website=cls.fake_dot_gov()) for _ in range(random.randint(0, 3)) # nosec
]
da.alternative_domains.add(*alternative_domains)
@classmethod
def load(cls):
"""Creates domain requests for each user in the database."""
logger.info("Going to load %s domain requests" % len(cls.DA))
try:
users = list(User.objects.all()) # force evaluation to catch db errors
except Exception as e:
logger.warning(e)
return
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
cls._create_domain_requests(users)
@classmethod
def _create_domain_requests(cls, users):
"""Creates DomainRequests given a list of users"""
for user in users:
logger.debug("Loading domain requests for %s" % user)
for app in cls.DA:
try:
da, _ = DomainRequest.objects.get_or_create(
creator=user,
organization_name=app["organization_name"],
)
cls._set_non_foreign_key_fields(da, app)
cls._set_foreign_key_fields(da, app, user)
da.save()
cls._set_many_to_many_relations(da, app)
except Exception as e:
logger.warning(e)
class DomainFixture(DomainRequestFixture):
"""Create one domain and permissions on it for each user."""
@classmethod
def load(cls):
try:
users = list(User.objects.all()) # force evaluation to catch db errors
except Exception as e:
logger.warning(e)
return
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
# approve each user associated with `in review` status domains
DomainFixture._approve_domain_requests(users)
@staticmethod
def _approve_domain_requests(users):
"""Approves all provided domain requests if they are in the state in_review"""
for user in users:
domain_request = DomainRequest.objects.filter(
creator=user, status=DomainRequest.DomainRequestStatus.IN_REVIEW
).last()
logger.debug(f"Approving {domain_request} for {user}")
# All approvals require an investigator, so if there is none,
# assign one.
if domain_request.investigator is None:
# All "users" in fixtures have admin perms per prior config.
# No need to check for that.
domain_request.investigator = random.choice(users) # nosec
domain_request.approve(send_email=False)
domain_request.save()

View file

@ -1,11 +1,13 @@
import logging
from django.core.management.base import BaseCommand
from auditlog.context import disable_auditlog # type: ignore
from registrar.fixtures_users import UserFixture
from registrar.fixtures_domain_requests import DomainRequestFixture, DomainFixture
from auditlog.context import disable_auditlog
from registrar.fixtures.fixtures_domains import DomainFixture
from registrar.fixtures.fixtures_portfolios import PortfolioFixture
from registrar.fixtures.fixtures_requests import DomainRequestFixture
from registrar.fixtures.fixtures_suborganizations import SuborganizationFixture
from registrar.fixtures.fixtures_user_portfolio_permissions import UserPortfolioPermissionFixture
from registrar.fixtures.fixtures_users import UserFixture # type: ignore
logger = logging.getLogger(__name__)
@ -16,6 +18,9 @@ class Command(BaseCommand):
# https://github.com/jazzband/django-auditlog/issues/17
with disable_auditlog():
UserFixture.load()
PortfolioFixture.load()
SuborganizationFixture.load()
DomainRequestFixture.load()
DomainFixture.load()
UserPortfolioPermissionFixture.load()
logger.info("All fixtures loaded.")

View file

@ -36,13 +36,13 @@ class Command(BaseCommand, PopulateScriptTemplate):
self.federal_agency_dict[agency_name.strip()] = (initials, agency_status)
# Update every federal agency record
self.mass_update_records(FederalAgency, {"agency__isnull": False}, ["initials", "is_fceb"])
self.mass_update_records(FederalAgency, {"agency__isnull": False}, ["acronym", "is_fceb"])
def update_record(self, record: FederalAgency):
"""For each record, update the initials and is_fceb field if data exists for it"""
initials, agency_status = self.federal_agency_dict.get(record.agency)
record.initials = initials
record.acronym = initials
if agency_status and isinstance(agency_status, str) and agency_status.strip().upper() == "FCEB":
record.is_fceb = True
else:

View file

@ -0,0 +1,40 @@
# Generated by Django 4.2.10 on 2024-09-25 00:49
import django.contrib.postgres.fields
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("registrar", "0128_alter_domaininformation_state_territory_and_more"),
]
operations = [
migrations.AlterField(
model_name="portfolioinvitation",
name="portfolio_roles",
field=django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(
choices=[("organization_admin", "Admin"), ("organization_member", "Member")], max_length=50
),
blank=True,
help_text="Select one or more roles.",
null=True,
size=None,
),
),
migrations.AlterField(
model_name="userportfoliopermission",
name="roles",
field=django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(
choices=[("organization_admin", "Admin"), ("organization_member", "Member")], max_length=50
),
blank=True,
help_text="Select one or more roles.",
null=True,
size=None,
),
),
]

View file

@ -0,0 +1,146 @@
# Generated by Django 4.2.10 on 2024-09-30 17:59
from django.db import migrations, models
import django.db.models.deletion
import registrar.models.federal_agency
class Migration(migrations.Migration):
dependencies = [
("registrar", "0129_alter_portfolioinvitation_portfolio_roles_and_more"),
]
operations = [
migrations.RenameField(
model_name="federalagency",
old_name="initials",
new_name="acronym",
),
migrations.AlterField(
model_name="federalagency",
name="acronym",
field=models.CharField(
blank=True,
help_text="Acronym commonly used to reference the federal agency (Optional)",
max_length=10,
null=True,
),
),
migrations.AlterField(
model_name="domaininformation",
name="portfolio",
field=models.ForeignKey(
blank=True,
help_text="If blank, domain is not associated with a portfolio.",
null=True,
on_delete=django.db.models.deletion.PROTECT,
related_name="information_portfolio",
to="registrar.portfolio",
),
),
migrations.AlterField(
model_name="domaininformation",
name="sub_organization",
field=models.ForeignKey(
blank=True,
help_text="If blank, domain is associated with the overarching organization for this portfolio.",
null=True,
on_delete=django.db.models.deletion.PROTECT,
related_name="information_sub_organization",
to="registrar.suborganization",
verbose_name="Suborganization",
),
),
migrations.AlterField(
model_name="domainrequest",
name="portfolio",
field=models.ForeignKey(
blank=True,
help_text="If blank, request is not associated with a portfolio.",
null=True,
on_delete=django.db.models.deletion.PROTECT,
related_name="DomainRequest_portfolio",
to="registrar.portfolio",
),
),
migrations.AlterField(
model_name="domainrequest",
name="sub_organization",
field=models.ForeignKey(
blank=True,
help_text="If blank, request is associated with the overarching organization for this portfolio.",
null=True,
on_delete=django.db.models.deletion.PROTECT,
related_name="request_sub_organization",
to="registrar.suborganization",
verbose_name="Suborganization",
),
),
migrations.AlterField(
model_name="federalagency",
name="federal_type",
field=models.CharField(
blank=True,
choices=[("executive", "Executive"), ("judicial", "Judicial"), ("legislative", "Legislative")],
max_length=20,
null=True,
),
),
migrations.AlterField(
model_name="federalagency",
name="is_fceb",
field=models.BooleanField(
blank=True, help_text="Federal Civilian Executive Branch (FCEB)", null=True, verbose_name="FCEB"
),
),
migrations.AlterField(
model_name="portfolio",
name="federal_agency",
field=models.ForeignKey(
default=registrar.models.federal_agency.FederalAgency.get_non_federal_agency,
on_delete=django.db.models.deletion.PROTECT,
to="registrar.federalagency",
),
),
migrations.AlterField(
model_name="portfolio",
name="organization_name",
field=models.CharField(blank=True, null=True),
),
migrations.AlterField(
model_name="portfolio",
name="organization_type",
field=models.CharField(
blank=True,
choices=[
("federal", "Federal"),
("interstate", "Interstate"),
("state_or_territory", "State or territory"),
("tribal", "Tribal"),
("county", "County"),
("city", "City"),
("special_district", "Special district"),
("school_district", "School district"),
],
max_length=255,
null=True,
),
),
migrations.AlterField(
model_name="portfolio",
name="senior_official",
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.PROTECT,
related_name="portfolios",
to="registrar.seniorofficial",
),
),
migrations.AlterField(
model_name="suborganization",
name="name",
field=models.CharField(max_length=1000, unique=True, verbose_name="Suborganization"),
),
]

View file

@ -0,0 +1,37 @@
# This migration creates the create_full_access_group and create_cisa_analyst_group groups
# It is dependent on 0079 (which populates federal agencies)
# If permissions on the groups need changing, edit CISA_ANALYST_GROUP_PERMISSIONS
# in the user_group model then:
# [NOT RECOMMENDED]
# step 1: docker-compose exec app ./manage.py migrate --fake registrar 0035_contenttypes_permissions
# step 2: docker-compose exec app ./manage.py migrate registrar 0036_create_groups
# step 3: fake run the latest migration in the migrations list
# [RECOMMENDED]
# Alternatively:
# step 1: duplicate the migration that loads data
# step 2: docker-compose exec app ./manage.py migrate
from django.db import migrations
from registrar.models import UserGroup
from typing import Any
# For linting: RunPython expects a function reference,
# so let's give it one
def create_groups(apps, schema_editor) -> Any:
UserGroup.create_cisa_analyst_group(apps, schema_editor)
UserGroup.create_full_access_group(apps, schema_editor)
class Migration(migrations.Migration):
dependencies = [
("registrar", "0130_remove_federalagency_initials_federalagency_acronym_and_more"),
]
operations = [
migrations.RunPython(
create_groups,
reverse_code=migrations.RunPython.noop,
atomic=True,
),
]

View file

@ -0,0 +1,36 @@
# Generated by Django 4.2.10 on 2024-10-02 14:17
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
("registrar", "0131_create_groups_v17"),
]
operations = [
migrations.AlterField(
model_name="domaininformation",
name="portfolio",
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.PROTECT,
related_name="information_portfolio",
to="registrar.portfolio",
),
),
migrations.AlterField(
model_name="domainrequest",
name="portfolio",
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.PROTECT,
related_name="DomainRequest_portfolio",
to="registrar.portfolio",
),
),
]

View file

@ -63,7 +63,6 @@ class DomainInformation(TimeStampedModel):
null=True,
blank=True,
related_name="information_portfolio",
help_text="Portfolio associated with this domain",
)
sub_organization = models.ForeignKey(
@ -72,7 +71,8 @@ class DomainInformation(TimeStampedModel):
null=True,
blank=True,
related_name="information_sub_organization",
help_text="The suborganization that this domain is included under",
help_text="If blank, domain is associated with the overarching organization for this portfolio.",
verbose_name="Suborganization",
)
domain_request = models.OneToOneField(

View file

@ -332,7 +332,6 @@ class DomainRequest(TimeStampedModel):
null=True,
blank=True,
related_name="DomainRequest_portfolio",
help_text="Portfolio associated with this domain request",
)
sub_organization = models.ForeignKey(
@ -341,7 +340,8 @@ class DomainRequest(TimeStampedModel):
null=True,
blank=True,
related_name="request_sub_organization",
help_text="The suborganization that this domain request is included under",
help_text="If blank, request is associated with the overarching organization for this portfolio.",
verbose_name="Suborganization",
)
# This is the domain request user who created this domain request.

View file

@ -22,21 +22,20 @@ class FederalAgency(TimeStampedModel):
choices=BranchChoices.choices,
null=True,
blank=True,
help_text="Federal agency type (executive, judicial, legislative, etc.)",
)
initials = models.CharField(
acronym = models.CharField(
max_length=10,
null=True,
blank=True,
help_text="Agency initials",
help_text="Acronym commonly used to reference the federal agency (Optional)",
)
is_fceb = models.BooleanField(
null=True,
blank=True,
verbose_name="FCEB",
help_text="Determines if this agency is FCEB",
help_text="Federal Civilian Executive Branch (FCEB)",
)
def __str__(self) -> str:

View file

@ -2,7 +2,6 @@ from django.db import models
from registrar.models.domain_request import DomainRequest
from registrar.models.federal_agency import FederalAgency
from registrar.utility.constants import BranchChoices
from .utility.time_stamped_model import TimeStampedModel
@ -34,7 +33,6 @@ class Portfolio(TimeStampedModel):
organization_name = models.CharField(
null=True,
blank=True,
verbose_name="Portfolio organization",
)
organization_type = models.CharField(
@ -42,7 +40,6 @@ class Portfolio(TimeStampedModel):
choices=OrganizationChoices.choices,
null=True,
blank=True,
help_text="Type of organization",
)
notes = models.TextField(
@ -53,7 +50,6 @@ class Portfolio(TimeStampedModel):
federal_agency = models.ForeignKey(
"registrar.FederalAgency",
on_delete=models.PROTECT,
help_text="Associated federal agency",
unique=False,
default=FederalAgency.get_non_federal_agency,
)
@ -64,6 +60,7 @@ class Portfolio(TimeStampedModel):
unique=False,
null=True,
blank=True,
related_name="portfolios",
)
address_line1 = models.CharField(
@ -125,23 +122,6 @@ class Portfolio(TimeStampedModel):
super().save(*args, **kwargs)
@property
def portfolio_type(self):
"""
Returns a combination of organization_type / federal_type, seperated by ' - '.
If no federal_type is found, we just return the org type.
"""
return self.get_portfolio_type(self.organization_type, self.federal_type)
@classmethod
def get_portfolio_type(cls, organization_type, federal_type):
org_type_label = cls.OrganizationChoices.get_org_label(organization_type)
agency_type_label = BranchChoices.get_branch_label(federal_type)
if organization_type == cls.OrganizationChoices.FEDERAL and agency_type_label:
return " - ".join([org_type_label, agency_type_label])
else:
return org_type_label
@property
def federal_type(self):
"""Returns the federal_type value on the underlying federal_agency field"""
@ -152,13 +132,19 @@ class Portfolio(TimeStampedModel):
return federal_agency.federal_type if federal_agency else None
# == Getters for domains == #
def get_domains(self):
def get_domains(self, order_by=None):
"""Returns all DomainInformations associated with this portfolio"""
if not order_by:
return self.information_portfolio.all()
else:
return self.information_portfolio.all().order_by(*order_by)
def get_domain_requests(self):
def get_domain_requests(self, order_by=None):
"""Returns all DomainRequests associated with this portfolio"""
if not order_by:
return self.DomainRequest_portfolio.all()
else:
return self.DomainRequest_portfolio.all().order_by(*order_by)
# == Getters for suborganization == #
def get_suborganizations(self):

View file

@ -10,7 +10,7 @@ class Suborganization(TimeStampedModel):
name = models.CharField(
unique=True,
max_length=1000,
help_text="Suborganization",
verbose_name="Suborganization",
)
portfolio = models.ForeignKey(

View file

@ -66,6 +66,30 @@ class UserGroup(Group):
"model": "federalagency",
"permissions": ["add_federalagency", "change_federalagency", "delete_federalagency"],
},
{
"app_label": "registrar",
"model": "portfolio",
"permissions": ["add_portfolio", "change_portfolio", "delete_portfolio"],
},
{
"app_label": "registrar",
"model": "suborganization",
"permissions": ["add_suborganization", "change_suborganization", "delete_suborganization"],
},
{
"app_label": "registrar",
"model": "seniorofficial",
"permissions": ["add_seniorofficial", "change_seniorofficial", "delete_seniorofficial"],
},
{
"app_label": "registrar",
"model": "userportfoliopermission",
"permissions": [
"add_userportfoliopermission",
"change_userportfoliopermission",
"delete_userportfoliopermission",
],
},
]
# Avoid error: You can't execute queries until the end
@ -113,7 +137,6 @@ class UserGroup(Group):
+ cisa_analysts_group.name
)
cisa_analysts_group.save()
logger.debug("CISA Analyst permissions added to group " + cisa_analysts_group.name)
except Exception as e:
logger.error(f"Error creating analyst permissions group: {e}")
@ -135,7 +158,6 @@ class UserGroup(Group):
# Assign all permissions to the group
full_access_group.permissions.add(*all_permissions)
full_access_group.save()
logger.debug("All permissions added to group " + full_access_group.name)
except Exception as e:
logger.error(f"Error creating full access group: {e}")

View file

@ -15,8 +15,6 @@ class UserPortfolioPermission(TimeStampedModel):
PORTFOLIO_ROLE_PERMISSIONS = {
UserPortfolioRoleChoices.ORGANIZATION_ADMIN: [
UserPortfolioPermissionChoices.VIEW_ALL_DOMAINS,
UserPortfolioPermissionChoices.VIEW_MEMBERS,
UserPortfolioPermissionChoices.EDIT_MEMBERS,
UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS,
UserPortfolioPermissionChoices.EDIT_REQUESTS,
UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
@ -25,14 +23,6 @@ class UserPortfolioPermission(TimeStampedModel):
UserPortfolioPermissionChoices.VIEW_SUBORGANIZATION,
UserPortfolioPermissionChoices.EDIT_SUBORGANIZATION,
],
UserPortfolioRoleChoices.ORGANIZATION_ADMIN_READ_ONLY: [
UserPortfolioPermissionChoices.VIEW_ALL_DOMAINS,
UserPortfolioPermissionChoices.VIEW_MEMBERS,
UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS,
UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
# Domain: field specific permissions
UserPortfolioPermissionChoices.VIEW_SUBORGANIZATION,
],
UserPortfolioRoleChoices.ORGANIZATION_MEMBER: [
UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
],
@ -75,7 +65,19 @@ class UserPortfolioPermission(TimeStampedModel):
)
def __str__(self):
return f"User '{self.user}' on Portfolio '{self.portfolio}' " f"<Roles: {self.roles}>" if self.roles else ""
readable_roles = []
if self.roles:
readable_roles = self.get_readable_roles()
return f"{self.user}" f" <Roles: {', '.join(readable_roles)}>" if self.roles else ""
def get_readable_roles(self):
"""Returns a readable list of self.roles"""
readable_roles = []
if self.roles:
readable_roles = sorted(
[UserPortfolioRoleChoices.get_user_portfolio_role_label(role) for role in self.roles]
)
return readable_roles
def _get_portfolio_permissions(self):
"""
@ -103,7 +105,8 @@ class UserPortfolioPermission(TimeStampedModel):
existing_permissions = UserPortfolioPermission.objects.filter(user=self.user)
if not flag_is_active_for_user(self.user, "multiple_portfolios") and existing_permissions.exists():
raise ValidationError(
"Only one portfolio permission is allowed per user when multiple portfolios are disabled."
"This user is already assigned to a portfolio. "
"Based on current waffle flag settings, users cannot be assigned to multiple portfolios."
)
# Check if portfolio is set without accessing the related object.

View file

@ -334,3 +334,12 @@ def get_url_name(path):
except Resolver404:
logger.error(f"No matching URL name found for path: {path}")
return None
def value_of_attribute(obj, attribute_name: str):
"""Returns the value of getattr if the attribute isn't callable.
If it is, execute the underlying function and return that result instead."""
value = getattr(obj, attribute_name)
if callable(value):
value = value()
return value

View file

@ -7,9 +7,12 @@ class UserPortfolioRoleChoices(models.TextChoices):
"""
ORGANIZATION_ADMIN = "organization_admin", "Admin"
ORGANIZATION_ADMIN_READ_ONLY = "organization_admin_read_only", "Admin read only"
ORGANIZATION_MEMBER = "organization_member", "Member"
@classmethod
def get_user_portfolio_role_label(cls, user_portfolio_role):
return cls(user_portfolio_role).label if user_portfolio_role else None
class UserPortfolioPermissionChoices(models.TextChoices):
""" """
@ -29,3 +32,7 @@ class UserPortfolioPermissionChoices(models.TextChoices):
# Domain: field specific permissions
VIEW_SUBORGANIZATION = "view_suborganization", "View suborganization"
EDIT_SUBORGANIZATION = "edit_suborganization", "Edit suborganization"
@classmethod
def get_user_portfolio_permission_label(cls, user_portfolio_permission):
return cls(user_portfolio_permission).label if user_portfolio_permission else None

View file

@ -49,11 +49,15 @@ class CheckUserProfileMiddleware:
self.setup_page,
self.logout_page,
"/admin",
# These are here as there is a bug with this middleware that breaks djangos built in debug console.
# The debug console uses this directory, but since this overrides that, it throws errors.
"/__debug__",
]
self.other_excluded_pages = [
self.profile_page,
self.logout_page,
"/admin",
"/__debug__",
]
self.excluded_pages = {

View file

@ -39,7 +39,7 @@
None<br>
{% endif %}
{% else %}
{% elif not hide_no_contact_info_message %}
No additional contact information found.<br>
{% endif %}

View file

@ -119,7 +119,7 @@ This is using a custom implementation fieldset.html (see admin/fieldset.html)
{% endfor %}
{% endwith %}
</div>
{% elif field.field.name == "display_admins" or field.field.name == "domain_managers" or field.field.namd == "invited_domain_managers" %}
{% elif field.field.name == "domain_managers" or field.field.name == "invited_domain_managers" %}
<div class="readonly">{{ field.contents|safe }}</div>
{% elif field.field.name == "display_members" %}
<div class="readonly">
@ -376,13 +376,6 @@ This is using a custom implementation fieldset.html (see admin/fieldset.html)
</details>
{% endif %}
{% endwith %}
{% elif field.field.name == "display_members" and field.contents %}
<details class="margin-top-1 dja-detail-table" aria-role="button" open>
<summary class="padding-1 padding-left-0 dja-details-summary">Details</summary>
<div class="grid-container margin-left-0 padding-left-0 padding-right-0 dja-details-contents">
{{ field.contents|safe }}
</div>
</details>
{% elif field.field.name == "state_territory" and original_object|model_name_lowercase != 'portfolio' %}
<div class="flex-container margin-top-2">
<span>

View file

@ -0,0 +1,9 @@
{% comment %} This view provides a detail button that can be used to show/hide content {% endcomment %}
<details class="margin-top-1 dja-detail-table" aria-role="button" {% if start_open %}open{% else %}closed{% endif %}>
<summary class="padding-1 padding-left-0 dja-details-summary">Details</summary>
<div class="grid-container margin-left-0 padding-left-0 padding-right-0 dja-details-contents">
{% block detail_content %}
{% endblock detail_content%}
</div>
</details>

View file

@ -0,0 +1,48 @@
{% extends "django/admin/includes/details_button.html" %}
{% load static url_helpers %}
{% block detail_content %}
<table>
<thead>
<tr>
<th>Name</th>
<th>Title</th>
<th>Email</th>
<th>Phone</th>
</tr>
</thead>
<tbody>
{% for admin in admins %}
{% url 'admin:registrar_userportfoliopermission_change' admin.pk as url %}
<tr>
<td><a href={{url}}>{{ admin.user.get_formatted_name}}</a></td>
<td>{{ admin.user.title }}</td>
<td>
{% if admin.user.email %}
{{ admin.user.email }}
{% else %}
None
{% endif %}
</td>
<td>{{ admin.user.phone }}</td>
<td class="padding-left-1 text-size-small">
{% if admin.user.email %}
<input aria-hidden="true" class="display-none" value="{{ admin.user.email }}" />
<button
class="usa-button usa-button--unstyled padding-right-1 usa-button--icon button--clipboard copy-to-clipboard usa-button__small-text text-no-underline"
type="button"
>
<svg
class="usa-icon"
>
<use aria-hidden="true" xlink:href="{%static 'img/sprite.svg'%}#content_copy"></use>
</svg>
<span>Copy email</span>
</button>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endblock detail_content %}

View file

@ -0,0 +1,26 @@
{% extends "django/admin/includes/details_button.html" %}
{% load static url_helpers %}
{% block detail_content %}
<table>
<thead>
<tr>
<th>Name</th>
<th>Status</th>
</tr>
</thead>
<tbody>
{% for domain_request in domain_requests %}
{% url 'admin:registrar_domainrequest_change' domain_request.pk as url %}
<tr>
<td><a href={{url}}>{{ domain_request }}</a></td>
{% if domain_request.get_status_display %}
<td>{{ domain_request.get_status_display }}</td>
{% else %}
<td>None</td>
{% endif %}
</tr>
{% endfor %}
</tbody>
</table>
{% endblock detail_content %}

View file

@ -0,0 +1,30 @@
{% extends "django/admin/includes/details_button.html" %}
{% load static url_helpers %}
{% block detail_content %}
<table>
<thead>
<tr>
<th>Name</th>
<th>State</th>
</tr>
</thead>
<tbody>
{% for domain_info in domains %}
{% if domain_info.domain %}
{% with domain=domain_info.domain %}
{% url 'admin:registrar_domain_change' domain.pk as url %}
<tr>
<td><a href={{url}}>{{ domain }}</a></td>
{% if domain and domain.get_state_display %}
<td>{{ domain.get_state_display }}</td>
{% else %}
<td>None</td>
{% endif %}
</tr>
{% endwith %}
{% endif %}
{% endfor %}
</tbody>
</table>
{% endblock detail_content%}

View file

@ -0,0 +1,61 @@
{% extends "django/admin/includes/detail_table_fieldset.html" %}
{% load custom_filters %}
{% load static url_helpers %}
{% block field_readonly %}
{% if field.field.name == "display_admins" or field.field.name == "display_members" %}
<div class="readonly">{{ field.contents|safe }}</div>
{% elif field.field.name == "roles" %}
<div class="readonly">
{% if get_readable_roles %}
{{ get_readable_roles }}
{% else %}
<p>No roles found.</p>
{% endif %}
</div>
{% elif field.field.name == "additional_permissions" %}
<div class="readonly">
{% if display_permissions %}
{{ display_permissions }}
{% else %}
<p>No additional permissions found.</p>
{% endif %}
</div>
{% elif field.field.name == "senior_official" %}
{% if original_object.senior_official %}
<div class="readonly">{{ field.contents }}</div>
{% else %}
{% url "admin:registrar_seniorofficial_add" as url %}
<div class="readonly">
<a href={{ url }}>No senior official found. Create one now.</a>
</div>
{% endif %}
{% else %}
<div class="readonly">{{ field.contents }}</div>
{% endif %}
{% endblock field_readonly%}
{% block after_help_text %}
{% if field.field.name == "senior_official" %}
<div class="flex-container">
<label aria-label="Senior official contact details"></label>
{% include "django/admin/includes/contact_detail_list.html" with user=original_object.senior_official no_title_top_padding=field.is_readonly hide_no_contact_info_message=True %}
</div>
{% elif field.field.name == "display_admins" %}
{% if admins|length > 0 %}
{% include "django/admin/includes/portfolio/portfolio_admins_table.html" with admins=admins %}
{% endif %}
{% elif field.field.name == "display_members" %}
{% if members|length > 0 %}
{% include "django/admin/includes/portfolio/portfolio_members_table.html" with members=members %}
{% endif %}
{% elif field.field.name == "domains" %}
{% if domains|length > 0 %}
{% include "django/admin/includes/portfolio/portfolio_domains_table.html" with domains=domains %}
{% endif %}
{% elif field.field.name == "domain_requests" %}
{% if domain_requests|length > 0 %}
{% include "django/admin/includes/portfolio/portfolio_domain_requests_table.html" with domain_requests=domain_requests %}
{% endif %}
{% endif %}
{% endblock after_help_text %}

View file

@ -0,0 +1,55 @@
{% extends "django/admin/includes/details_button.html" %}
{% load custom_filters %}
{% load static url_helpers %}
{% block detail_content %}
<table>
<thead>
<tr>
<th>Name</th>
<th>Title</th>
<th>Email</th>
<th>Phone</th>
<th>Roles</th>
</tr>
</thead>
<tbody>
{% for member in members %}
{% url 'admin:registrar_userportfoliopermission_change' member.pk as url %}
<tr>
<td><a href={{url}}>{{ member.user.get_formatted_name}}</a></td>
<td>{{ member.user.title }}</td>
<td>
{% if member.user.email %}
{{ member.user.email }}
{% else %}
None
{% endif %}
</td>
<td>{{ member.user.phone }}</td>
<td>
{% for role in member.user|portfolio_role_summary:original %}
<span class="usa-tag">{{ role }}</span>
{% endfor %}
</td>
<td class="padding-left-1 text-size-small">
{% if member.user.email %}
<input aria-hidden="true" class="display-none" value="{{ member.user.email }}" />
<button
class="usa-button usa-button--unstyled padding-right-1 usa-button--icon button--clipboard copy-to-clipboard usa-button__small-text text-no-underline"
type="button"
>
<svg
class="usa-icon"
>
<use aria-hidden="true" xlink:href="{%static 'img/sprite.svg'%}#content_copy"></use>
</svg>
<span>Copy email</span>
</button>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endblock %}

View file

@ -8,19 +8,14 @@
<input id="senior_official_from_agency_json_url" class="display-none" value="{{url}}" />
{% url 'get-federal-and-portfolio-types-from-federal-agency-json' as url %}
<input id="federal_and_portfolio_types_from_agency_json_url" class="display-none" value="{{url}}" />
{% url "admin:registrar_seniorofficial_add" as url %}
<input id="senior-official-add-url" class="display-none" value="{{url}}" />
{{ block.super }}
{% endblock content %}
{% block field_sets %}
{% for fieldset in adminform %}
{% comment %}
This is a placeholder for now.
Disclaimer:
When extending the fieldset view consider whether you need to make a new one that extends from detail_table_fieldset.
detail_table_fieldset is used on multiple admin pages, so a change there can have unintended consequences.
{% endcomment %}
{% include "django/admin/includes/detail_table_fieldset.html" with original_object=original %}
{% include "django/admin/includes/portfolio/portfolio_fieldset.html" with original_object=original %}
{% endfor %}
{% endblock %}

View file

@ -8,6 +8,7 @@
<div class="mobile:grid-col-12 tablet:grid-col-6 desktop:grid-col-4">
<h3>Domain requests</h3>
<ul class="margin-0 padding-0">
{% if domains|length > 0 %}
{% for domain_request in domain_requests %}
<li>
<a href="{% url 'admin:registrar_domainrequest_change' domain_request.pk %}">
@ -16,11 +17,15 @@
({{ domain_request.status }})
</li>
{% endfor %}
{% else %}
<li>No domain requests.</li>
{% endif %}
</ul>
</div>
<div class="mobile:grid-col-12 tablet:grid-col-6 desktop:grid-col-4">
<h3>Domains</h3>
<ul class="margin-0 padding-0">
{% if domains|length > 0 %}
{% for domain in domains %}
<li>
<a href="{% url 'admin:registrar_domain_change' domain.pk %}">
@ -29,6 +34,9 @@
({{ domain.state }})
</li>
{% endfor %}
{% else %}
<li>No domains.</li>
{% endif %}
</ul>
</div>
</div>

View file

@ -17,26 +17,6 @@
{% endblock %}
{% block after_related_objects %}
{% if portfolios %}
<div class="module aligned padding-3">
<h2>Portfolio information</h2>
<div class="grid-row grid-gap mobile:padding-x-1 desktop:padding-x-4">
<div class="mobile:grid-col-12 tablet:grid-col-6 desktop:grid-col-4">
<h3>Portfolios</h3>
<ul class="margin-0 padding-0">
{% for portfolio in portfolios %}
<li>
<a href="{% url 'admin:registrar_portfolio_change' portfolio.pk %}">
{{ portfolio }}
</a>
</li>
{% endfor %}
</ul>
</div>
</div>
</div>
{% endif %}
<div class="module aligned padding-3">
<h2>Associated requests and domains</h2>
<div class="grid-row grid-gap mobile:padding-x-1 desktop:padding-x-4">

View file

@ -0,0 +1,16 @@
{% extends 'django/admin/email_clipboard_change_form.html' %}
{% load custom_filters %}
{% load i18n static %}
{% block field_sets %}
{% for fieldset in adminform %}
{% comment %}
This is a placeholder for now.
Disclaimer:
When extending the fieldset view consider whether you need to make a new one that extends from detail_table_fieldset.
detail_table_fieldset is used on multiple admin pages, so a change there can have unintended consequences.
{% endcomment %}
{% include "django/admin/includes/user_portfolio_permission_fieldset.html" with original_object=original %}
{% endfor %}
{% endblock %}

View file

@ -91,9 +91,9 @@
</li>
{% endif %}
{% if has_organization_members_flag %}
{% if has_organization_members_flag and has_view_members_portfolio_permission %}
<li class="usa-nav__primary-item">
<a href="#" class="usa-nav-link">
<a href="/members/" class="usa-nav-link {% if path|is_members_subpage %} usa-current{% endif %}">
Members
</a>
</li>

View file

@ -0,0 +1,80 @@
{% load static %}
<!-- Embedding the portfolio value in a data attribute -->
<span id="portfolio-js-value" class="display-none" data-portfolio="{{ portfolio.id }}"></span>
{% comment %} Stores the json endpoint in a url for easier access {% endcomment %}
{% url 'get_portfolio_members_json' as url %}
<span id="get_members_json_url" class="display-none">{{url}}</span>
<section class="section-outlined members margin-top-0 section-outlined--border-base-light" id="members">
<div class="section-outlined__header margin-bottom-3 grid-row">
<!-- ---------- SEARCH ---------- -->
<div class="section-outlined__search mobile:grid-col-12 desktop:grid-col-6">
<section aria-label="Members search component" class="margin-top-2">
<form class="usa-search usa-search--small" method="POST" role="search">
{% csrf_token %}
<button class="usa-button usa-button--unstyled margin-right-3 members__reset-search display-none" type="button">
<svg class="usa-icon" aria-hidden="true" focusable="false" role="img" width="24">
<use xlink:href="{%static 'img/sprite.svg'%}#close"></use>
</svg>
Reset
</button>
<label class="usa-sr-only" for="members__search-field">Search by member name</label>
<input
class="usa-input"
id="members__search-field"
type="search"
name="search"
placeholder="Search by member name"
/>
<button class="usa-button" type="submit" id="members__search-field-submit">
<img
src="{% static 'img/usa-icons-bg/search--white.svg' %}"
class="usa-search__submit-icon"
alt="Search"
/>
</button>
</form>
</section>
</div>
</div>
<!-- ---------- MAIN TABLE ---------- -->
<div class="members__table-wrapper display-none usa-table-container--scrollable margin-top-0" tabindex="0">
<table class="usa-table usa-table--borderless usa-table--stacked dotgov-table dotgov-table--stacked members__table">
<caption class="sr-only">Your registered members</caption>
<thead>
<tr>
<th data-sortable="member" scope="col" role="columnheader">Member</th>
<th data-sortable="last_active" scope="col" role="columnheader">Last Active</th>
<th
scope="col"
role="columnheader"
>
<span class="usa-sr-only">Action</span>
</th>
</tr>
</thead>
<tbody>
<!-- AJAX will populate this tbody -->
</tbody>
</table>
<div
class="usa-sr-only usa-table__announcement-region"
aria-live="polite"
></div>
</div>
<div class="members__no-data display-none">
<p>You don't have any members.</p>
</div>
<div class="members__no-search-results display-none">
<p>No results found</p>
</div>
</section>
<nav aria-label="Pagination" class="usa-pagination flex-justify" id="members-pagination">
<span class="usa-pagination__counter text-base-dark padding-left-2 margin-bottom-1">
<!-- Count will be dynamically populated by JS -->
</span>
<ul class="usa-pagination__list">
<!-- Pagination links will be dynamically populated by JS -->
</ul>
</nav>

View file

@ -0,0 +1,33 @@
{% extends 'portfolio_base.html' %}
{% load static %}
{% block title %} Members | {% endblock %}
{% block wrapper_class %}
{{ block.super }} dashboard--grey-1
{% endblock %}
{% block portfolio_content %}
{% block messages %}
{% include "includes/form_messages.html" %}
{% endblock %}
<div id="main-content">
<div class="grid-row grid-gap">
<div class="mobile:grid-col-12 tablet:grid-col-6">
<h1 id="members-header">Members</h1>
</div>
<div class="mobile:grid-col-12 tablet:grid-col-6">
<p class="float-right-tablet tablet:margin-y-0">
<a href="#" class="usa-button"
>
Add a new member
</a>
</p>
</div>
</div>
{% include "includes/members_table.html" with portfolio=portfolio %}
</div>
{% endblock %}

View file

@ -239,3 +239,23 @@ def is_portfolio_subpage(path):
"senior-official",
]
return get_url_name(path) in url_names
@register.filter(name="is_members_subpage")
def is_members_subpage(path):
"""Checks if the given page is a subpage of members.
Takes a path name, like '/organization/'."""
# Since our pages aren't unified under a common path, we need this approach for now.
url_names = [
"members",
]
return get_url_name(path) in url_names
@register.filter(name="portfolio_role_summary")
def portfolio_role_summary(user, portfolio):
"""Returns the value of user.portfolio_role_summary"""
if user and portfolio:
return user.portfolio_role_summary(portfolio)
else:
return []

View file

@ -2097,36 +2097,11 @@ class TestPortfolioAdmin(TestCase):
)
display_admins = self.admin.display_admins(self.portfolio)
self.assertIn(
f'<a href="/admin/registrar/user/{admin_user_1.pk}/change/">Gerald Meoward meaoward@gov.gov</a>',
display_admins,
)
self.assertIn("Captain", display_admins)
self.assertIn(
f'<a href="/admin/registrar/user/{admin_user_2.pk}/change/">Arnold Poopy poopy@gov.gov</a>', display_admins
)
self.assertIn("Major", display_admins)
display_members_summary = self.admin.display_members_summary(self.portfolio)
self.assertIn(
f'<a href="/admin/registrar/user/{admin_user_3.pk}/change/">Mad Max madmax@gov.gov</a>',
display_members_summary,
)
self.assertIn(
f'<a href="/admin/registrar/user/{admin_user_4.pk}/change/">Agent Smith thematrix@gov.gov</a>',
display_members_summary,
)
url = reverse("admin:registrar_userportfoliopermission_changelist") + f"?portfolio={self.portfolio.id}"
self.assertIn(f'<a href="{url}">2 administrators</a>', display_admins)
display_members = self.admin.display_members(self.portfolio)
self.assertIn("Mad Max", display_members)
self.assertIn("<span class='usa-tag'>Member</span>", display_members)
self.assertIn("Road warrior", display_members)
self.assertIn("Agent Smith", display_members)
self.assertIn("<span class='usa-tag'>Domain requestor</span>", display_members)
self.assertIn("Program", display_members)
self.assertIn(f'<a href="{url}">2 members</a>', display_members)
class TestTransferUser(WebTest):

View file

@ -100,7 +100,6 @@ class GetFederalPortfolioTypeJsonTest(TestCase):
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data["federal_type"], "Judicial")
self.assertEqual(data["portfolio_type"], "Federal - Judicial")
@less_console_noise_decorator
def test_get_federal_and_portfolio_types_json_authenticated_regularuser(self):

View file

@ -1387,18 +1387,18 @@ class TestPopulateFederalAgencyInitialsAndFceb(TestCase):
self.agency4.refresh_from_db()
# Check if FederalAgency objects were updated correctly
self.assertEqual(self.agency1.initials, "ABMC")
self.assertEqual(self.agency1.acronym, "ABMC")
self.assertTrue(self.agency1.is_fceb)
self.assertEqual(self.agency2.initials, "ACHP")
self.assertEqual(self.agency2.acronym, "ACHP")
self.assertTrue(self.agency2.is_fceb)
# We expect that this field doesn't have any data,
# as none is specified in the CSV
self.assertIsNone(self.agency3.initials)
self.assertIsNone(self.agency3.acronym)
self.assertIsNone(self.agency3.is_fceb)
self.assertEqual(self.agency4.initials, "KC")
self.assertEqual(self.agency4.acronym, "KC")
self.assertFalse(self.agency4.is_fceb)
@less_console_noise_decorator
@ -1411,7 +1411,7 @@ class TestPopulateFederalAgencyInitialsAndFceb(TestCase):
# Verify that the missing agency was not updated
missing_agency.refresh_from_db()
self.assertIsNone(missing_agency.initials)
self.assertIsNone(missing_agency.acronym)
self.assertIsNone(missing_agency.is_fceb)

View file

@ -40,10 +40,22 @@ class TestGroups(TestCase):
"add_federalagency",
"change_federalagency",
"delete_federalagency",
"add_portfolio",
"change_portfolio",
"delete_portfolio",
"add_seniorofficial",
"change_seniorofficial",
"delete_seniorofficial",
"add_suborganization",
"change_suborganization",
"delete_suborganization",
"analyst_access_permission",
"change_user",
"delete_userdomainrole",
"view_userdomainrole",
"add_userportfoliopermission",
"change_userportfoliopermission",
"delete_userportfoliopermission",
"add_verifiedbystaff",
"change_verifiedbystaff",
"delete_verifiedbystaff",
@ -51,6 +63,7 @@ class TestGroups(TestCase):
# Get the codenames of actual permissions associated with the group
actual_permissions = [p.codename for p in cisa_analysts_group.permissions.all()]
self.maxDiff = None
# Assert that the actual permissions match the expected permissions
self.assertListEqual(actual_permissions, expected_permissions)

View file

@ -1332,7 +1332,10 @@ class TestUserPortfolioPermission(TestCase):
self.assertEqual(
cm.exception.message,
"Only one portfolio permission is allowed per user when multiple portfolios are disabled.",
(
"This user is already assigned to a portfolio. "
"Based on current waffle flag settings, users cannot be assigned to multiple portfolios."
),
)

View file

@ -356,11 +356,6 @@ class ExportDataTest(MockDbForIndividualTests, MockEppLib):
self.assertIn(self.domain_3.name, csv_content)
self.assertNotIn(self.domain_2.name, csv_content)
# Test the output for readonly admin
portfolio_permission.roles = [UserPortfolioRoleChoices.ORGANIZATION_ADMIN_READ_ONLY]
portfolio_permission.save()
portfolio_permission.refresh_from_db()
# Get the csv content
csv_content = self._run_domain_data_type_user_export(request)
self.assertIn(self.domain_1.name, csv_content)

View file

@ -1568,7 +1568,7 @@ class TestDomainSuborganization(TestDomainOverview):
# Add portfolio perms to the user object
portfolio_permission, _ = UserPortfolioPermission.objects.get_or_create(
user=self.user, portfolio=portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN_READ_ONLY]
user=self.user, portfolio=portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER]
)
self.assertEqual(self.domain_information.sub_organization, suborg)

View file

@ -0,0 +1,175 @@
from django.urls import reverse
from registrar.models.portfolio import Portfolio
from registrar.models.user import User
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from .test_views import TestWithUser
from django_webtest import WebTest # type: ignore
class GetPortfolioMembersJsonTest(TestWithUser, WebTest):
@classmethod
def setUpClass(cls):
super().setUpClass()
# Create additional users
cls.user2 = User.objects.create(
username="test_user2",
first_name="Second",
last_name="User",
email="second@example.com",
phone="8003112345",
title="Member",
)
cls.user3 = User.objects.create(
username="test_user3",
first_name="Third",
last_name="User",
email="third@example.com",
phone="8003113456",
title="Member",
)
cls.user4 = User.objects.create(
username="test_user4",
first_name="Fourth",
last_name="User",
email="fourth@example.com",
phone="8003114567",
title="Admin",
)
# Create Portfolio
cls.portfolio = Portfolio.objects.create(creator=cls.user, organization_name="Test Portfolio")
# Assign permissions
UserPortfolioPermission.objects.create(
user=cls.user,
portfolio=cls.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_MEMBERS,
UserPortfolioPermissionChoices.EDIT_MEMBERS,
],
)
UserPortfolioPermission.objects.create(
user=cls.user2,
portfolio=cls.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
UserPortfolioPermission.objects.create(
user=cls.user3,
portfolio=cls.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
UserPortfolioPermission.objects.create(
user=cls.user4,
portfolio=cls.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
)
def setUp(self):
super().setUp()
self.app.set_user(self.user.username)
def test_get_portfolio_members_json_authenticated(self):
"""Test that portfolio members are returned properly for an authenticated user."""
response = self.app.get(reverse("get_portfolio_members_json"), params={"portfolio": self.portfolio.id})
self.assertEqual(response.status_code, 200)
data = response.json
# Check pagination info
self.assertEqual(data["page"], 1)
self.assertFalse(data["has_previous"])
self.assertFalse(data["has_next"])
self.assertEqual(data["num_pages"], 1)
self.assertEqual(data["total"], 4)
self.assertEqual(data["unfiltered_total"], 4)
# Check the number of members
self.assertEqual(len(data["members"]), 4)
# Check member fields
expected_emails = {self.user.email, self.user2.email, self.user3.email, self.user4.email}
actual_emails = {member["email"] for member in data["members"]}
self.assertEqual(expected_emails, actual_emails)
def test_pagination(self):
"""Test that pagination works properly when there are more members than page size."""
# Create additional members to exceed page size of 10
for i in range(5, 15):
user, _ = User.objects.get_or_create(
username=f"test_user{i}",
first_name=f"User{i}",
last_name=f"Last{i}",
email=f"user{i}@example.com",
phone=f"80031156{i}",
title="Member",
)
UserPortfolioPermission.objects.create(
user=user,
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
response = self.app.get(
reverse("get_portfolio_members_json"), params={"portfolio": self.portfolio.id, "page": 1}
)
self.assertEqual(response.status_code, 200)
data = response.json
# Check pagination info
self.assertEqual(data["page"], 1)
self.assertTrue(data["has_next"])
self.assertFalse(data["has_previous"])
self.assertEqual(data["num_pages"], 2)
self.assertEqual(data["total"], 14)
self.assertEqual(data["unfiltered_total"], 14)
# Check the number of members on page 1
self.assertEqual(len(data["members"]), 10)
response = self.app.get(
reverse("get_portfolio_members_json"), params={"portfolio": self.portfolio.id, "page": 2}
)
self.assertEqual(response.status_code, 200)
data = response.json
# Check pagination info for page 2
self.assertEqual(data["page"], 2)
self.assertFalse(data["has_next"])
self.assertTrue(data["has_previous"])
self.assertEqual(data["num_pages"], 2)
# Check the number of members on page 2
self.assertEqual(len(data["members"]), 4)
def test_search(self):
"""Test search functionality for portfolio members."""
# Search by name
response = self.app.get(
reverse("get_portfolio_members_json"), params={"portfolio": self.portfolio.id, "search_term": "Second"}
)
self.assertEqual(response.status_code, 200)
data = response.json
self.assertEqual(len(data["members"]), 1)
self.assertEqual(data["members"][0]["name"], "Second User")
self.assertEqual(data["members"][0]["email"], "second@example.com")
# Search by email
response = self.app.get(
reverse("get_portfolio_members_json"),
params={"portfolio": self.portfolio.id, "search_term": "fourth@example.com"},
)
self.assertEqual(response.status_code, 200)
data = response.json
self.assertEqual(len(data["members"]), 1)
self.assertEqual(data["members"][0]["email"], "fourth@example.com")
# Search with no matching results
response = self.app.get(
reverse("get_portfolio_members_json"), params={"portfolio": self.portfolio.id, "search_term": "NonExistent"}
)
self.assertEqual(response.status_code, 200)
data = response.json
self.assertEqual(len(data["members"]), 0)

View file

@ -10,6 +10,7 @@ from registrar.models import (
UserDomainRole,
User,
)
from registrar.models.user_group import UserGroup
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from .common import MockSESClient, completed_domain_request, create_test_user
@ -666,6 +667,195 @@ class TestPortfolio(WebTest):
self.assertContains(home, "Hotel California")
self.assertContains(home, "Members")
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_cannot_view_members_table(self):
"""Test that user without proper permission is denied access to members view"""
# Users can only view the members table if they have
# Portfolio Permission "view_members" selected.
# NOTE: Admins, by default, do NOT have permission
# to view/edit members. This must be enabled explicitly
# in the "additional permissions" section for a portfolio
# permission.
#
# Scenarios to test include;
# (1) - User is not admin and can view portfolio, but not the members table
# (1) - User is admin and can view portfolio, but not the members table
# --- non-admin
self.app.set_user(self.user.username)
UserPortfolioPermission.objects.get_or_create(
user=self.user,
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
],
)
# Verify that the user cannot access the members page
# This will redirect the user to the members page.
self.client.force_login(self.user)
response = self.client.get(reverse("members"), follow=True)
# Assert the response is a 403 Forbidden
self.assertEqual(response.status_code, 403)
# --- admin
UserPortfolioPermission.objects.filter(user=self.user, portfolio=self.portfolio).update(
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
)
# Verify that the user cannot access the members page
# This will redirect the user to the members page.
response = self.client.get(reverse("members"), follow=True)
# Assert the response is a 403 Forbidden
self.assertEqual(response.status_code, 403)
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_can_view_members_table(self):
"""Test that user with proper permission is able to access members view"""
self.app.set_user(self.user.username)
UserPortfolioPermission.objects.get_or_create(
user=self.user,
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
UserPortfolioPermissionChoices.VIEW_MEMBERS,
],
)
# Verify that the user can access the members page
# This will redirect the user to the members page.
self.client.force_login(self.user)
response = self.client.get(reverse("members"), follow=True)
# Make sure the page loaded
self.assertEqual(response.status_code, 200)
# ---- Useful debugging stub to see what "assertContains" is finding
# pattern = r'Members'
# matches = re.findall(pattern, response.content.decode('utf-8'))
# for match in matches:
# TerminalHelper.colorful_logger(logger.info, TerminalColors.OKCYAN, f"{match}")
# Make sure the page loaded
self.assertContains(response, "Members")
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_can_manage_members(self):
"""Test that user with proper permission is able to manage members"""
user = self.user
self.app.set_user(user.username)
# give user permissions to view AND manage members
UserPortfolioPermission.objects.get_or_create(
user=self.user,
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
UserPortfolioPermissionChoices.VIEW_MEMBERS,
UserPortfolioPermissionChoices.EDIT_MEMBERS,
],
)
# Give user permissions to modify user objects in the DB
group, _ = UserGroup.objects.get_or_create(name="full_access_group")
# Add the user to the group
user.groups.set([group])
# Verify that the user can access the members page
# This will redirect the user to the members page.
self.client.force_login(self.user)
response = self.client.get(reverse("members"), follow=True)
# Make sure the page loaded
self.assertEqual(response.status_code, 200)
# Verify that manage settings are sent in the dynamic HTML
self.client.force_login(self.user)
response = self.client.get(reverse("get_portfolio_members_json") + f"?portfolio={self.portfolio.pk}")
self.assertContains(response, '"action_label": "Manage"')
self.assertContains(response, '"svg_icon": "settings"')
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_view_only_members(self):
"""Test that user with view only permission settings can only
view members (not manage them)"""
user = self.user
self.app.set_user(user.username)
# give user permissions to view AND manage members
UserPortfolioPermission.objects.get_or_create(
user=self.user,
portfolio=self.portfolio,
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
UserPortfolioPermissionChoices.VIEW_MEMBERS,
],
)
# Give user permissions to modify user objects in the DB
group, _ = UserGroup.objects.get_or_create(name="full_access_group")
# Add the user to the group
user.groups.set([group])
# Verify that the user can access the members page
# This will redirect the user to the members page.
self.client.force_login(self.user)
response = self.client.get(reverse("members"), follow=True)
# Make sure the page loaded
self.assertEqual(response.status_code, 200)
# Verify that view-only settings are sent in the dynamic HTML
response = self.client.get(reverse("get_portfolio_members_json") + f"?portfolio={self.portfolio.pk}")
print(response.content)
self.assertContains(response, '"action_label": "View"')
self.assertContains(response, '"svg_icon": "visibility"')
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_members_admin_detection(self):
"""Test that user with proper permission is able to manage members"""
user = self.user
self.app.set_user(user.username)
# give user permissions to view AND manage members
UserPortfolioPermission.objects.get_or_create(
user=self.user,
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
UserPortfolioPermissionChoices.VIEW_MEMBERS,
UserPortfolioPermissionChoices.EDIT_MEMBERS,
],
)
# Give user permissions to modify user objects in the DB
group, _ = UserGroup.objects.get_or_create(name="full_access_group")
# Add the user to the group
user.groups.set([group])
# Verify that the user can access the members page
# This will redirect the user to the members page.
self.client.force_login(self.user)
response = self.client.get(reverse("members"), follow=True)
# Make sure the page loaded
self.assertEqual(response.status_code, 200)
# Verify that admin info is sent in the dynamic HTML
response = self.client.get(reverse("get_portfolio_members_json") + f"?portfolio={self.portfolio.pk}")
# TerminalHelper.colorful_logger(logger.info, TerminalColors.OKCYAN, f"{response.content}")
self.assertContains(response, '"is_admin": true')
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
def test_portfolio_domain_requests_page_when_user_has_no_permissions(self):

View file

@ -1,5 +1,9 @@
from registrar.models.domain_request import DomainRequest
from django.template.loader import get_template
from django.utils.html import format_html
from django.urls import reverse
from django.utils.html import escape
from registrar.models.utility.generic_helper import value_of_attribute
def get_action_needed_reason_default_email(domain_request, action_needed_reason):
@ -37,3 +41,56 @@ def _get_default_email(domain_request, file_path, reason, excluded_reasons=None)
email_body_text_cleaned = email_body_text.strip().lstrip("\n") if email_body_text else None
return email_body_text_cleaned
def get_field_links_as_list(
queryset,
model_name,
attribute_name=None,
link_info_attribute=None,
separator=None,
msg_for_none="-",
):
"""
Generate HTML links for items in a queryset, using a specified attribute for link text.
Args:
queryset: The queryset of items to generate links for.
model_name: The model name used to construct the admin change URL.
attribute_name: The attribute or method name to use for link text. If None, the item itself is used.
link_info_attribute: Appends f"({value_of_attribute})" to the end of the link.
separator: The separator to use between links in the resulting HTML.
If none, an unordered list is returned.
msg_for_none: What to return when the field would otherwise display None.
Defaults to `-`.
Returns:
A formatted HTML string with links to the admin change pages for each item.
"""
links = []
for item in queryset:
# This allows you to pass in attribute_name="get_full_name" for instance.
if attribute_name:
item_display_value = value_of_attribute(item, attribute_name)
else:
item_display_value = item
if item_display_value:
change_url = reverse(f"admin:registrar_{model_name}_change", args=[item.pk])
link = f'<a href="{change_url}">{escape(item_display_value)}</a>'
if link_info_attribute:
link += f" ({value_of_attribute(item, link_info_attribute)})"
if separator:
links.append(link)
else:
links.append(f"<li>{link}</li>")
# If no separator is specified, just return an unordered list.
if separator:
return format_html(separator.join(links)) if links else msg_for_none
else:
links = "".join(links)
return format_html(f'<ul class="add-list-reset">{links}</ul>') if links else msg_for_none

View file

@ -0,0 +1,125 @@
from django.http import JsonResponse
from django.core.paginator import Paginator
from django.contrib.auth.decorators import login_required
from django.db.models import Q
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user import User
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
@login_required
def get_portfolio_members_json(request):
"""Given the current request,
get all members that are associated with the given portfolio"""
portfolio = request.GET.get("portfolio")
member_ids = get_member_ids_from_request(request, portfolio)
objects = User.objects.filter(id__in=member_ids)
admin_ids = UserPortfolioPermission.objects.filter(
portfolio=portfolio,
roles__overlap=[
UserPortfolioRoleChoices.ORGANIZATION_ADMIN,
],
).values_list("user__id", flat=True)
portfolio_invitation_emails = PortfolioInvitation.objects.filter(portfolio=portfolio).values_list(
"email", flat=True
)
unfiltered_total = objects.count()
objects = apply_search(objects, request)
# objects = apply_status_filter(objects, request)
objects = apply_sorting(objects, request)
paginator = Paginator(objects, 10)
page_number = request.GET.get("page", 1)
page_obj = paginator.get_page(page_number)
members = [
serialize_members(request, portfolio, member, request.user, admin_ids, portfolio_invitation_emails)
for member in page_obj.object_list
]
return JsonResponse(
{
"members": members,
"page": page_obj.number,
"num_pages": paginator.num_pages,
"has_previous": page_obj.has_previous(),
"has_next": page_obj.has_next(),
"total": paginator.count,
"unfiltered_total": unfiltered_total,
}
)
def get_member_ids_from_request(request, portfolio):
"""Given the current request,
get all members that are associated with the given portfolio"""
member_ids = []
if portfolio:
member_ids = UserPortfolioPermission.objects.filter(portfolio=portfolio).values_list("user__id", flat=True)
return member_ids
def apply_search(queryset, request):
search_term = request.GET.get("search_term")
if search_term:
queryset = queryset.filter(
Q(username__icontains=search_term)
| Q(first_name__icontains=search_term)
| Q(last_name__icontains=search_term)
| Q(email__icontains=search_term)
)
return queryset
def apply_sorting(queryset, request):
sort_by = request.GET.get("sort_by", "id") # Default to 'id'
order = request.GET.get("order", "asc") # Default to 'asc'
if sort_by == "member":
sort_by = ["email", "first_name", "middle_name", "last_name"]
else:
sort_by = [sort_by]
if order == "desc":
sort_by = [f"-{field}" for field in sort_by]
return queryset.order_by(*sort_by)
def serialize_members(request, portfolio, member, user, admin_ids, portfolio_invitation_emails):
# ------- VIEW ONLY
# If not view_only (the user has permissions to edit/manage users), show the gear icon with "Manage" link.
# If view_only (the user only has view user permissions), show the "View" link (no gear icon).
# We check on user_group_permision to account for the upcoming "Manage portfolio" button on admin.
user_can_edit_other_users = False
for user_group_permission in ["registrar.full_access_permission", "registrar.change_user"]:
if user.has_perm(user_group_permission):
user_can_edit_other_users = True
break
view_only = not user.has_edit_members_portfolio_permission(portfolio) or not user_can_edit_other_users
# ------- USER STATUSES
is_invited = member.email in portfolio_invitation_emails
last_active = "Invited" if is_invited else "Unknown"
if member.last_login:
last_active = member.last_login.strftime("%b. %d, %Y")
is_admin = member.id in admin_ids
# ------- SERIALIZE
member_json = {
"id": member.id,
"name": member.get_formatted_name(),
"email": member.email,
"is_admin": is_admin,
"last_active": last_active,
"action_url": "#", # reverse("members", kwargs={"pk": member.id}), # TODO: Future ticket?
"action_label": ("View" if view_only else "Manage"),
"svg_icon": ("visibility" if view_only else "settings"),
}
return member_json

View file

@ -12,6 +12,7 @@ from registrar.views.utility.permission_views import (
PortfolioDomainsPermissionView,
PortfolioBasePermissionView,
NoPortfolioDomainsPermissionView,
PortfolioMembersPermissionView,
)
from django.views.generic import View
from django.views.generic.edit import FormMixin
@ -41,6 +42,15 @@ class PortfolioDomainRequestsView(PortfolioDomainRequestsPermissionView, View):
return render(request, "portfolio_requests.html")
class PortfolioMembersView(PortfolioMembersPermissionView, View):
template_name = "portfolio_members.html"
def get(self, request):
"""Add additional context data to the template."""
return render(request, "portfolio_members.html")
class PortfolioNoDomainsView(NoPortfolioDomainsPermissionView, View):
"""Some users have access to the underlying portfolio, but not any domains.
This is a custom view which explains that to the user - and denotes who to contact.

View file

@ -55,11 +55,9 @@ def get_federal_and_portfolio_types_from_federal_agency_json(request):
portfolio_type = None
agency_name = request.GET.get("agency_name")
organization_type = request.GET.get("organization_type")
agency = FederalAgency.objects.filter(agency=agency_name).first()
if agency:
federal_type = Portfolio.get_federal_type(agency)
portfolio_type = Portfolio.get_portfolio_type(organization_type, federal_type)
federal_type = BranchChoices.get_branch_label(federal_type) if federal_type else "-"
response_data = {

View file

@ -490,7 +490,7 @@ class PortfolioMembersPermission(PortfolioBasePermission):
up from the portfolio's primary key in self.kwargs["pk"]"""
portfolio = self.request.session.get("portfolio")
if not self.request.user.has_view_members(portfolio):
if not self.request.user.has_view_members_portfolio_permission(portfolio):
return False
return super().has_permission()