diff --git a/docs/developer/README.md b/docs/developer/README.md
index 9ddb35352..0fa9d9a8c 100644
--- a/docs/developer/README.md
+++ b/docs/developer/README.md
@@ -173,7 +173,7 @@ You can change the logging verbosity, if needed. Do a web search for "django log
## Mock data
-[load.py](../../src/registrar/management/commands/load.py) called from docker-compose (locally) and reset-db.yml (upper) loads the fixtures from [fixtures_user.py](../../src/registrar/fixtures_users.py) and [fixtures_domain_requests.py](../../src/registrar/fixtures_domain_requests.py), giving you some test data to play with while developing.
+[load.py](../../src/registrar/management/commands/load.py) called from docker-compose (locally) and reset-db.yml (upper) loads the fixtures from [fixtures_user.py](../../src/registrar/fixtures/fixtures_users.py) and the rest of the data-loading fixtures in that fixtures folder, giving you some test data to play with while developing.
See the [database-access README](./database-access.md) for information on how to pull data to update these fixtures.
diff --git a/docs/operations/import_export.md b/docs/operations/import_export.md
index b7fd50d52..224fe9a5f 100644
--- a/docs/operations/import_export.md
+++ b/docs/operations/import_export.md
@@ -9,17 +9,16 @@ Simple scripts are provided as detailed below.
### Export
To export from the source environment, run the following command from src directory:
-manage.py export_tables
Connect to the source sandbox and run the command:
-cf ssh {source-app}
-/tmp/lifecycle/shell
-./manage.py export_tables
+`cf ssh {source-app}`
+`/tmp/lifecycle/shell`
+`./manage.py export_tables`
example exporting from getgov-stable:
-cf ssh getgov-stable
-/tmp/lifecycle/shell
-./manage.py export_tables
+`cf ssh getgov-stable`
+`/tmp/lifecycle/shell`
+`./manage.py export_tables`
This exports a file, exported_tables.zip, to the tmp directory
@@ -42,14 +41,16 @@ After exporting the file from the target environment, scp the exported_tables.zi
file from the target environment to local. Run the below commands from local.
Get passcode by running:
-cf ssh-code
+`cf ssh-code`
scp file from source app to local file:
-scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {source-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip {local_file_path}
+`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {source-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip {local_file_path}`
when prompted, supply the passcode retrieved in the 'cf ssh-code' command
example copying from stable to local cwd:
-scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-stable --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip .
+`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-stable --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip .`
+
+`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-stable --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip .`
### Import
@@ -63,14 +64,14 @@ that there are no database conflicts on import.
In order to delete all rows from the appropriate tables, run the following
command:
-cf ssh {target-app}
-/tmp/lifecycle/shell
-./manage.py clean_tables
+`cf ssh {target-app}`
+`/tmp/lifecycle/shell`
+`./manage.py clean_tables`
example cleaning getgov-backup:
-cf ssh getgov-backup
-/tmp/lifecycle/backup
-./manage.py clean_tables
+`cf ssh getgov-backup`
+`/tmp/lifecycle/shell`
+`./manage.py clean_tables`
For reference, this deletes all rows from the following tables:
@@ -96,28 +97,30 @@ with --skipEppSave option set to False. If you set to False, it will attempt to
records to the registry on load. If this is unset, or set to True, it will load the database and not
attempt to update the registry on load.
+Please note that there is currently a bug (missing batch importing, see #2862) so this may not work
+smoothly right now currently.
+
To scp the exported_tables.zip file from local to the sandbox, run the following:
Get passcode by running:
-cf ssh-code
+`cf ssh-code`
scp file from local to target app:
-scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {target-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 {local_file_path} ssh.fr.cloud.gov:app/tmp/exported_tables.zip
+`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {target-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 {local_file_path} ssh.fr.cloud.gov:app/tmp/exported_tables.zip`
when prompted, supply the passcode retrieved in the 'cf ssh-code' command
example copy of local file in tmp to getgov-backup:
-scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-backup --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 tmp/exported_tables.zip ssh.fr.cloud.gov:app/tmp/exported_tables.zip
-
+`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-backup --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 exported_tables.zip ssh.fr.cloud.gov:app/tmp/exported_tables.zip`
Then connect to a shell in the target environment, and run the following import command:
-cf ssh {target-app}
-/tmp/lifecycle/shell
-./manage.py import_tables
+`cf ssh {target-app}`
+`/tmp/lifecycle/shell`
+`./manage.py import_tables`
example cleaning getgov-backup:
-cf ssh getgov-backup
-/tmp/lifecycle/backup
-./manage.py import_tables --no-skipEppSave
+`cf ssh getgov-backup`
+`/tmp/lifecycle/shell`
+`./manage.py import_tables --no-skipEppSave`
For reference, this imports tables in the following order:
diff --git a/src/api/tests/test_rdap.py b/src/api/tests/test_rdap.py
new file mode 100644
index 000000000..789a99152
--- /dev/null
+++ b/src/api/tests/test_rdap.py
@@ -0,0 +1,66 @@
+"""Test the domain rdap lookup API."""
+
+import json
+
+from django.contrib.auth import get_user_model
+from django.test import RequestFactory
+from django.test import TestCase
+
+from ..views import rdap
+
+API_BASE_PATH = "/api/v1/rdap/?domain="
+
+
+class RdapViewTest(TestCase):
+ """Test that the RDAP view function works as expected"""
+
+ def setUp(self):
+ super().setUp()
+ self.user = get_user_model().objects.create(username="username")
+ self.factory = RequestFactory()
+
+ def test_rdap_get_no_tld(self):
+ """RDAP API successfully fetches RDAP for domain without a TLD"""
+ request = self.factory.get(API_BASE_PATH + "whitehouse")
+ request.user = self.user
+ response = rdap(request, domain="whitehouse")
+ # contains the right text
+ self.assertContains(response, "rdap")
+ # can be parsed into JSON with appropriate keys
+ response_object = json.loads(response.content)
+ self.assertIn("rdapConformance", response_object)
+
+ def test_rdap_invalid_domain(self):
+ """RDAP API accepts invalid domain queries and returns JSON response
+ with appropriate error codes"""
+ request = self.factory.get(API_BASE_PATH + "whitehouse.com")
+ request.user = self.user
+ response = rdap(request, domain="whitehouse.com")
+
+ self.assertContains(response, "errorCode")
+ response_object = json.loads(response.content)
+ self.assertIn("errorCode", response_object)
+
+
+class RdapAPITest(TestCase):
+ """Test that the API can be called as expected."""
+
+ def setUp(self):
+ super().setUp()
+ username = "test_user"
+ first_name = "First"
+ last_name = "Last"
+ email = "info@example.com"
+ title = "title"
+ phone = "8080102431"
+ self.user = get_user_model().objects.create(
+ username=username, title=title, first_name=first_name, last_name=last_name, email=email, phone=phone
+ )
+
+ def test_rdap_get(self):
+ """Can call RDAP API"""
+ self.client.force_login(self.user)
+ response = self.client.get(API_BASE_PATH + "whitehouse.gov")
+ self.assertContains(response, "rdap")
+ response_object = json.loads(response.content)
+ self.assertIn("rdapConformance", response_object)
diff --git a/src/api/views.py b/src/api/views.py
index 2199e15ac..a7b4bde75 100644
--- a/src/api/views.py
+++ b/src/api/views.py
@@ -2,7 +2,7 @@
from django.apps import apps
from django.views.decorators.http import require_http_methods
-from django.http import HttpResponse
+from django.http import HttpResponse, JsonResponse
from django.utils.safestring import mark_safe
from registrar.templatetags.url_helpers import public_site_url
@@ -18,7 +18,7 @@ from cachetools.func import ttl_cache
from registrar.utility.s3_bucket import S3ClientError, S3ClientHelper
-DOMAIN_FILE_URL = "https://raw.githubusercontent.com/cisagov/dotgov-data/main/current-full.csv"
+RDAP_URL = "https://rdap.cloudflareregistry.com/rdap/domain/{domain}"
DOMAIN_API_MESSAGES = {
@@ -41,30 +41,6 @@ DOMAIN_API_MESSAGES = {
}
-# this file doesn't change that often, nor is it that big, so cache the result
-# in memory for ten minutes
-@ttl_cache(ttl=600)
-def _domains():
- """Return a list of the current .gov domains.
-
- Fetch a file from DOMAIN_FILE_URL, parse the CSV for the domain,
- lowercase everything and return the list.
- """
- DraftDomain = apps.get_model("registrar.DraftDomain")
- # 5 second timeout
- file_contents = requests.get(DOMAIN_FILE_URL, timeout=5).text
- domains = set()
- # skip the first line
- for line in file_contents.splitlines()[1:]:
- # get the domain before the first comma
- domain = line.split(",", 1)[0]
- # sanity-check the string we got from the file here
- if DraftDomain.string_could_be_domain(domain):
- # lowercase everything when we put it in domains
- domains.add(domain.lower())
- return domains
-
-
def check_domain_available(domain):
"""Return true if the given domain is available.
@@ -99,6 +75,22 @@ def available(request, domain=""):
return json_response
+@require_http_methods(["GET"])
+@login_not_required
+# Since we cache domain RDAP data, cache time may need to be re-evaluated this if we encounter any memory issues
+@ttl_cache(ttl=600)
+def rdap(request, domain=""):
+ """Returns JSON dictionary of a domain's RDAP data from Cloudflare API"""
+ domain = request.GET.get("domain", "")
+
+ # If inputted domain doesn't have a TLD, append .gov to it
+ if "." not in domain:
+ domain = f"{domain}.gov"
+
+ rdap_data = requests.get(RDAP_URL.format(domain=domain), timeout=5).json()
+ return JsonResponse(rdap_data)
+
+
@require_http_methods(["GET"])
@login_not_required
def get_current_full(request, file_name="current-full.csv"):
diff --git a/src/registrar/assets/js/get-gov.js b/src/registrar/assets/js/get-gov.js
index 027ef4344..8dd0fcf14 100644
--- a/src/registrar/assets/js/get-gov.js
+++ b/src/registrar/assets/js/get-gov.js
@@ -1853,6 +1853,125 @@ class DomainRequestsTable extends LoadTableBase {
}
}
+class MembersTable extends LoadTableBase {
+
+ constructor() {
+ super('.members__table', '.members__table-wrapper', '#members__search-field', '#members__search-field-submit', '.members__reset-search', '.members__reset-filters', '.members__no-data', '.members__no-search-results');
+ }
+ /**
+ * Loads rows in the members list, as well as updates pagination around the members list
+ * based on the supplied attributes.
+ * @param {*} page - the page number of the results (starts with 1)
+ * @param {*} sortBy - the sort column option
+ * @param {*} order - the sort order {asc, desc}
+ * @param {*} scroll - control for the scrollToElement functionality
+ * @param {*} status - control for the status filter
+ * @param {*} searchTerm - the search term
+ * @param {*} portfolio - the portfolio id
+ */
+ loadTable(page, sortBy = this.currentSortBy, order = this.currentOrder, scroll = this.scrollToTable, status = this.currentStatus, searchTerm =this.currentSearchTerm, portfolio = this.portfolioValue) {
+
+ // --------- SEARCH
+ let searchParams = new URLSearchParams(
+ {
+ "page": page,
+ "sort_by": sortBy,
+ "order": order,
+ "status": status,
+ "search_term": searchTerm
+ }
+ );
+ if (portfolio)
+ searchParams.append("portfolio", portfolio)
+
+
+ // --------- FETCH DATA
+ // fetch json of page of domais, given params
+ let baseUrl = document.getElementById("get_members_json_url");
+ if (!baseUrl) {
+ return;
+ }
+
+ let baseUrlValue = baseUrl.innerHTML;
+ if (!baseUrlValue) {
+ return;
+ }
+
+ let url = `${baseUrlValue}?${searchParams.toString()}` //TODO: uncomment for search function
+ fetch(url)
+ .then(response => response.json())
+ .then(data => {
+ if (data.error) {
+ console.error('Error in AJAX call: ' + data.error);
+ return;
+ }
+
+ // handle the display of proper messaging in the event that no members exist in the list or search returns no results
+ this.updateDisplay(data, this.tableWrapper, this.noTableWrapper, this.noSearchResultsWrapper, this.currentSearchTerm);
+
+ // identify the DOM element where the domain list will be inserted into the DOM
+ const memberList = document.querySelector('.members__table tbody');
+ memberList.innerHTML = '';
+
+ data.members.forEach(member => {
+ // const actionUrl = domain.action_url;
+ const member_name = member.name;
+ const member_email = member.email;
+ const last_active = member.last_active;
+ const action_url = member.action_url;
+ const action_label = member.action_label;
+ const svg_icon = member.svg_icon;
+
+ const row = document.createElement('tr');
+
+ let admin_tagHTML = ``;
+ if (member.is_admin)
+ admin_tagHTML = `Admin `
+
+ row.innerHTML = `
+
+ ${member_email ? member_email : member_name} ${admin_tagHTML}
+
+
+ ${last_active}
+
+
+
+
+
+
+ ${action_label} ${member_name}
+
+
+ `;
+ memberList.appendChild(row);
+ });
+
+ // Do not scroll on first page load
+ if (scroll)
+ ScrollToElement('class', 'members');
+ this.scrollToTable = true;
+
+ // update pagination
+ this.updatePagination(
+ 'member',
+ '#members-pagination',
+ '#members-pagination .usa-pagination__counter',
+ '#members',
+ data.page,
+ data.num_pages,
+ data.has_previous,
+ data.has_next,
+ data.total,
+ );
+ this.currentSortBy = sortBy;
+ this.currentOrder = order;
+ this.currentSearchTerm = searchTerm;
+ })
+ .catch(error => console.error('Error fetching members:', error));
+ }
+}
+
/**
* An IIFE that listens for DOM Content to be loaded, then executes. This function
@@ -1926,6 +2045,23 @@ const utcDateString = (dateString) => {
};
+
+/**
+ * An IIFE that listens for DOM Content to be loaded, then executes. This function
+ * initializes the domains list and associated functionality on the home page of the app.
+ *
+ */
+document.addEventListener('DOMContentLoaded', function() {
+ const isMembersPage = document.querySelector("#members")
+ if (isMembersPage){
+ const membersTable = new MembersTable();
+ if (membersTable.tableWrapper) {
+ // Initial load
+ membersTable.loadTable(1);
+ }
+ }
+});
+
/**
* An IIFE that displays confirmation modal on the user profile page
*/
diff --git a/src/registrar/assets/sass/_theme/_admin.scss b/src/registrar/assets/sass/_theme/_admin.scss
index c0d50bac1..5cea72c4c 100644
--- a/src/registrar/assets/sass/_theme/_admin.scss
+++ b/src/registrar/assets/sass/_theme/_admin.scss
@@ -126,7 +126,8 @@ html[data-theme="light"] {
body.dashboard,
body.change-list,
body.change-form,
- .custom-admin-template, dt {
+ .custom-admin-template,
+ .dl-dja dt {
color: var(--body-fg);
}
.usa-table td {
@@ -155,7 +156,8 @@ html[data-theme="dark"] {
body.dashboard,
body.change-list,
body.change-form,
- .custom-admin-template, dt {
+ .custom-admin-template,
+ .dl-dja dt {
color: var(--body-fg);
}
.usa-table td {
diff --git a/src/registrar/config/urls.py b/src/registrar/config/urls.py
index 76c77955f..3099f468f 100644
--- a/src/registrar/config/urls.py
+++ b/src/registrar/config/urls.py
@@ -22,18 +22,20 @@ from registrar.views.report_views import (
ExportDataTypeUser,
)
-from registrar.views.domain_request import Step
+# --jsons
from registrar.views.domain_requests_json import get_domain_requests_json
-from registrar.views.transfer_user import TransferUserView
+from registrar.views.domains_json import get_domains_json
+from registrar.views.portfolio_members_json import get_portfolio_members_json
from registrar.views.utility.api_views import (
get_senior_official_from_federal_agency_json,
get_federal_and_portfolio_types_from_federal_agency_json,
get_action_needed_email_for_user_json,
)
-from registrar.views.domains_json import get_domains_json
-from registrar.views.utility import always_404
-from api.views import available, get_current_federal, get_current_full
+from registrar.views.domain_request import Step
+from registrar.views.transfer_user import TransferUserView
+from registrar.views.utility import always_404
+from api.views import available, rdap, get_current_federal, get_current_full
DOMAIN_REQUEST_NAMESPACE = views.DomainRequestWizard.URL_NAMESPACE
domain_request_urls = [
@@ -74,6 +76,16 @@ urlpatterns = [
views.PortfolioNoDomainsView.as_view(),
name="no-portfolio-domains",
),
+ path(
+ "members/",
+ views.PortfolioMembersView.as_view(),
+ name="members",
+ ),
+ # path(
+ # "no-organization-members/",
+ # views.PortfolioNoMembersView.as_view(),
+ # name="no-portfolio-members",
+ # ),
path(
"requests/",
views.PortfolioDomainRequestsView.as_view(),
@@ -194,6 +206,7 @@ urlpatterns = [
path("openid/", include("djangooidc.urls")),
path("request/", include((domain_request_urls, DOMAIN_REQUEST_NAMESPACE))),
path("api/v1/available/", available, name="available"),
+ path("api/v1/rdap/", rdap, name="rdap"),
path("api/v1/get-report/current-federal", get_current_federal, name="get-current-federal"),
path("api/v1/get-report/current-full", get_current_full, name="get-current-full"),
path(
@@ -275,6 +288,7 @@ urlpatterns = [
),
path("get-domains-json/", get_domains_json, name="get_domains_json"),
path("get-domain-requests-json/", get_domain_requests_json, name="get_domain_requests_json"),
+ path("get-portfolio-members-json/", get_portfolio_members_json, name="get_portfolio_members_json"),
]
# Djangooidc strips out context data from that context, so we define a custom error
diff --git a/src/registrar/context_processors.py b/src/registrar/context_processors.py
index c62c9a7c4..53f6e8ae7 100644
--- a/src/registrar/context_processors.py
+++ b/src/registrar/context_processors.py
@@ -97,5 +97,5 @@ def portfolio_permissions(request):
def is_widescreen_mode(request):
- widescreen_paths = ["/domains/", "/requests/"]
+ widescreen_paths = ["/domains/", "/requests/", "/members/"]
return {"is_widescreen_mode": any(path in request.path for path in widescreen_paths) or request.path == "/"}
diff --git a/src/registrar/fixtures/fixtures_domains.py b/src/registrar/fixtures/fixtures_domains.py
new file mode 100644
index 000000000..98f13cd43
--- /dev/null
+++ b/src/registrar/fixtures/fixtures_domains.py
@@ -0,0 +1,138 @@
+from datetime import timedelta
+from django.utils import timezone
+import logging
+import random
+from faker import Faker
+from django.db import transaction
+
+from registrar.fixtures.fixtures_requests import DomainRequestFixture
+from registrar.fixtures.fixtures_users import UserFixture
+from registrar.models import User, DomainRequest
+from registrar.models.domain import Domain
+
+fake = Faker()
+logger = logging.getLogger(__name__)
+
+
+class DomainFixture(DomainRequestFixture):
+ """Create two domains and permissions on them for each user.
+ One domain will have a past expiration date.
+
+ Depends on fixtures_requests.
+
+ Make sure this class' `load` method is called from `handle`
+ in management/commands/load.py, then use `./manage.py load`
+ to run this code.
+ """
+
+ @classmethod
+ def load(cls):
+ # Lumped under .atomic to ensure we don't make redundant DB calls.
+ # This bundles them all together, and then saves it in a single call.
+ with transaction.atomic():
+ try:
+ # Get the usernames of users created in the UserFixture
+ created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF]
+
+ # Filter users to only include those created by the fixture
+ users = list(User.objects.filter(username__in=created_usernames))
+ except Exception as e:
+ logger.warning(e)
+ return
+
+ # Approve each user associated with `in review` status domains
+ cls._approve_domain_requests(users)
+
+ @staticmethod
+ def _generate_fake_expiration_date(days_in_future=365):
+ """Generates a fake expiration date between 1 and 365 days in the future."""
+ current_date = timezone.now().date() # nosec
+ return current_date + timedelta(days=random.randint(1, days_in_future)) # nosec
+
+ @staticmethod
+ def _generate_fake_expiration_date_in_past():
+ """Generates a fake expiration date up to 365 days in the past."""
+ current_date = timezone.now().date() # nosec
+ return current_date + timedelta(days=random.randint(-365, -1)) # nosec
+
+ @classmethod
+ def _approve_request(cls, domain_request, users):
+ """Helper function to approve a domain request."""
+ if not domain_request:
+ return None
+
+ if domain_request.investigator is None:
+ # Assign random investigator if not already assigned
+ domain_request.investigator = random.choice(users) # nosec
+
+ # Approve the domain request
+ domain_request.approve(send_email=False)
+
+ return domain_request
+
+ @classmethod
+ def _approve_domain_requests(cls, users):
+ """Approves one current and one expired request per user."""
+ domain_requests_to_update = []
+ expired_requests = []
+
+ for user in users:
+ # Get the latest and second-to-last domain requests
+ domain_requests = DomainRequest.objects.filter(
+ creator=user, status=DomainRequest.DomainRequestStatus.IN_REVIEW
+ ).order_by("-id")[:2]
+
+ # Latest domain request
+ domain_request = domain_requests[0] if domain_requests else None
+ # Second-to-last domain request (expired)
+ domain_request_expired = domain_requests[1] if len(domain_requests) > 1 else None
+
+ # Approve the current domain request
+ if domain_request:
+ cls._approve_request(domain_request, users)
+ domain_requests_to_update.append(domain_request)
+
+ # Approve the expired domain request
+ if domain_request_expired:
+ cls._approve_request(domain_request_expired, users)
+ domain_requests_to_update.append(domain_request_expired)
+ expired_requests.append(domain_request_expired)
+
+ # Perform bulk update for the domain requests
+ cls._bulk_update_requests(domain_requests_to_update)
+
+ # Retrieve all domains associated with the domain requests
+ domains_to_update = Domain.objects.filter(domain_info__domain_request__in=domain_requests_to_update)
+
+ # Loop through and update expiration dates for domains
+ for domain in domains_to_update:
+ domain_request = domain.domain_info.domain_request
+
+ # Set the expiration date based on whether the request is expired
+ if domain_request in expired_requests:
+ domain.expiration_date = cls._generate_fake_expiration_date_in_past()
+ else:
+ domain.expiration_date = cls._generate_fake_expiration_date()
+
+ # Perform bulk update for the domains
+ cls._bulk_update_domains(domains_to_update)
+
+ @classmethod
+ def _bulk_update_requests(cls, domain_requests_to_update):
+ """Bulk update domain requests."""
+ if domain_requests_to_update:
+ try:
+ DomainRequest.objects.bulk_update(domain_requests_to_update, ["status", "investigator"])
+ logger.info(f"Successfully updated {len(domain_requests_to_update)} requests.")
+ except Exception as e:
+ logger.error(f"Unexpected error during requests bulk update: {e}")
+
+ @classmethod
+ def _bulk_update_domains(cls, domains_to_update):
+ """Bulk update domains with expiration dates."""
+ if domains_to_update:
+ try:
+ Domain.objects.bulk_update(domains_to_update, ["expiration_date"])
+ logger.info(f"Successfully updated {len(domains_to_update)} domains.")
+ except Exception as e:
+ logger.error(f"Unexpected error during domains bulk update: {e}")
diff --git a/src/registrar/fixtures/fixtures_portfolios.py b/src/registrar/fixtures/fixtures_portfolios.py
new file mode 100644
index 000000000..2a391fb16
--- /dev/null
+++ b/src/registrar/fixtures/fixtures_portfolios.py
@@ -0,0 +1,125 @@
+import logging
+import random
+from faker import Faker
+from django.db import transaction
+
+from registrar.models import User, DomainRequest, FederalAgency
+from registrar.models.portfolio import Portfolio
+from registrar.models.senior_official import SeniorOfficial
+
+
+fake = Faker()
+logger = logging.getLogger(__name__)
+
+
+class PortfolioFixture:
+ """
+ Creates 2 pre-defined portfolios with the infrastructure to add more.
+
+ Make sure this class' `load` method is called from `handle`
+ in management/commands/load.py, then use `./manage.py load`
+ to run this code.
+ """
+
+ PORTFOLIOS = [
+ {
+ "organization_name": "Hotel California",
+ },
+ {
+ "organization_name": "Wish You Were Here",
+ },
+ ]
+
+ @classmethod
+ def fake_so(cls):
+ return {
+ "first_name": fake.first_name(),
+ "last_name": fake.last_name(),
+ "title": fake.job(),
+ "email": fake.ascii_safe_email(),
+ "phone": "201-555-5555",
+ }
+
+ @classmethod
+ def _set_non_foreign_key_fields(cls, portfolio: Portfolio, portfolio_dict: dict):
+ """Helper method used by `load`."""
+ portfolio.organization_type = (
+ portfolio_dict["organization_type"]
+ if "organization_type" in portfolio_dict
+ else DomainRequest.OrganizationChoices.FEDERAL
+ )
+ portfolio.notes = portfolio_dict["notes"] if "notes" in portfolio_dict else None
+ portfolio.address_line1 = (
+ portfolio_dict["address_line1"] if "address_line1" in portfolio_dict else fake.street_address()
+ )
+ portfolio.address_line2 = portfolio_dict["address_line2"] if "address_line2" in portfolio_dict else None
+ portfolio.city = portfolio_dict["city"] if "city" in portfolio_dict else fake.city()
+ portfolio.state_territory = (
+ portfolio_dict["state_territory"] if "state_territory" in portfolio_dict else fake.state_abbr()
+ )
+ portfolio.zipcode = portfolio_dict["zipcode"] if "zipcode" in portfolio_dict else fake.postalcode()
+ portfolio.urbanization = portfolio_dict["urbanization"] if "urbanization" in portfolio_dict else None
+ portfolio.security_contact_email = (
+ portfolio_dict["security_contact_email"] if "security_contact_email" in portfolio_dict else fake.email()
+ )
+
+ @classmethod
+ def _set_foreign_key_fields(cls, portfolio: Portfolio, portfolio_dict: dict, user: User):
+ """Helper method used by `load`."""
+ if not portfolio.senior_official:
+ if portfolio_dict.get("senior_official") is not None:
+ portfolio.senior_official, _ = SeniorOfficial.objects.get_or_create(**portfolio_dict["senior_official"])
+ else:
+ portfolio.senior_official = SeniorOfficial.objects.create(**cls.fake_so())
+
+ if not portfolio.federal_agency:
+ if portfolio_dict.get("federal_agency") is not None:
+ portfolio.federal_agency, _ = FederalAgency.objects.get_or_create(name=portfolio_dict["federal_agency"])
+ else:
+ federal_agencies = FederalAgency.objects.all()
+ # Random choice of agency for selects, used as placeholders for testing.
+ portfolio.federal_agency = random.choice(federal_agencies) # nosec
+
+ @classmethod
+ def load(cls):
+ """Creates portfolios."""
+ logger.info("Going to load %s portfolios" % len(cls.PORTFOLIOS))
+
+ # Lumped under .atomic to ensure we don't make redundant DB calls.
+ # This bundles them all together, and then saves it in a single call.
+ with transaction.atomic():
+ try:
+ user = User.objects.all().last()
+ except Exception as e:
+ logger.warning(e)
+ return
+
+ portfolios_to_create = []
+ for portfolio_data in cls.PORTFOLIOS:
+ organization_name = portfolio_data["organization_name"]
+
+ # Check if portfolio with the organization name already exists
+ if Portfolio.objects.filter(organization_name=organization_name).exists():
+ logger.info(
+ f"Portfolio with organization name '{organization_name}' already exists, skipping creation."
+ )
+ continue
+
+ try:
+ portfolio = Portfolio(
+ creator=user,
+ organization_name=portfolio_data["organization_name"],
+ )
+ cls._set_non_foreign_key_fields(portfolio, portfolio_data)
+ cls._set_foreign_key_fields(portfolio, portfolio_data, user)
+ portfolios_to_create.append(portfolio)
+ except Exception as e:
+ logger.warning(e)
+
+ # Bulk create domain requests
+ if len(portfolios_to_create) > 0:
+ try:
+ Portfolio.objects.bulk_create(portfolios_to_create)
+ logger.info(f"Successfully created {len(portfolios_to_create)} portfolios")
+ except Exception as e:
+ logger.warning(f"Error bulk creating portfolios: {e}")
diff --git a/src/registrar/fixtures/fixtures_requests.py b/src/registrar/fixtures/fixtures_requests.py
new file mode 100644
index 000000000..f5b57491e
--- /dev/null
+++ b/src/registrar/fixtures/fixtures_requests.py
@@ -0,0 +1,325 @@
+from datetime import timedelta
+from django.utils import timezone
+import logging
+import random
+from faker import Faker
+from django.db import transaction
+
+from registrar.fixtures.fixtures_portfolios import PortfolioFixture
+from registrar.fixtures.fixtures_users import UserFixture
+from registrar.models import User, DomainRequest, DraftDomain, Contact, Website, FederalAgency
+from registrar.models.portfolio import Portfolio
+from registrar.models.suborganization import Suborganization
+
+
+fake = Faker()
+logger = logging.getLogger(__name__)
+
+
+class DomainRequestFixture:
+ """
+ Creates domain requests for each user in the database,
+ assign portfolios and suborgs.
+
+ Creates 3 in_review requests, one for approving with an expired domain,
+ one for approving with a non-expired domain, and one for leaving in in_review.
+
+ Depends on fixtures_portfolios and fixtures_suborganizations.
+
+ Make sure this class' `load` method is called from `handle`
+ in management/commands/load.py, then use `./manage.py load`
+ to run this code.
+ """
+
+ # any fields not specified here will be filled in with fake data or defaults
+ # NOTE BENE: each fixture must have `organization_name` for uniqueness!
+ # Here is a more complete example as a template:
+ # {
+ # "status": "started",
+ # "organization_name": "Example - Just started",
+ # "generic_org_type": "federal",
+ # "federal_agency": None,
+ # "federal_type": None,
+ # "address_line1": None,
+ # "address_line2": None,
+ # "city": None,
+ # "state_territory": None,
+ # "zipcode": None,
+ # "urbanization": None,
+ # "purpose": None,
+ # "anything_else": None,
+ # "is_policy_acknowledged": None,
+ # "senior_official": None,
+ # "other_contacts": [],
+ # "current_websites": [],
+ # "alternative_domains": [],
+ # },
+ DOMAINREQUESTS = [
+ {
+ "status": DomainRequest.DomainRequestStatus.STARTED,
+ "organization_name": "Example - Finished but not submitted",
+ },
+ {
+ "status": DomainRequest.DomainRequestStatus.SUBMITTED,
+ "organization_name": "Example - Submitted but pending investigation",
+ },
+ {
+ "status": DomainRequest.DomainRequestStatus.IN_REVIEW,
+ "organization_name": "Example - In investigation",
+ },
+ {
+ "status": DomainRequest.DomainRequestStatus.IN_REVIEW,
+ "organization_name": "Example - Approved",
+ },
+ {
+ "status": DomainRequest.DomainRequestStatus.IN_REVIEW,
+ "organization_name": "Example - Approved, domain expired",
+ },
+ {
+ "status": DomainRequest.DomainRequestStatus.WITHDRAWN,
+ "organization_name": "Example - Withdrawn",
+ },
+ {
+ "status": DomainRequest.DomainRequestStatus.ACTION_NEEDED,
+ "organization_name": "Example - Action needed",
+ },
+ {
+ "status": "rejected",
+ "organization_name": "Example - Rejected",
+ },
+ ]
+
+ @classmethod
+ def fake_contact(cls):
+ return {
+ "first_name": fake.first_name(),
+ "middle_name": None,
+ "last_name": fake.last_name(),
+ "title": fake.job(),
+ "email": fake.ascii_safe_email(),
+ "phone": "201-555-5555",
+ }
+
+ @classmethod
+ def fake_dot_gov(cls):
+ return f"{fake.slug()}.gov"
+
+ @classmethod
+ def fake_expiration_date(cls):
+ """Generates a fake expiration date between 0 and 1 year in the future."""
+ current_date = timezone.now().date()
+ days_in_future = random.randint(0, 365) # nosec
+ return current_date + timedelta(days=days_in_future)
+
+ @classmethod
+ def _set_non_foreign_key_fields(cls, request: DomainRequest, request_dict: dict):
+ """Helper method used by `load`."""
+ request.status = request_dict["status"] if "status" in request_dict else "started"
+
+ # TODO for a future ticket: Allow for more than just "federal" here
+ request.generic_org_type = request_dict["generic_org_type"] if "generic_org_type" in request_dict else "federal"
+ if request.status != "started":
+ request.last_submitted_date = fake.date()
+ request.federal_type = (
+ request_dict["federal_type"]
+ if "federal_type" in request_dict
+ else random.choice(["executive", "judicial", "legislative"]) # nosec
+ )
+ request.address_line1 = (
+ request_dict["address_line1"] if "address_line1" in request_dict else fake.street_address()
+ )
+ request.address_line2 = request_dict["address_line2"] if "address_line2" in request_dict else None
+ request.city = request_dict["city"] if "city" in request_dict else fake.city()
+ request.state_territory = (
+ request_dict["state_territory"] if "state_territory" in request_dict else fake.state_abbr()
+ )
+ request.zipcode = request_dict["zipcode"] if "zipcode" in request_dict else fake.postalcode()
+ request.urbanization = request_dict["urbanization"] if "urbanization" in request_dict else None
+ request.purpose = request_dict["purpose"] if "purpose" in request_dict else fake.paragraph()
+ request.has_cisa_representative = (
+ request_dict["has_cisa_representative"] if "has_cisa_representative" in request_dict else True
+ )
+ request.cisa_representative_email = (
+ request_dict["cisa_representative_email"] if "cisa_representative_email" in request_dict else fake.email()
+ )
+ request.cisa_representative_first_name = (
+ request_dict["cisa_representative_first_name"]
+ if "cisa_representative_first_name" in request_dict
+ else fake.first_name()
+ )
+ request.cisa_representative_last_name = (
+ request_dict["cisa_representative_last_name"]
+ if "cisa_representative_last_name" in request_dict
+ else fake.last_name()
+ )
+ request.has_anything_else_text = (
+ request_dict["has_anything_else_text"] if "has_anything_else_text" in request_dict else True
+ )
+ request.anything_else = request_dict["anything_else"] if "anything_else" in request_dict else fake.paragraph()
+ request.is_policy_acknowledged = (
+ request_dict["is_policy_acknowledged"] if "is_policy_acknowledged" in request_dict else True
+ )
+
+ @classmethod
+ def _set_foreign_key_fields(cls, request: DomainRequest, request_dict: dict, user: User):
+ """Helper method used by `load`."""
+ request.investigator = cls._get_investigator(request, request_dict, user)
+ request.senior_official = cls._get_senior_official(request, request_dict)
+ request.requested_domain = cls._get_requested_domain(request, request_dict)
+ request.federal_agency = cls._get_federal_agency(request, request_dict)
+ request.portfolio = cls._get_portfolio(request, request_dict)
+ request.sub_organization = cls._get_sub_organization(request, request_dict)
+
+ @classmethod
+ def _get_investigator(cls, request: DomainRequest, request_dict: dict, user: User):
+ if not request.investigator:
+ return User.objects.get(username=user.username) if "investigator" in request_dict else None
+ return request.investigator
+
+ @classmethod
+ def _get_senior_official(cls, request: DomainRequest, request_dict: dict):
+ if not request.senior_official:
+ if "senior_official" in request_dict and request_dict["senior_official"] is not None:
+ return Contact.objects.get_or_create(**request_dict["senior_official"])[0]
+ return Contact.objects.create(**cls.fake_contact())
+ return request.senior_official
+
+ @classmethod
+ def _get_requested_domain(cls, request: DomainRequest, request_dict: dict):
+ if not request.requested_domain:
+ if "requested_domain" in request_dict and request_dict["requested_domain"] is not None:
+ return DraftDomain.objects.get_or_create(name=request_dict["requested_domain"])[0]
+ return DraftDomain.objects.create(name=cls.fake_dot_gov())
+ return request.requested_domain
+
+ @classmethod
+ def _get_federal_agency(cls, request: DomainRequest, request_dict: dict):
+ if not request.federal_agency:
+ if "federal_agency" in request_dict and request_dict["federal_agency"] is not None:
+ return FederalAgency.objects.get_or_create(name=request_dict["federal_agency"])[0]
+ return random.choice(FederalAgency.objects.all()) # nosec
+ return request.federal_agency
+
+ @classmethod
+ def _get_portfolio(cls, request: DomainRequest, request_dict: dict):
+ if not request.portfolio:
+ if "portfolio" in request_dict and request_dict["portfolio"] is not None:
+ return Portfolio.objects.get_or_create(name=request_dict["portfolio"])[0]
+ return cls._get_random_portfolio()
+ return request.portfolio
+
+ @classmethod
+ def _get_sub_organization(cls, request: DomainRequest, request_dict: dict):
+ if not request.sub_organization:
+ if "sub_organization" in request_dict and request_dict["sub_organization"] is not None:
+ return Suborganization.objects.get_or_create(name=request_dict["sub_organization"])[0]
+ return cls._get_random_sub_organization()
+ return request.sub_organization
+
+ @classmethod
+ def _get_random_portfolio(cls):
+ try:
+ organization_names = [portfolio["organization_name"] for portfolio in PortfolioFixture.PORTFOLIOS]
+
+ portfolio_options = Portfolio.objects.filter(organization_name__in=organization_names)
+ return random.choice(portfolio_options) if portfolio_options.exists() else None # nosec
+ except Exception as e:
+ logger.warning(f"Expected fixture portfolio, did not find it: {e}")
+ return None
+
+ @classmethod
+ def _get_random_sub_organization(cls):
+ try:
+ suborg_options = [Suborganization.objects.first(), Suborganization.objects.last()]
+ return random.choice(suborg_options) # nosec
+ except Exception as e:
+ logger.warning(f"Expected fixture sub_organization, did not find it: {e}")
+ return None
+
+ @classmethod
+ def _set_many_to_many_relations(cls, request: DomainRequest, request_dict: dict):
+ """Helper method used by `load`."""
+ if "other_contacts" in request_dict:
+ for contact in request_dict["other_contacts"]:
+ request.other_contacts.add(Contact.objects.get_or_create(**contact)[0])
+ elif not request.other_contacts.exists():
+ other_contacts = [
+ Contact.objects.create(**cls.fake_contact()) for _ in range(random.randint(1, 3)) # nosec
+ ]
+ request.other_contacts.add(*other_contacts)
+
+ if "current_websites" in request_dict:
+ for website in request_dict["current_websites"]:
+ request.current_websites.add(Website.objects.get_or_create(website=website)[0])
+ elif not request.current_websites.exists():
+ current_websites = [
+ Website.objects.create(website=fake.uri()) for _ in range(random.randint(0, 3)) # nosec
+ ]
+ request.current_websites.add(*current_websites)
+
+ if "alternative_domains" in request_dict:
+ for domain in request_dict["alternative_domains"]:
+ request.alternative_domains.add(Website.objects.get_or_create(website=domain)[0])
+ elif not request.alternative_domains.exists():
+ alternative_domains = [
+ Website.objects.create(website=cls.fake_dot_gov()) for _ in range(random.randint(0, 3)) # nosec
+ ]
+ request.alternative_domains.add(*alternative_domains)
+
+ @classmethod
+ def load(cls):
+ """Creates domain requests for each user in the database."""
+ logger.info("Going to load %s domain requests" % len(cls.DOMAINREQUESTS))
+
+ # Lumped under .atomic to ensure we don't make redundant DB calls.
+ # This bundles them all together, and then saves it in a single call.
+ with transaction.atomic():
+ try:
+ # Get the usernames of users created in the UserFixture
+ created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF]
+
+ # Filter users to only include those created by the fixture
+ users = list(User.objects.filter(username__in=created_usernames))
+ except Exception as e:
+ logger.warning(e)
+ return
+
+ cls._create_domain_requests(users)
+
+ @classmethod
+ def _create_domain_requests(cls, users):
+ """Creates DomainRequests given a list of users."""
+ domain_requests_to_create = []
+ for user in users:
+ for request_data in cls.DOMAINREQUESTS:
+ # Prepare DomainRequest objects
+ try:
+ domain_request = DomainRequest(
+ creator=user,
+ organization_name=request_data["organization_name"],
+ )
+ cls._set_non_foreign_key_fields(domain_request, request_data)
+ cls._set_foreign_key_fields(domain_request, request_data, user)
+ domain_requests_to_create.append(domain_request)
+ except Exception as e:
+ logger.warning(e)
+
+ # Bulk create domain requests
+ cls._bulk_create_requests(domain_requests_to_create)
+
+ # Now many-to-many relationships
+ for domain_request in domain_requests_to_create:
+ try:
+ cls._set_many_to_many_relations(domain_request, request_data)
+ except Exception as e:
+ logger.warning(e)
+
+ @classmethod
+ def _bulk_create_requests(cls, domain_requests_to_create):
+ """Bulk create domain requests."""
+ if len(domain_requests_to_create) > 0:
+ try:
+ DomainRequest.objects.bulk_create(domain_requests_to_create)
+ logger.info(f"Successfully created {len(domain_requests_to_create)} requests.")
+ except Exception as e:
+ logger.error(f"Unexpected error during requests bulk creation: {e}")
diff --git a/src/registrar/fixtures/fixtures_suborganizations.py b/src/registrar/fixtures/fixtures_suborganizations.py
new file mode 100644
index 000000000..af7e02804
--- /dev/null
+++ b/src/registrar/fixtures/fixtures_suborganizations.py
@@ -0,0 +1,87 @@
+import logging
+from faker import Faker
+from django.db import transaction
+
+from registrar.models.portfolio import Portfolio
+from registrar.models.suborganization import Suborganization
+
+
+fake = Faker()
+logger = logging.getLogger(__name__)
+
+
+class SuborganizationFixture:
+ """
+ Creates 2 pre-defined suborg with the infrastructure to add more.
+
+ Depends on fixtures_portfolios.
+
+ Make sure this class' `load` method is called from `handle`
+ in management/commands/load.py, then use `./manage.py load`
+ to run this code.
+ """
+
+ SUBORGS = [
+ {
+ "name": "Take it Easy",
+ },
+ {
+ "name": "Welcome to the Machine",
+ },
+ ]
+
+ @classmethod
+ def load(cls):
+ """Creates suborganizations."""
+ logger.info(f"Going to load {len(cls.SUBORGS)} suborgs")
+
+ with transaction.atomic():
+ portfolios = cls._get_portfolios()
+ if not portfolios:
+ return
+
+ suborgs_to_create = cls._prepare_suborgs_to_create(portfolios)
+ cls._bulk_create_suborgs(suborgs_to_create)
+
+ @classmethod
+ def _get_portfolios(cls):
+ """Fetches portfolios with organization_name 'Hotel California' and 'Wish You Were Here'
+ and logs warnings if not found."""
+ try:
+ portfolio1 = Portfolio.objects.filter(organization_name="Hotel California").first()
+ portfolio2 = Portfolio.objects.filter(organization_name="Wish You Were Here").first()
+
+ if not portfolio1 or not portfolio2:
+ logger.warning("One or both portfolios not found.")
+ return None
+ return portfolio1, portfolio2
+ except Exception as e:
+ logger.warning(f"Error fetching portfolios: {e}")
+ return None
+
+ @classmethod
+ def _prepare_suborgs_to_create(cls, portfolios):
+ """Prepares a list of suborganizations to create, avoiding duplicates."""
+ portfolio1, portfolio2 = portfolios
+ suborgs_to_create = []
+
+ try:
+ if not Suborganization.objects.filter(name=cls.SUBORGS[0]["name"]).exists():
+ suborgs_to_create.append(Suborganization(portfolio=portfolio1, name=cls.SUBORGS[0]["name"]))
+
+ if not Suborganization.objects.filter(name=cls.SUBORGS[1]["name"]).exists():
+ suborgs_to_create.append(Suborganization(portfolio=portfolio2, name=cls.SUBORGS[1]["name"]))
+ except Exception as e:
+ logger.warning(f"Error creating suborg objects: {e}")
+
+ return suborgs_to_create
+
+ @classmethod
+ def _bulk_create_suborgs(cls, suborgs_to_create):
+ """Bulk creates suborganizations and logs success or errors."""
+ if suborgs_to_create:
+ try:
+ Suborganization.objects.bulk_create(suborgs_to_create)
+ logger.info(f"Successfully created {len(suborgs_to_create)} suborgs")
+ except Exception as e:
+ logger.warning(f"Error bulk creating suborgs: {e}")
diff --git a/src/registrar/fixtures/fixtures_user_portfolio_permissions.py b/src/registrar/fixtures/fixtures_user_portfolio_permissions.py
new file mode 100644
index 000000000..3c64eb6b5
--- /dev/null
+++ b/src/registrar/fixtures/fixtures_user_portfolio_permissions.py
@@ -0,0 +1,86 @@
+import logging
+from faker import Faker
+from django.db import transaction
+
+from registrar.fixtures.fixtures_portfolios import PortfolioFixture
+from registrar.fixtures.fixtures_users import UserFixture
+from registrar.models import User
+from registrar.models.portfolio import Portfolio
+from registrar.models.user_portfolio_permission import UserPortfolioPermission
+from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
+
+fake = Faker()
+logger = logging.getLogger(__name__)
+
+
+class UserPortfolioPermissionFixture:
+ """Create user portfolio permissions for each user.
+ Each user will be admin on 2 portfolios.
+
+ Depends on fixture_portfolios"""
+
+ @classmethod
+ def load(cls):
+ logger.info("Going to set user portfolio permissions")
+
+ # Lumped under .atomic to ensure we don't make redundant DB calls.
+ # This bundles them all together, and then saves it in a single call.
+ with transaction.atomic():
+ try:
+ # Get the usernames of users created in the UserFixture
+ created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF]
+
+ # Filter users to only include those created by the fixture
+ users = list(User.objects.filter(username__in=created_usernames))
+
+ organization_names = [portfolio["organization_name"] for portfolio in PortfolioFixture.PORTFOLIOS]
+
+ portfolios = list(Portfolio.objects.filter(organization_name__in=organization_names))
+
+ if not users:
+ logger.warning("User fixtures missing.")
+ return
+
+ if not portfolios:
+ logger.warning("Portfolio fixtures missing.")
+ return
+
+ except Exception as e:
+ logger.warning(e)
+ return
+
+ user_portfolio_permissions_to_create = []
+ for user in users:
+ for portfolio in portfolios:
+ try:
+ if not UserPortfolioPermission.objects.filter(user=user, portfolio=portfolio).exists():
+ user_portfolio_permission = UserPortfolioPermission(
+ user=user,
+ portfolio=portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ )
+ user_portfolio_permissions_to_create.append(user_portfolio_permission)
+ else:
+ logger.info(
+ f"Permission exists for user '{user.username}' "
+ f"on portfolio '{portfolio.organization_name}'."
+ )
+ except Exception as e:
+ logger.warning(e)
+
+ # Bulk create permissions
+ cls._bulk_create_permissions(user_portfolio_permissions_to_create)
+
+ @classmethod
+ def _bulk_create_permissions(cls, user_portfolio_permissions_to_create):
+ """Bulk creates permissions and logs success or errors."""
+ if user_portfolio_permissions_to_create:
+ try:
+ UserPortfolioPermission.objects.bulk_create(user_portfolio_permissions_to_create)
+ logger.info(
+ f"Successfully created {len(user_portfolio_permissions_to_create)} user portfolio permissions."
+ )
+ except Exception as e:
+ logger.error(f"Unexpected error during portfolio permission bulk creation: {e}")
+ else:
+ logger.info("No new user portfolio permissions to create.")
diff --git a/src/registrar/fixtures_users.py b/src/registrar/fixtures/fixtures_users.py
similarity index 67%
rename from src/registrar/fixtures_users.py
rename to src/registrar/fixtures/fixtures_users.py
index 7fbf41223..91b35f854 100644
--- a/src/registrar/fixtures_users.py
+++ b/src/registrar/fixtures/fixtures_users.py
@@ -23,129 +23,123 @@ class UserFixture:
"""
ADMINS = [
- {
- "username": "43a7fa8d-0550-4494-a6fe-81500324d590",
- "first_name": "Jyoti",
- "last_name": "Bock",
- "email": "jyotibock@truss.works",
- },
{
"username": "aad084c3-66cc-4632-80eb-41cdf5c5bcbf",
"first_name": "Aditi",
"last_name": "Green",
"email": "aditidevelops+01@gmail.com",
+ "title": "Positive vibes",
},
{
"username": "be17c826-e200-4999-9389-2ded48c43691",
"first_name": "Matthew",
"last_name": "Spence",
+ "title": "Hollywood hair",
},
{
"username": "5f283494-31bd-49b5-b024-a7e7cae00848",
"first_name": "Rachid",
"last_name": "Mrad",
"email": "rachid.mrad@associates.cisa.dhs.gov",
+ "title": "Common pirate",
},
{
"username": "eb2214cd-fc0c-48c0-9dbd-bc4cd6820c74",
"first_name": "Alysia",
"last_name": "Broddrick",
"email": "abroddrick@truss.works",
+ "title": "I drink coffee and know things",
},
{
"username": "8f8e7293-17f7-4716-889b-1990241cbd39",
"first_name": "Katherine",
"last_name": "Osos",
"email": "kosos@truss.works",
+ "title": "Grove keeper",
},
{
"username": "70488e0a-e937-4894-a28c-16f5949effd4",
"first_name": "Gaby",
"last_name": "DiSarli",
"email": "gaby@truss.works",
+ "title": "De Stijl",
},
{
"username": "83c2b6dd-20a2-4cac-bb40-e22a72d2955c",
"first_name": "Cameron",
"last_name": "Dixon",
"email": "cameron.dixon@cisa.dhs.gov",
- },
- {
- "username": "0353607a-cbba-47d2-98d7-e83dcd5b90ea",
- "first_name": "Ryan",
- "last_name": "Brooks",
+ "title": "Product owner",
},
{
"username": "30001ee7-0467-4df2-8db2-786e79606060",
"first_name": "Zander",
"last_name": "Adkinson",
+ "title": "ACME specialist",
},
{
"username": "2bf518c2-485a-4c42-ab1a-f5a8b0a08484",
"first_name": "Paul",
"last_name": "Kuykendall",
+ "title": "Dr. Silvertongue",
},
{
"username": "2a88a97b-be96-4aad-b99e-0b605b492c78",
"first_name": "Rebecca",
"last_name": "Hsieh",
"email": "rebecca.hsieh@truss.works",
+ "title": "Catlady",
},
{
"username": "fa69c8e8-da83-4798-a4f2-263c9ce93f52",
"first_name": "David",
"last_name": "Kennedy",
"email": "david.kennedy@ecstech.com",
+ "title": "Mean lean coding machine",
},
{
"username": "f14433d8-f0e9-41bf-9c72-b99b110e665d",
"first_name": "Nicolle",
"last_name": "LeClair",
"email": "nicolle.leclair@ecstech.com",
+ "title": "Nightowl",
},
{
"username": "24840450-bf47-4d89-8aa9-c612fe68f9da",
"first_name": "Erin",
"last_name": "Song",
+ "title": "Catlady 2",
},
{
"username": "e0ea8b94-6e53-4430-814a-849a7ca45f21",
"first_name": "Kristina",
"last_name": "Yin",
+ "title": "Hufflepuff prefect",
},
{
"username": "ac49d7c1-368a-4e6b-8f1d-60250e20a16f",
"first_name": "Vicky",
"last_name": "Chin",
"email": "szu.chin@associates.cisa.dhs.gov",
+ "title": "Ze whip",
},
{
"username": "66bb1a5a-a091-4d7f-a6cf-4d772b4711c7",
"first_name": "Christina",
"last_name": "Burnett",
"email": "christina.burnett@cisa.dhs.gov",
- },
- {
- "username": "012f844d-8a0f-4225-9d82-cbf87bff1d3e",
- "first_name": "Riley",
- "last_name": "Orr",
- "email": "riley+320@truss.works",
+ "title": "Groovy",
},
{
"username": "76612d84-66b0-4ae9-9870-81e98b9858b6",
"first_name": "Anna",
"last_name": "Gingle",
"email": "annagingle@truss.works",
+ "title": "Sweetwater sailor",
},
]
STAFF = [
- {
- "username": "a5906815-dd80-4c64-aebe-2da6a4c9d7a4",
- "first_name": "Jyoti-Analyst",
- "last_name": "Bock-Analyst",
- "email": "jyotibock+1@truss.works",
- },
{
"username": "ffec5987-aa84-411b-a05a-a7ee5cbcde54",
"first_name": "Aditi-Analyst",
@@ -231,18 +225,6 @@ class UserFixture:
"last_name": "Burnett-Analyst",
"email": "christina.burnett@gwe.cisa.dhs.gov",
},
- {
- "username": "d9839768-0c17-4fa2-9c8e-36291eef5c11",
- "first_name": "Alex-Analyst",
- "last_name": "Mcelya-Analyst",
- "email": "ALEXANDER.MCELYA@cisa.dhs.gov",
- },
- {
- "username": "082a066f-e0a4-45f6-8672-4343a1208a36",
- "first_name": "Riley-Analyst",
- "last_name": "Orr-Analyst",
- "email": "riley+321@truss.works",
- },
{
"username": "e1e350b1-cfc1-4753-a6cb-3ae6d912f99c",
"first_name": "Anna-Analyst",
@@ -254,29 +236,61 @@ class UserFixture:
# Additional emails to add to the AllowedEmail whitelist.
ADDITIONAL_ALLOWED_EMAILS: list[str] = ["davekenn4242@gmail.com", "rachid_mrad@hotmail.com"]
+ @classmethod
def load_users(cls, users, group_name, are_superusers=False):
- logger.info(f"Going to load {len(users)} users in group {group_name}")
- for user_data in users:
+ """Loads the users into the database and assigns them to the specified group."""
+ logger.info(f"Going to load {len(users)} users for group {group_name}")
+
+ group = UserGroup.objects.get(name=group_name)
+
+ # Prepare sets of existing usernames and IDs in one query
+ user_identifiers = [(user.get("username"), user.get("id")) for user in users]
+ existing_users = User.objects.filter(
+ username__in=[user[0] for user in user_identifiers] + [user[1] for user in user_identifiers]
+ ).values_list("username", "id")
+
+ existing_usernames = set(user[0] for user in existing_users)
+ existing_user_ids = set(user[1] for user in existing_users)
+
+ # Filter out users with existing IDs or usernames
+ new_users = [
+ User(
+ id=user_data.get("id"),
+ first_name=user_data.get("first_name"),
+ last_name=user_data.get("last_name"),
+ username=user_data.get("username"),
+ email=user_data.get("email", ""),
+ title=user_data.get("title", "Peon"),
+ phone=user_data.get("phone", "2022222222"),
+ is_active=user_data.get("is_active", True),
+ is_staff=True,
+ is_superuser=are_superusers,
+ )
+ for user_data in users
+ if user_data.get("username") not in existing_usernames and user_data.get("id") not in existing_user_ids
+ ]
+
+ # Perform bulk creation for new users
+ if new_users:
try:
- user, _ = User.objects.get_or_create(username=user_data["username"])
- user.is_superuser = are_superusers
- user.first_name = user_data["first_name"]
- user.last_name = user_data["last_name"]
- if "email" in user_data:
- user.email = user_data["email"]
- user.is_staff = True
- user.is_active = True
- # This verification type will get reverted to "regular" (or whichever is applicables)
- # once the user logs in for the first time (as they then got verified through different means).
- # In the meantime, we can still describe how the user got here in the first place.
- user.verification_type = User.VerificationTypeChoices.FIXTURE_USER
- group = UserGroup.objects.get(name=group_name)
- user.groups.add(group)
- user.save()
- logger.debug(f"User object created for {user_data['first_name']}")
+ User.objects.bulk_create(new_users)
+ logger.info(f"Created {len(new_users)} new users.")
except Exception as e:
- logger.warning(e)
- logger.info(f"All users in group {group_name} loaded.")
+ logger.error(f"Unexpected error during user bulk creation: {e}")
+ else:
+ logger.info("No new users to create.")
+
+ # Get all users to be updated (both new and existing)
+ created_or_existing_users = User.objects.filter(username__in=[user.get("username") for user in users])
+
+ # Filter out users who are already in the group
+ users_not_in_group = created_or_existing_users.exclude(groups__id=group.id)
+
+ # Add only users who are not already in the group
+ if users_not_in_group.exists():
+ group.user_set.add(*users_not_in_group)
+
+ logger.info(f"Users loaded for group {group_name}.")
def load_allowed_emails(cls, users, additional_emails):
"""Populates a whitelist of allowed emails (as defined in this list)"""
@@ -284,37 +298,33 @@ class UserFixture:
if additional_emails:
logger.info(f"Going to load {len(additional_emails)} additional allowed emails")
- # Load user emails
- allowed_emails = []
+ existing_emails = set(AllowedEmail.objects.values_list("email", flat=True))
+ new_allowed_emails = []
+
for user_data in users:
user_email = user_data.get("email")
- if user_email and user_email not in allowed_emails:
- allowed_emails.append(AllowedEmail(email=user_email))
- else:
- first_name = user_data.get("first_name")
- last_name = user_data.get("last_name")
- logger.warning(f"Could not add email to whitelist for {first_name} {last_name}.")
+ if user_email and user_email not in existing_emails:
+ new_allowed_emails.append(AllowedEmail(email=user_email))
- # Load additional emails
- allowed_emails.extend([AllowedEmail(email=email) for email in additional_emails])
+ # Load additional emails, only if they don't exist already
+ for email in additional_emails:
+ if email not in existing_emails:
+ new_allowed_emails.append(AllowedEmail(email=email))
- if allowed_emails:
- AllowedEmail.objects.bulk_create(allowed_emails)
- logger.info(f"Loaded {len(allowed_emails)} allowed emails")
+ if new_allowed_emails:
+ try:
+ AllowedEmail.objects.bulk_create(new_allowed_emails)
+ logger.info(f"Loaded {len(new_allowed_emails)} allowed emails")
+ except Exception as e:
+ logger.error(f"Unexpected error during allowed emails bulk creation: {e}")
else:
logger.info("No allowed emails to load")
@classmethod
def load(cls):
- # Lumped under .atomic to ensure we don't make redundant DB calls.
- # This bundles them all together, and then saves it in a single call.
- # This is slightly different then bulk_create or bulk_update, in that
- # you still get the same behaviour of .save(), but those incremental
- # steps now do not need to close/reopen a db connection,
- # instead they share one.
with transaction.atomic():
- cls.load_users(cls, cls.ADMINS, "full_access_group", are_superusers=True)
- cls.load_users(cls, cls.STAFF, "cisa_analysts_group")
+ cls.load_users(cls.ADMINS, "full_access_group", are_superusers=True)
+ cls.load_users(cls.STAFF, "cisa_analysts_group")
# Combine ADMINS and STAFF lists
all_users = cls.ADMINS + cls.STAFF
diff --git a/src/registrar/fixtures_domain_requests.py b/src/registrar/fixtures_domain_requests.py
deleted file mode 100644
index 44dd13e4c..000000000
--- a/src/registrar/fixtures_domain_requests.py
+++ /dev/null
@@ -1,236 +0,0 @@
-import logging
-import random
-from faker import Faker
-from django.db import transaction
-
-from registrar.models import User, DomainRequest, DraftDomain, Contact, Website, FederalAgency
-
-fake = Faker()
-logger = logging.getLogger(__name__)
-
-
-class DomainRequestFixture:
- """
- Load domain requests into the database.
-
- Make sure this class' `load` method is called from `handle`
- in management/commands/load.py, then use `./manage.py load`
- to run this code.
- """
-
- # any fields not specified here will be filled in with fake data or defaults
- # NOTE BENE: each fixture must have `organization_name` for uniqueness!
- # Here is a more complete example as a template:
- # {
- # "status": "started",
- # "organization_name": "Example - Just started",
- # "generic_org_type": "federal",
- # "federal_agency": None,
- # "federal_type": None,
- # "address_line1": None,
- # "address_line2": None,
- # "city": None,
- # "state_territory": None,
- # "zipcode": None,
- # "urbanization": None,
- # "purpose": None,
- # "anything_else": None,
- # "is_policy_acknowledged": None,
- # "senior_official": None,
- # "other_contacts": [],
- # "current_websites": [],
- # "alternative_domains": [],
- # },
- DA = [
- {
- "status": DomainRequest.DomainRequestStatus.STARTED,
- "organization_name": "Example - Finished but not submitted",
- },
- {
- "status": DomainRequest.DomainRequestStatus.SUBMITTED,
- "organization_name": "Example - Submitted but pending investigation",
- },
- {
- "status": DomainRequest.DomainRequestStatus.IN_REVIEW,
- "organization_name": "Example - In investigation",
- },
- {
- "status": DomainRequest.DomainRequestStatus.IN_REVIEW,
- "organization_name": "Example - Approved",
- },
- {
- "status": DomainRequest.DomainRequestStatus.WITHDRAWN,
- "organization_name": "Example - Withdrawn",
- },
- {
- "status": DomainRequest.DomainRequestStatus.ACTION_NEEDED,
- "organization_name": "Example - Action needed",
- },
- {
- "status": "rejected",
- "organization_name": "Example - Rejected",
- },
- ]
-
- @classmethod
- def fake_contact(cls):
- return {
- "first_name": fake.first_name(),
- "middle_name": None,
- "last_name": fake.last_name(),
- "title": fake.job(),
- "email": fake.ascii_safe_email(),
- "phone": "201-555-5555",
- }
-
- @classmethod
- def fake_dot_gov(cls):
- return f"{fake.slug()}.gov"
-
- @classmethod
- def _set_non_foreign_key_fields(cls, da: DomainRequest, app: dict):
- """Helper method used by `load`."""
- da.status = app["status"] if "status" in app else "started"
-
- # TODO for a future ticket: Allow for more than just "federal" here
- da.generic_org_type = app["generic_org_type"] if "generic_org_type" in app else "federal"
- da.last_submitted_date = fake.date()
- da.federal_type = (
- app["federal_type"]
- if "federal_type" in app
- else random.choice(["executive", "judicial", "legislative"]) # nosec
- )
- da.address_line1 = app["address_line1"] if "address_line1" in app else fake.street_address()
- da.address_line2 = app["address_line2"] if "address_line2" in app else None
- da.city = app["city"] if "city" in app else fake.city()
- da.state_territory = app["state_territory"] if "state_territory" in app else fake.state_abbr()
- da.zipcode = app["zipcode"] if "zipcode" in app else fake.postalcode()
- da.urbanization = app["urbanization"] if "urbanization" in app else None
- da.purpose = app["purpose"] if "purpose" in app else fake.paragraph()
- da.anything_else = app["anything_else"] if "anything_else" in app else None
- da.is_policy_acknowledged = app["is_policy_acknowledged"] if "is_policy_acknowledged" in app else True
-
- @classmethod
- def _set_foreign_key_fields(cls, da: DomainRequest, app: dict, user: User):
- """Helper method used by `load`."""
- if not da.investigator:
- da.investigator = User.objects.get(username=user.username) if "investigator" in app else None
-
- if not da.senior_official:
- if "senior_official" in app and app["senior_official"] is not None:
- da.senior_official, _ = Contact.objects.get_or_create(**app["senior_official"])
- else:
- da.senior_official = Contact.objects.create(**cls.fake_contact())
-
- if not da.requested_domain:
- if "requested_domain" in app and app["requested_domain"] is not None:
- da.requested_domain, _ = DraftDomain.objects.get_or_create(name=app["requested_domain"])
- else:
- da.requested_domain = DraftDomain.objects.create(name=cls.fake_dot_gov())
- if not da.federal_agency:
- if "federal_agency" in app and app["federal_agency"] is not None:
- da.federal_agency, _ = FederalAgency.objects.get_or_create(name=app["federal_agency"])
- else:
- federal_agencies = FederalAgency.objects.all()
- # Random choice of agency for selects, used as placeholders for testing.
- da.federal_agency = random.choice(federal_agencies) # nosec
-
- @classmethod
- def _set_many_to_many_relations(cls, da: DomainRequest, app: dict):
- """Helper method used by `load`."""
- if "other_contacts" in app:
- for contact in app["other_contacts"]:
- da.other_contacts.add(Contact.objects.get_or_create(**contact)[0])
- elif not da.other_contacts.exists():
- other_contacts = [
- Contact.objects.create(**cls.fake_contact()) for _ in range(random.randint(0, 3)) # nosec
- ]
- da.other_contacts.add(*other_contacts)
-
- if "current_websites" in app:
- for website in app["current_websites"]:
- da.current_websites.add(Website.objects.get_or_create(website=website)[0])
- elif not da.current_websites.exists():
- current_websites = [
- Website.objects.create(website=fake.uri()) for _ in range(random.randint(0, 3)) # nosec
- ]
- da.current_websites.add(*current_websites)
-
- if "alternative_domains" in app:
- for domain in app["alternative_domains"]:
- da.alternative_domains.add(Website.objects.get_or_create(website=domain)[0])
- elif not da.alternative_domains.exists():
- alternative_domains = [
- Website.objects.create(website=cls.fake_dot_gov()) for _ in range(random.randint(0, 3)) # nosec
- ]
- da.alternative_domains.add(*alternative_domains)
-
- @classmethod
- def load(cls):
- """Creates domain requests for each user in the database."""
- logger.info("Going to load %s domain requests" % len(cls.DA))
- try:
- users = list(User.objects.all()) # force evaluation to catch db errors
- except Exception as e:
- logger.warning(e)
- return
-
- # Lumped under .atomic to ensure we don't make redundant DB calls.
- # This bundles them all together, and then saves it in a single call.
- with transaction.atomic():
- cls._create_domain_requests(users)
-
- @classmethod
- def _create_domain_requests(cls, users):
- """Creates DomainRequests given a list of users"""
- for user in users:
- logger.debug("Loading domain requests for %s" % user)
- for app in cls.DA:
- try:
- da, _ = DomainRequest.objects.get_or_create(
- creator=user,
- organization_name=app["organization_name"],
- )
- cls._set_non_foreign_key_fields(da, app)
- cls._set_foreign_key_fields(da, app, user)
- da.save()
- cls._set_many_to_many_relations(da, app)
- except Exception as e:
- logger.warning(e)
-
-
-class DomainFixture(DomainRequestFixture):
- """Create one domain and permissions on it for each user."""
-
- @classmethod
- def load(cls):
- try:
- users = list(User.objects.all()) # force evaluation to catch db errors
- except Exception as e:
- logger.warning(e)
- return
-
- # Lumped under .atomic to ensure we don't make redundant DB calls.
- # This bundles them all together, and then saves it in a single call.
- with transaction.atomic():
- # approve each user associated with `in review` status domains
- DomainFixture._approve_domain_requests(users)
-
- @staticmethod
- def _approve_domain_requests(users):
- """Approves all provided domain requests if they are in the state in_review"""
- for user in users:
- domain_request = DomainRequest.objects.filter(
- creator=user, status=DomainRequest.DomainRequestStatus.IN_REVIEW
- ).last()
- logger.debug(f"Approving {domain_request} for {user}")
-
- # All approvals require an investigator, so if there is none,
- # assign one.
- if domain_request.investigator is None:
- # All "users" in fixtures have admin perms per prior config.
- # No need to check for that.
- domain_request.investigator = random.choice(users) # nosec
-
- domain_request.approve(send_email=False)
- domain_request.save()
diff --git a/src/registrar/management/commands/load.py b/src/registrar/management/commands/load.py
index aac5eade1..a4cdb1ed5 100644
--- a/src/registrar/management/commands/load.py
+++ b/src/registrar/management/commands/load.py
@@ -1,11 +1,13 @@
import logging
from django.core.management.base import BaseCommand
-from auditlog.context import disable_auditlog # type: ignore
-
-
-from registrar.fixtures_users import UserFixture
-from registrar.fixtures_domain_requests import DomainRequestFixture, DomainFixture
+from auditlog.context import disable_auditlog
+from registrar.fixtures.fixtures_domains import DomainFixture
+from registrar.fixtures.fixtures_portfolios import PortfolioFixture
+from registrar.fixtures.fixtures_requests import DomainRequestFixture
+from registrar.fixtures.fixtures_suborganizations import SuborganizationFixture
+from registrar.fixtures.fixtures_user_portfolio_permissions import UserPortfolioPermissionFixture
+from registrar.fixtures.fixtures_users import UserFixture # type: ignore
logger = logging.getLogger(__name__)
@@ -16,6 +18,9 @@ class Command(BaseCommand):
# https://github.com/jazzband/django-auditlog/issues/17
with disable_auditlog():
UserFixture.load()
+ PortfolioFixture.load()
+ SuborganizationFixture.load()
DomainRequestFixture.load()
DomainFixture.load()
+ UserPortfolioPermissionFixture.load()
logger.info("All fixtures loaded.")
diff --git a/src/registrar/migrations/0129_alter_portfolioinvitation_portfolio_roles_and_more.py b/src/registrar/migrations/0129_alter_portfolioinvitation_portfolio_roles_and_more.py
new file mode 100644
index 000000000..dae994f8e
--- /dev/null
+++ b/src/registrar/migrations/0129_alter_portfolioinvitation_portfolio_roles_and_more.py
@@ -0,0 +1,40 @@
+# Generated by Django 4.2.10 on 2024-09-25 00:49
+
+import django.contrib.postgres.fields
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("registrar", "0128_alter_domaininformation_state_territory_and_more"),
+ ]
+
+ operations = [
+ migrations.AlterField(
+ model_name="portfolioinvitation",
+ name="portfolio_roles",
+ field=django.contrib.postgres.fields.ArrayField(
+ base_field=models.CharField(
+ choices=[("organization_admin", "Admin"), ("organization_member", "Member")], max_length=50
+ ),
+ blank=True,
+ help_text="Select one or more roles.",
+ null=True,
+ size=None,
+ ),
+ ),
+ migrations.AlterField(
+ model_name="userportfoliopermission",
+ name="roles",
+ field=django.contrib.postgres.fields.ArrayField(
+ base_field=models.CharField(
+ choices=[("organization_admin", "Admin"), ("organization_member", "Member")], max_length=50
+ ),
+ blank=True,
+ help_text="Select one or more roles.",
+ null=True,
+ size=None,
+ ),
+ ),
+ ]
diff --git a/src/registrar/models/user_group.py b/src/registrar/models/user_group.py
index 84b4da701..4770f34bc 100644
--- a/src/registrar/models/user_group.py
+++ b/src/registrar/models/user_group.py
@@ -137,7 +137,6 @@ class UserGroup(Group):
+ cisa_analysts_group.name
)
- cisa_analysts_group.save()
logger.debug("CISA Analyst permissions added to group " + cisa_analysts_group.name)
except Exception as e:
logger.error(f"Error creating analyst permissions group: {e}")
@@ -159,7 +158,6 @@ class UserGroup(Group):
# Assign all permissions to the group
full_access_group.permissions.add(*all_permissions)
- full_access_group.save()
logger.debug("All permissions added to group " + full_access_group.name)
except Exception as e:
logger.error(f"Error creating full access group: {e}")
diff --git a/src/registrar/models/user_portfolio_permission.py b/src/registrar/models/user_portfolio_permission.py
index f021fc6bf..241afd328 100644
--- a/src/registrar/models/user_portfolio_permission.py
+++ b/src/registrar/models/user_portfolio_permission.py
@@ -15,8 +15,6 @@ class UserPortfolioPermission(TimeStampedModel):
PORTFOLIO_ROLE_PERMISSIONS = {
UserPortfolioRoleChoices.ORGANIZATION_ADMIN: [
UserPortfolioPermissionChoices.VIEW_ALL_DOMAINS,
- UserPortfolioPermissionChoices.VIEW_MEMBERS,
- UserPortfolioPermissionChoices.EDIT_MEMBERS,
UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS,
UserPortfolioPermissionChoices.EDIT_REQUESTS,
UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
@@ -25,14 +23,6 @@ class UserPortfolioPermission(TimeStampedModel):
UserPortfolioPermissionChoices.VIEW_SUBORGANIZATION,
UserPortfolioPermissionChoices.EDIT_SUBORGANIZATION,
],
- UserPortfolioRoleChoices.ORGANIZATION_ADMIN_READ_ONLY: [
- UserPortfolioPermissionChoices.VIEW_ALL_DOMAINS,
- UserPortfolioPermissionChoices.VIEW_MEMBERS,
- UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS,
- UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
- # Domain: field specific permissions
- UserPortfolioPermissionChoices.VIEW_SUBORGANIZATION,
- ],
UserPortfolioRoleChoices.ORGANIZATION_MEMBER: [
UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
],
diff --git a/src/registrar/models/utility/portfolio_helper.py b/src/registrar/models/utility/portfolio_helper.py
index ef9336f18..ddb487f71 100644
--- a/src/registrar/models/utility/portfolio_helper.py
+++ b/src/registrar/models/utility/portfolio_helper.py
@@ -7,7 +7,6 @@ class UserPortfolioRoleChoices(models.TextChoices):
"""
ORGANIZATION_ADMIN = "organization_admin", "Admin"
- ORGANIZATION_ADMIN_READ_ONLY = "organization_admin_read_only", "Admin read only"
ORGANIZATION_MEMBER = "organization_member", "Member"
@classmethod
diff --git a/src/registrar/templates/includes/header_extended.html b/src/registrar/templates/includes/header_extended.html
index ab53dd5cf..a3b2364a9 100644
--- a/src/registrar/templates/includes/header_extended.html
+++ b/src/registrar/templates/includes/header_extended.html
@@ -91,9 +91,9 @@
{% endif %}
- {% if has_organization_members_flag %}
+ {% if has_organization_members_flag and has_view_members_portfolio_permission %}
-
+
Members
diff --git a/src/registrar/templates/includes/members_table.html b/src/registrar/templates/includes/members_table.html
new file mode 100644
index 000000000..529d2629d
--- /dev/null
+++ b/src/registrar/templates/includes/members_table.html
@@ -0,0 +1,80 @@
+{% load static %}
+
+
+
+{% comment %} Stores the json endpoint in a url for easier access {% endcomment %}
+{% url 'get_portfolio_members_json' as url %}
+{{url}}
+
+
+
+
+
+
+
You don't have any members.
+
+
+
+
diff --git a/src/registrar/templates/portfolio_members.html b/src/registrar/templates/portfolio_members.html
new file mode 100644
index 000000000..82e06c808
--- /dev/null
+++ b/src/registrar/templates/portfolio_members.html
@@ -0,0 +1,33 @@
+{% extends 'portfolio_base.html' %}
+
+{% load static %}
+
+{% block title %} Members | {% endblock %}
+
+{% block wrapper_class %}
+ {{ block.super }} dashboard--grey-1
+{% endblock %}
+
+{% block portfolio_content %}
+{% block messages %}
+ {% include "includes/form_messages.html" %}
+{% endblock %}
+
+
+
+
+ {% include "includes/members_table.html" with portfolio=portfolio %}
+
+{% endblock %}
diff --git a/src/registrar/templatetags/custom_filters.py b/src/registrar/templatetags/custom_filters.py
index 0bde40bb9..a3f35ae8e 100644
--- a/src/registrar/templatetags/custom_filters.py
+++ b/src/registrar/templatetags/custom_filters.py
@@ -241,6 +241,17 @@ def is_portfolio_subpage(path):
return get_url_name(path) in url_names
+@register.filter(name="is_members_subpage")
+def is_members_subpage(path):
+ """Checks if the given page is a subpage of members.
+ Takes a path name, like '/organization/'."""
+ # Since our pages aren't unified under a common path, we need this approach for now.
+ url_names = [
+ "members",
+ ]
+ return get_url_name(path) in url_names
+
+
@register.filter(name="portfolio_role_summary")
def portfolio_role_summary(user, portfolio):
"""Returns the value of user.portfolio_role_summary"""
diff --git a/src/registrar/tests/test_reports.py b/src/registrar/tests/test_reports.py
index ed2b75791..797592806 100644
--- a/src/registrar/tests/test_reports.py
+++ b/src/registrar/tests/test_reports.py
@@ -356,11 +356,6 @@ class ExportDataTest(MockDbForIndividualTests, MockEppLib):
self.assertIn(self.domain_3.name, csv_content)
self.assertNotIn(self.domain_2.name, csv_content)
- # Test the output for readonly admin
- portfolio_permission.roles = [UserPortfolioRoleChoices.ORGANIZATION_ADMIN_READ_ONLY]
- portfolio_permission.save()
- portfolio_permission.refresh_from_db()
-
# Get the csv content
csv_content = self._run_domain_data_type_user_export(request)
self.assertIn(self.domain_1.name, csv_content)
diff --git a/src/registrar/tests/test_url_auth.py b/src/registrar/tests/test_url_auth.py
index 284ec7638..1cd2d1384 100644
--- a/src/registrar/tests/test_url_auth.py
+++ b/src/registrar/tests/test_url_auth.py
@@ -116,6 +116,7 @@ class TestURLAuth(TestCase):
"/api/v1/available/",
"/api/v1/get-report/current-federal",
"/api/v1/get-report/current-full",
+ "/api/v1/rdap/",
"/health",
]
diff --git a/src/registrar/tests/test_views_domain.py b/src/registrar/tests/test_views_domain.py
index 8fb92df72..127b78a4a 100644
--- a/src/registrar/tests/test_views_domain.py
+++ b/src/registrar/tests/test_views_domain.py
@@ -1568,7 +1568,7 @@ class TestDomainSuborganization(TestDomainOverview):
# Add portfolio perms to the user object
portfolio_permission, _ = UserPortfolioPermission.objects.get_or_create(
- user=self.user, portfolio=portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN_READ_ONLY]
+ user=self.user, portfolio=portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER]
)
self.assertEqual(self.domain_information.sub_organization, suborg)
diff --git a/src/registrar/tests/test_views_members_json.py b/src/registrar/tests/test_views_members_json.py
new file mode 100644
index 000000000..75c3a3a66
--- /dev/null
+++ b/src/registrar/tests/test_views_members_json.py
@@ -0,0 +1,175 @@
+from django.urls import reverse
+
+from registrar.models.portfolio import Portfolio
+from registrar.models.user import User
+from registrar.models.user_portfolio_permission import UserPortfolioPermission
+from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
+from .test_views import TestWithUser
+from django_webtest import WebTest # type: ignore
+
+
+class GetPortfolioMembersJsonTest(TestWithUser, WebTest):
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ # Create additional users
+ cls.user2 = User.objects.create(
+ username="test_user2",
+ first_name="Second",
+ last_name="User",
+ email="second@example.com",
+ phone="8003112345",
+ title="Member",
+ )
+ cls.user3 = User.objects.create(
+ username="test_user3",
+ first_name="Third",
+ last_name="User",
+ email="third@example.com",
+ phone="8003113456",
+ title="Member",
+ )
+ cls.user4 = User.objects.create(
+ username="test_user4",
+ first_name="Fourth",
+ last_name="User",
+ email="fourth@example.com",
+ phone="8003114567",
+ title="Admin",
+ )
+
+ # Create Portfolio
+ cls.portfolio = Portfolio.objects.create(creator=cls.user, organization_name="Test Portfolio")
+
+ # Assign permissions
+ UserPortfolioPermission.objects.create(
+ user=cls.user,
+ portfolio=cls.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ additional_permissions=[
+ UserPortfolioPermissionChoices.VIEW_MEMBERS,
+ UserPortfolioPermissionChoices.EDIT_MEMBERS,
+ ],
+ )
+ UserPortfolioPermission.objects.create(
+ user=cls.user2,
+ portfolio=cls.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
+ )
+ UserPortfolioPermission.objects.create(
+ user=cls.user3,
+ portfolio=cls.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
+ )
+ UserPortfolioPermission.objects.create(
+ user=cls.user4,
+ portfolio=cls.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ )
+
+ def setUp(self):
+ super().setUp()
+ self.app.set_user(self.user.username)
+
+ def test_get_portfolio_members_json_authenticated(self):
+ """Test that portfolio members are returned properly for an authenticated user."""
+ response = self.app.get(reverse("get_portfolio_members_json"), params={"portfolio": self.portfolio.id})
+ self.assertEqual(response.status_code, 200)
+ data = response.json
+
+ # Check pagination info
+ self.assertEqual(data["page"], 1)
+ self.assertFalse(data["has_previous"])
+ self.assertFalse(data["has_next"])
+ self.assertEqual(data["num_pages"], 1)
+ self.assertEqual(data["total"], 4)
+ self.assertEqual(data["unfiltered_total"], 4)
+
+ # Check the number of members
+ self.assertEqual(len(data["members"]), 4)
+
+ # Check member fields
+ expected_emails = {self.user.email, self.user2.email, self.user3.email, self.user4.email}
+ actual_emails = {member["email"] for member in data["members"]}
+ self.assertEqual(expected_emails, actual_emails)
+
+ def test_pagination(self):
+ """Test that pagination works properly when there are more members than page size."""
+ # Create additional members to exceed page size of 10
+ for i in range(5, 15):
+ user, _ = User.objects.get_or_create(
+ username=f"test_user{i}",
+ first_name=f"User{i}",
+ last_name=f"Last{i}",
+ email=f"user{i}@example.com",
+ phone=f"80031156{i}",
+ title="Member",
+ )
+ UserPortfolioPermission.objects.create(
+ user=user,
+ portfolio=self.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
+ )
+
+ response = self.app.get(
+ reverse("get_portfolio_members_json"), params={"portfolio": self.portfolio.id, "page": 1}
+ )
+ self.assertEqual(response.status_code, 200)
+ data = response.json
+
+ # Check pagination info
+ self.assertEqual(data["page"], 1)
+ self.assertTrue(data["has_next"])
+ self.assertFalse(data["has_previous"])
+ self.assertEqual(data["num_pages"], 2)
+ self.assertEqual(data["total"], 14)
+ self.assertEqual(data["unfiltered_total"], 14)
+
+ # Check the number of members on page 1
+ self.assertEqual(len(data["members"]), 10)
+
+ response = self.app.get(
+ reverse("get_portfolio_members_json"), params={"portfolio": self.portfolio.id, "page": 2}
+ )
+ self.assertEqual(response.status_code, 200)
+ data = response.json
+
+ # Check pagination info for page 2
+ self.assertEqual(data["page"], 2)
+ self.assertFalse(data["has_next"])
+ self.assertTrue(data["has_previous"])
+ self.assertEqual(data["num_pages"], 2)
+
+ # Check the number of members on page 2
+ self.assertEqual(len(data["members"]), 4)
+
+ def test_search(self):
+ """Test search functionality for portfolio members."""
+ # Search by name
+ response = self.app.get(
+ reverse("get_portfolio_members_json"), params={"portfolio": self.portfolio.id, "search_term": "Second"}
+ )
+ self.assertEqual(response.status_code, 200)
+ data = response.json
+ self.assertEqual(len(data["members"]), 1)
+ self.assertEqual(data["members"][0]["name"], "Second User")
+ self.assertEqual(data["members"][0]["email"], "second@example.com")
+
+ # Search by email
+ response = self.app.get(
+ reverse("get_portfolio_members_json"),
+ params={"portfolio": self.portfolio.id, "search_term": "fourth@example.com"},
+ )
+ self.assertEqual(response.status_code, 200)
+ data = response.json
+ self.assertEqual(len(data["members"]), 1)
+ self.assertEqual(data["members"][0]["email"], "fourth@example.com")
+
+ # Search with no matching results
+ response = self.app.get(
+ reverse("get_portfolio_members_json"), params={"portfolio": self.portfolio.id, "search_term": "NonExistent"}
+ )
+ self.assertEqual(response.status_code, 200)
+ data = response.json
+ self.assertEqual(len(data["members"]), 0)
diff --git a/src/registrar/tests/test_views_portfolio.py b/src/registrar/tests/test_views_portfolio.py
index e7c593a45..dfb0469d0 100644
--- a/src/registrar/tests/test_views_portfolio.py
+++ b/src/registrar/tests/test_views_portfolio.py
@@ -10,6 +10,7 @@ from registrar.models import (
UserDomainRole,
User,
)
+from registrar.models.user_group import UserGroup
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from .common import MockSESClient, completed_domain_request, create_test_user
@@ -666,6 +667,195 @@ class TestPortfolio(WebTest):
self.assertContains(home, "Hotel California")
self.assertContains(home, "Members")
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_cannot_view_members_table(self):
+ """Test that user without proper permission is denied access to members view"""
+
+ # Users can only view the members table if they have
+ # Portfolio Permission "view_members" selected.
+ # NOTE: Admins, by default, do NOT have permission
+ # to view/edit members. This must be enabled explicitly
+ # in the "additional permissions" section for a portfolio
+ # permission.
+ #
+ # Scenarios to test include;
+ # (1) - User is not admin and can view portfolio, but not the members table
+ # (1) - User is admin and can view portfolio, but not the members table
+
+ # --- non-admin
+ self.app.set_user(self.user.username)
+
+ UserPortfolioPermission.objects.get_or_create(
+ user=self.user,
+ portfolio=self.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
+ additional_permissions=[
+ UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
+ ],
+ )
+ # Verify that the user cannot access the members page
+ # This will redirect the user to the members page.
+ self.client.force_login(self.user)
+ response = self.client.get(reverse("members"), follow=True)
+ # Assert the response is a 403 Forbidden
+ self.assertEqual(response.status_code, 403)
+
+ # --- admin
+ UserPortfolioPermission.objects.filter(user=self.user, portfolio=self.portfolio).update(
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ )
+
+ # Verify that the user cannot access the members page
+ # This will redirect the user to the members page.
+ response = self.client.get(reverse("members"), follow=True)
+ # Assert the response is a 403 Forbidden
+ self.assertEqual(response.status_code, 403)
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_can_view_members_table(self):
+ """Test that user with proper permission is able to access members view"""
+
+ self.app.set_user(self.user.username)
+
+ UserPortfolioPermission.objects.get_or_create(
+ user=self.user,
+ portfolio=self.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ additional_permissions=[
+ UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
+ UserPortfolioPermissionChoices.VIEW_MEMBERS,
+ ],
+ )
+
+ # Verify that the user can access the members page
+ # This will redirect the user to the members page.
+ self.client.force_login(self.user)
+ response = self.client.get(reverse("members"), follow=True)
+ # Make sure the page loaded
+ self.assertEqual(response.status_code, 200)
+
+ # ---- Useful debugging stub to see what "assertContains" is finding
+ # pattern = r'Members'
+ # matches = re.findall(pattern, response.content.decode('utf-8'))
+ # for match in matches:
+ # TerminalHelper.colorful_logger(logger.info, TerminalColors.OKCYAN, f"{match}")
+
+ # Make sure the page loaded
+ self.assertContains(response, "Members")
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_can_manage_members(self):
+ """Test that user with proper permission is able to manage members"""
+ user = self.user
+ self.app.set_user(user.username)
+
+ # give user permissions to view AND manage members
+ UserPortfolioPermission.objects.get_or_create(
+ user=self.user,
+ portfolio=self.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ additional_permissions=[
+ UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
+ UserPortfolioPermissionChoices.VIEW_MEMBERS,
+ UserPortfolioPermissionChoices.EDIT_MEMBERS,
+ ],
+ )
+
+ # Give user permissions to modify user objects in the DB
+ group, _ = UserGroup.objects.get_or_create(name="full_access_group")
+ # Add the user to the group
+ user.groups.set([group])
+
+ # Verify that the user can access the members page
+ # This will redirect the user to the members page.
+ self.client.force_login(self.user)
+ response = self.client.get(reverse("members"), follow=True)
+ # Make sure the page loaded
+ self.assertEqual(response.status_code, 200)
+
+ # Verify that manage settings are sent in the dynamic HTML
+ self.client.force_login(self.user)
+ response = self.client.get(reverse("get_portfolio_members_json") + f"?portfolio={self.portfolio.pk}")
+ self.assertContains(response, '"action_label": "Manage"')
+ self.assertContains(response, '"svg_icon": "settings"')
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_view_only_members(self):
+ """Test that user with view only permission settings can only
+ view members (not manage them)"""
+ user = self.user
+ self.app.set_user(user.username)
+
+ # give user permissions to view AND manage members
+ UserPortfolioPermission.objects.get_or_create(
+ user=self.user,
+ portfolio=self.portfolio,
+ additional_permissions=[
+ UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
+ UserPortfolioPermissionChoices.VIEW_MEMBERS,
+ ],
+ )
+ # Give user permissions to modify user objects in the DB
+ group, _ = UserGroup.objects.get_or_create(name="full_access_group")
+ # Add the user to the group
+ user.groups.set([group])
+
+ # Verify that the user can access the members page
+ # This will redirect the user to the members page.
+ self.client.force_login(self.user)
+ response = self.client.get(reverse("members"), follow=True)
+ # Make sure the page loaded
+ self.assertEqual(response.status_code, 200)
+
+ # Verify that view-only settings are sent in the dynamic HTML
+ response = self.client.get(reverse("get_portfolio_members_json") + f"?portfolio={self.portfolio.pk}")
+ print(response.content)
+ self.assertContains(response, '"action_label": "View"')
+ self.assertContains(response, '"svg_icon": "visibility"')
+
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_members_admin_detection(self):
+ """Test that user with proper permission is able to manage members"""
+ user = self.user
+ self.app.set_user(user.username)
+
+ # give user permissions to view AND manage members
+ UserPortfolioPermission.objects.get_or_create(
+ user=self.user,
+ portfolio=self.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ additional_permissions=[
+ UserPortfolioPermissionChoices.VIEW_PORTFOLIO,
+ UserPortfolioPermissionChoices.VIEW_MEMBERS,
+ UserPortfolioPermissionChoices.EDIT_MEMBERS,
+ ],
+ )
+
+ # Give user permissions to modify user objects in the DB
+ group, _ = UserGroup.objects.get_or_create(name="full_access_group")
+ # Add the user to the group
+ user.groups.set([group])
+
+ # Verify that the user can access the members page
+ # This will redirect the user to the members page.
+ self.client.force_login(self.user)
+ response = self.client.get(reverse("members"), follow=True)
+ # Make sure the page loaded
+ self.assertEqual(response.status_code, 200)
+ # Verify that admin info is sent in the dynamic HTML
+ response = self.client.get(reverse("get_portfolio_members_json") + f"?portfolio={self.portfolio.pk}")
+ # TerminalHelper.colorful_logger(logger.info, TerminalColors.OKCYAN, f"{response.content}")
+ self.assertContains(response, '"is_admin": true')
+
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
def test_portfolio_domain_requests_page_when_user_has_no_permissions(self):
diff --git a/src/registrar/views/portfolio_members_json.py b/src/registrar/views/portfolio_members_json.py
new file mode 100644
index 000000000..133e6750e
--- /dev/null
+++ b/src/registrar/views/portfolio_members_json.py
@@ -0,0 +1,125 @@
+from django.http import JsonResponse
+from django.core.paginator import Paginator
+from django.contrib.auth.decorators import login_required
+from django.db.models import Q
+
+from registrar.models.portfolio_invitation import PortfolioInvitation
+from registrar.models.user import User
+from registrar.models.user_portfolio_permission import UserPortfolioPermission
+from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
+
+
+@login_required
+def get_portfolio_members_json(request):
+ """Given the current request,
+ get all members that are associated with the given portfolio"""
+ portfolio = request.GET.get("portfolio")
+ member_ids = get_member_ids_from_request(request, portfolio)
+ objects = User.objects.filter(id__in=member_ids)
+
+ admin_ids = UserPortfolioPermission.objects.filter(
+ portfolio=portfolio,
+ roles__overlap=[
+ UserPortfolioRoleChoices.ORGANIZATION_ADMIN,
+ ],
+ ).values_list("user__id", flat=True)
+ portfolio_invitation_emails = PortfolioInvitation.objects.filter(portfolio=portfolio).values_list(
+ "email", flat=True
+ )
+
+ unfiltered_total = objects.count()
+
+ objects = apply_search(objects, request)
+ # objects = apply_status_filter(objects, request)
+ objects = apply_sorting(objects, request)
+
+ paginator = Paginator(objects, 10)
+ page_number = request.GET.get("page", 1)
+ page_obj = paginator.get_page(page_number)
+ members = [
+ serialize_members(request, portfolio, member, request.user, admin_ids, portfolio_invitation_emails)
+ for member in page_obj.object_list
+ ]
+
+ return JsonResponse(
+ {
+ "members": members,
+ "page": page_obj.number,
+ "num_pages": paginator.num_pages,
+ "has_previous": page_obj.has_previous(),
+ "has_next": page_obj.has_next(),
+ "total": paginator.count,
+ "unfiltered_total": unfiltered_total,
+ }
+ )
+
+
+def get_member_ids_from_request(request, portfolio):
+ """Given the current request,
+ get all members that are associated with the given portfolio"""
+ member_ids = []
+ if portfolio:
+ member_ids = UserPortfolioPermission.objects.filter(portfolio=portfolio).values_list("user__id", flat=True)
+ return member_ids
+
+
+def apply_search(queryset, request):
+ search_term = request.GET.get("search_term")
+
+ if search_term:
+ queryset = queryset.filter(
+ Q(username__icontains=search_term)
+ | Q(first_name__icontains=search_term)
+ | Q(last_name__icontains=search_term)
+ | Q(email__icontains=search_term)
+ )
+ return queryset
+
+
+def apply_sorting(queryset, request):
+ sort_by = request.GET.get("sort_by", "id") # Default to 'id'
+ order = request.GET.get("order", "asc") # Default to 'asc'
+
+ if sort_by == "member":
+ sort_by = ["email", "first_name", "middle_name", "last_name"]
+ else:
+ sort_by = [sort_by]
+
+ if order == "desc":
+ sort_by = [f"-{field}" for field in sort_by]
+
+ return queryset.order_by(*sort_by)
+
+
+def serialize_members(request, portfolio, member, user, admin_ids, portfolio_invitation_emails):
+ # ------- VIEW ONLY
+ # If not view_only (the user has permissions to edit/manage users), show the gear icon with "Manage" link.
+ # If view_only (the user only has view user permissions), show the "View" link (no gear icon).
+ # We check on user_group_permision to account for the upcoming "Manage portfolio" button on admin.
+ user_can_edit_other_users = False
+ for user_group_permission in ["registrar.full_access_permission", "registrar.change_user"]:
+ if user.has_perm(user_group_permission):
+ user_can_edit_other_users = True
+ break
+
+ view_only = not user.has_edit_members_portfolio_permission(portfolio) or not user_can_edit_other_users
+
+ # ------- USER STATUSES
+ is_invited = member.email in portfolio_invitation_emails
+ last_active = "Invited" if is_invited else "Unknown"
+ if member.last_login:
+ last_active = member.last_login.strftime("%b. %d, %Y")
+ is_admin = member.id in admin_ids
+
+ # ------- SERIALIZE
+ member_json = {
+ "id": member.id,
+ "name": member.get_formatted_name(),
+ "email": member.email,
+ "is_admin": is_admin,
+ "last_active": last_active,
+ "action_url": "#", # reverse("members", kwargs={"pk": member.id}), # TODO: Future ticket?
+ "action_label": ("View" if view_only else "Manage"),
+ "svg_icon": ("visibility" if view_only else "settings"),
+ }
+ return member_json
diff --git a/src/registrar/views/portfolios.py b/src/registrar/views/portfolios.py
index 885dca636..552fdb6ff 100644
--- a/src/registrar/views/portfolios.py
+++ b/src/registrar/views/portfolios.py
@@ -12,6 +12,7 @@ from registrar.views.utility.permission_views import (
PortfolioDomainsPermissionView,
PortfolioBasePermissionView,
NoPortfolioDomainsPermissionView,
+ PortfolioMembersPermissionView,
)
from django.views.generic import View
from django.views.generic.edit import FormMixin
@@ -41,6 +42,15 @@ class PortfolioDomainRequestsView(PortfolioDomainRequestsPermissionView, View):
return render(request, "portfolio_requests.html")
+class PortfolioMembersView(PortfolioMembersPermissionView, View):
+
+ template_name = "portfolio_members.html"
+
+ def get(self, request):
+ """Add additional context data to the template."""
+ return render(request, "portfolio_members.html")
+
+
class PortfolioNoDomainsView(NoPortfolioDomainsPermissionView, View):
"""Some users have access to the underlying portfolio, but not any domains.
This is a custom view which explains that to the user - and denotes who to contact.
diff --git a/src/registrar/views/utility/mixins.py b/src/registrar/views/utility/mixins.py
index d8c48e01e..2cb2a599b 100644
--- a/src/registrar/views/utility/mixins.py
+++ b/src/registrar/views/utility/mixins.py
@@ -490,7 +490,7 @@ class PortfolioMembersPermission(PortfolioBasePermission):
up from the portfolio's primary key in self.kwargs["pk"]"""
portfolio = self.request.session.get("portfolio")
- if not self.request.user.has_view_members(portfolio):
+ if not self.request.user.has_view_members_portfolio_permission(portfolio):
return False
return super().has_permission()