diff --git a/src/registrar/templates/portfolio_requests.html b/src/registrar/templates/portfolio_requests.html
index 467141077..5a9197139 100644
--- a/src/registrar/templates/portfolio_requests.html
+++ b/src/registrar/templates/portfolio_requests.html
@@ -19,7 +19,7 @@
{% if has_edit_request_portfolio_permission %}
-
Domain requests can only be modified by the person who created the request.
+
Domain requests can only be modified by the person who created the request.
diff --git a/src/registrar/tests/test_admin.py b/src/registrar/tests/test_admin.py
index a259e5bef..dceb3a79e 100644
--- a/src/registrar/tests/test_admin.py
+++ b/src/registrar/tests/test_admin.py
@@ -2,6 +2,8 @@ from datetime import datetime
from django.utils import timezone
from django.test import TestCase, RequestFactory, Client
from django.contrib.admin.sites import AdminSite
+from registrar.utility.email import EmailSendingError
+from registrar.utility.errors import MissingEmailError
from waffle.testutils import override_flag
from django_webtest import WebTest # type: ignore
from api.tests.common import less_console_noise_decorator
@@ -277,6 +279,29 @@ class TestUserPortfolioPermissionAdmin(TestCase):
# Should return the forbidden permissions for member role
self.assertEqual(member_only_permissions, set(member_forbidden))
+ @less_console_noise_decorator
+ def test_has_change_form_description(self):
+ """Tests if this model has a model description on the change form view"""
+ self.client.force_login(self.superuser)
+
+ user_portfolio_permission, _ = UserPortfolioPermission.objects.get_or_create(
+ user=self.superuser, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
+ )
+
+ response = self.client.get(
+ "/admin/registrar/userportfoliopermission/{}/change/".format(user_portfolio_permission.pk),
+ follow=True,
+ )
+
+ # Make sure that the page is loaded correctly
+ self.assertEqual(response.status_code, 200)
+
+ # Test for a description snippet
+ self.assertContains(
+ response,
+ "If you add someone to a portfolio here, it will not trigger an invitation email.",
+ )
+
class TestPortfolioInvitationAdmin(TestCase):
"""Tests for the PortfolioInvitationAdmin class as super user
@@ -432,6 +457,30 @@ class TestPortfolioInvitationAdmin(TestCase):
)
self.assertContains(response, "Show more")
+ @less_console_noise_decorator
+ def test_has_change_form_description(self):
+ """Tests if this model has a model description on the change form view"""
+ self.client.force_login(self.superuser)
+
+ invitation, _ = PortfolioInvitation.objects.get_or_create(
+ email=self.superuser.email, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
+ )
+
+ response = self.client.get(
+ "/admin/registrar/portfolioinvitation/{}/change/".format(invitation.pk),
+ follow=True,
+ )
+
+ # Make sure that the page is loaded correctly
+ self.assertEqual(response.status_code, 200)
+
+ # Test for a description snippet
+ self.assertContains(
+ response,
+ "If you add someone to a portfolio here, it will trigger an invitation email when you click",
+ )
+
+ @less_console_noise_decorator
def test_get_filters(self):
"""Ensures that our filters are displaying correctly"""
with less_console_noise():
@@ -456,6 +505,176 @@ class TestPortfolioInvitationAdmin(TestCase):
self.assertContains(response, invited_html, count=1)
self.assertContains(response, retrieved_html, count=1)
+ @less_console_noise_decorator
+ @patch("registrar.admin.send_portfolio_invitation_email")
+ @patch("django.contrib.messages.success") # Mock the `messages.warning` call
+ def test_save_sends_email(self, mock_messages_warning, mock_send_email):
+ """On save_model, an email is NOT sent if an invitation already exists."""
+ self.client.force_login(self.superuser)
+
+ # Create an instance of the admin class
+ admin_instance = PortfolioInvitationAdmin(PortfolioInvitation, admin_site=None)
+
+ # Create a PortfolioInvitation instance
+ portfolio_invitation = PortfolioInvitation(
+ email="james.gordon@gotham.gov",
+ portfolio=self.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ )
+
+ # Create a request object
+ request = self.factory.post("/admin/registrar/PortfolioInvitation/add/")
+ request.user = self.superuser
+
+ # Call the save_model method
+ admin_instance.save_model(request, portfolio_invitation, None, None)
+
+ # Assert that send_portfolio_invitation_email is not called
+ mock_send_email.assert_called()
+
+ # Get the arguments passed to send_portfolio_invitation_email
+ _, called_kwargs = mock_send_email.call_args
+
+ # Assert the email content
+ self.assertEqual(called_kwargs["email"], "james.gordon@gotham.gov")
+ self.assertEqual(called_kwargs["requestor"], self.superuser)
+ self.assertEqual(called_kwargs["portfolio"], self.portfolio)
+
+ # Assert that a warning message was triggered
+ mock_messages_warning.assert_called_once_with(request, "james.gordon@gotham.gov has been invited.")
+
+ @less_console_noise_decorator
+ @patch("registrar.admin.send_portfolio_invitation_email")
+ @patch("django.contrib.messages.warning") # Mock the `messages.warning` call
+ def test_save_does_not_send_email_if_requested_user_exists(self, mock_messages_warning, mock_send_email):
+ """On save_model, an email is NOT sent if an the requested email belongs to an existing user.
+ It also throws a warning."""
+ self.client.force_login(self.superuser)
+
+ # Create an instance of the admin class
+ admin_instance = PortfolioInvitationAdmin(PortfolioInvitation, admin_site=None)
+
+ # Mock the UserPortfolioPermission query to simulate the invitation already existing
+ existing_user = create_user()
+ UserPortfolioPermission.objects.create(user=existing_user, portfolio=self.portfolio)
+
+ # Create a PortfolioInvitation instance
+ portfolio_invitation = PortfolioInvitation(
+ email=existing_user.email,
+ portfolio=self.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ )
+
+ # Create a request object
+ request = self.factory.post("/admin/registrar/PortfolioInvitation/add/")
+ request.user = self.superuser
+
+ # Call the save_model method
+ admin_instance.save_model(request, portfolio_invitation, None, None)
+
+ # Assert that send_portfolio_invitation_email is not called
+ mock_send_email.assert_not_called()
+
+ # Assert that a warning message was triggered
+ mock_messages_warning.assert_called_once_with(request, "User is already a member of this portfolio.")
+
+ @less_console_noise_decorator
+ @patch("registrar.admin.send_portfolio_invitation_email")
+ @patch("django.contrib.messages.error") # Mock the `messages.error` call
+ def test_save_exception_email_sending_error(self, mock_messages_error, mock_send_email):
+ """Handle EmailSendingError correctly when sending the portfolio invitation fails."""
+ self.client.force_login(self.superuser)
+
+ # Mock the email sending function to raise EmailSendingError
+ mock_send_email.side_effect = EmailSendingError("Email service unavailable")
+
+ # Create an instance of the admin class
+ admin_instance = PortfolioInvitationAdmin(PortfolioInvitation, admin_site=None)
+
+ # Create a PortfolioInvitation instance
+ portfolio_invitation = PortfolioInvitation(
+ email="james.gordon@gotham.gov",
+ portfolio=self.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ )
+
+ # Create a request object
+ request = self.factory.post("/admin/registrar/PortfolioInvitation/add/")
+ request.user = self.superuser
+
+ # Call the save_model method
+ admin_instance.save_model(request, portfolio_invitation, None, None)
+
+ # Assert that messages.error was called with the correct message
+ mock_messages_error.assert_called_once_with(
+ request, "Could not send email invitation. Portfolio invitation not saved."
+ )
+
+ @less_console_noise_decorator
+ @patch("registrar.admin.send_portfolio_invitation_email")
+ @patch("django.contrib.messages.error") # Mock the `messages.error` call
+ def test_save_exception_missing_email_error(self, mock_messages_error, mock_send_email):
+ """Handle MissingEmailError correctly when no email exists for the requestor."""
+ self.client.force_login(self.superuser)
+
+ # Mock the email sending function to raise MissingEmailError
+ mock_send_email.side_effect = MissingEmailError()
+
+ # Create an instance of the admin class
+ admin_instance = PortfolioInvitationAdmin(PortfolioInvitation, admin_site=None)
+
+ # Create a PortfolioInvitation instance
+ portfolio_invitation = PortfolioInvitation(
+ email="james.gordon@gotham.gov",
+ portfolio=self.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ )
+
+ # Create a request object
+ request = self.factory.post("/admin/registrar/PortfolioInvitation/add/")
+ request.user = self.superuser
+
+ # Call the save_model method
+ admin_instance.save_model(request, portfolio_invitation, None, None)
+
+ # Assert that messages.error was called with the correct message
+ mock_messages_error.assert_called_once_with(
+ request,
+ "Can't send invitation email. No email is associated with your user account.",
+ )
+
+ @less_console_noise_decorator
+ @patch("registrar.admin.send_portfolio_invitation_email")
+ @patch("django.contrib.messages.error") # Mock the `messages.error` call
+ def test_save_exception_generic_error(self, mock_messages_error, mock_send_email):
+ """Handle generic exceptions correctly during portfolio invitation."""
+ self.client.force_login(self.superuser)
+
+ # Mock the email sending function to raise a generic exception
+ mock_send_email.side_effect = Exception("Unexpected error")
+
+ # Create an instance of the admin class
+ admin_instance = PortfolioInvitationAdmin(PortfolioInvitation, admin_site=None)
+
+ # Create a PortfolioInvitation instance
+ portfolio_invitation = PortfolioInvitation(
+ email="james.gordon@gotham.gov",
+ portfolio=self.portfolio,
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ )
+
+ # Create a request object
+ request = self.factory.post("/admin/registrar/PortfolioInvitation/add/")
+ request.user = self.superuser
+
+ # Call the save_model method
+ admin_instance.save_model(request, portfolio_invitation, None, None)
+
+ # Assert that messages.error was called with the correct message
+ mock_messages_error.assert_called_once_with(
+ request, "Could not send email invitation. Portfolio invitation not saved."
+ )
+
class TestHostAdmin(TestCase):
"""Tests for the HostAdmin class as super user
diff --git a/src/registrar/tests/test_forms.py b/src/registrar/tests/test_forms.py
index 12d9af8ac..a2960deac 100644
--- a/src/registrar/tests/test_forms.py
+++ b/src/registrar/tests/test_forms.py
@@ -18,7 +18,17 @@ from registrar.forms.domain_request_wizard import (
AboutYourOrganizationForm,
)
from registrar.forms.domain import ContactForm
-from registrar.tests.common import MockEppLib
+from registrar.forms.portfolio import (
+ PortfolioInvitedMemberForm,
+ PortfolioMemberForm,
+ PortfolioNewMemberForm,
+)
+from registrar.models.portfolio import Portfolio
+from registrar.models.portfolio_invitation import PortfolioInvitation
+from registrar.models.user import User
+from registrar.models.user_portfolio_permission import UserPortfolioPermission
+from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
+from registrar.tests.common import MockEppLib, create_user
from django.contrib.auth import get_user_model
@@ -408,3 +418,196 @@ class TestContactForm(TestCase):
def test_contact_form_email_invalid2(self):
form = ContactForm(data={"email": "@"})
self.assertEqual(form.errors["email"], ["Enter a valid email address."])
+
+
+class TestBasePortfolioMemberForms(TestCase):
+ """We test on the child forms instead of BasePortfolioMemberForm because the base form
+ is a model form with no model bound."""
+
+ def setUp(self):
+ super().setUp()
+ self.user = create_user()
+ self.portfolio, _ = Portfolio.objects.get_or_create(
+ creator_id=self.user.id, organization_name="Hotel California"
+ )
+
+ def tearDown(self):
+ super().tearDown()
+ Portfolio.objects.all().delete()
+ UserPortfolioPermission.objects.all().delete()
+ PortfolioInvitation.objects.all().delete()
+ User.objects.all().delete()
+
+ def _assert_form_is_valid(self, form_class, data, instance=None):
+ if instance is not None:
+ form = form_class(data=data, instance=instance)
+ else:
+ print("no instance")
+ form = form_class(data=data)
+ self.assertTrue(form.is_valid(), f"Form {form_class.__name__} failed validation with data: {data}")
+ return form
+
+ def _assert_form_has_error(self, form_class, data, field_name):
+ form = form_class(data=data)
+ self.assertFalse(form.is_valid())
+ self.assertIn(field_name, form.errors)
+
+ def _assert_initial_data(self, form_class, instance, expected_initial_data):
+ """Helper to check if the instance data is correctly mapped to the initial form values."""
+ form = form_class(instance=instance)
+ for field, expected_value in expected_initial_data.items():
+ self.assertEqual(form.initial[field], expected_value)
+
+ def _assert_permission_mapping(self, form_class, data, expected_permissions):
+ """Helper to check if permissions are correctly handled and mapped."""
+ form = self._assert_form_is_valid(form_class, data)
+ cleaned_data = form.cleaned_data
+ for permission in expected_permissions:
+ self.assertIn(permission, cleaned_data["additional_permissions"])
+
+ def test_required_field_for_admin(self):
+ """Test that required fields are validated for an admin role."""
+ data = {
+ "role": UserPortfolioRoleChoices.ORGANIZATION_ADMIN.value,
+ "domain_request_permission_admin": "", # Simulate missing field
+ "member_permission_admin": "", # Simulate missing field
+ }
+
+ # Check required fields for all forms
+ self._assert_form_has_error(PortfolioMemberForm, data, "domain_request_permission_admin")
+ self._assert_form_has_error(PortfolioMemberForm, data, "member_permission_admin")
+
+ self._assert_form_has_error(PortfolioInvitedMemberForm, data, "domain_request_permission_admin")
+ self._assert_form_has_error(PortfolioInvitedMemberForm, data, "member_permission_admin")
+
+ self._assert_form_has_error(PortfolioNewMemberForm, data, "domain_request_permission_admin")
+ self._assert_form_has_error(PortfolioNewMemberForm, data, "member_permission_admin")
+
+ def test_required_field_for_member(self):
+ """Test that required fields are validated for a member role."""
+ data = {
+ "role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER.value,
+ "domain_request_permission_member": "", # Simulate missing field
+ }
+
+ # Check required fields for all forms
+ self._assert_form_has_error(PortfolioMemberForm, data, "domain_request_permission_member")
+ self._assert_form_has_error(PortfolioInvitedMemberForm, data, "domain_request_permission_member")
+ self._assert_form_has_error(PortfolioNewMemberForm, data, "domain_request_permission_member")
+
+ def test_clean_validates_required_fields_for_role(self):
+ """Test that the `clean` method validates the correct fields for each role.
+
+ For PortfolioMemberForm and PortfolioInvitedMemberForm, we pass an object as the instance to the form.
+ For UserPortfolioPermissionChoices, we add a portfolio and an email to the POST data.
+
+ These things are handled in the views."""
+
+ user_portfolio_permission, _ = UserPortfolioPermission.objects.get_or_create(
+ portfolio=self.portfolio, user=self.user
+ )
+ portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(portfolio=self.portfolio, email="hi@ho")
+
+ data = {
+ "role": UserPortfolioRoleChoices.ORGANIZATION_ADMIN.value,
+ "domain_request_permission_admin": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS.value,
+ "member_permission_admin": UserPortfolioPermissionChoices.EDIT_MEMBERS.value,
+ }
+
+ # Check form validity for all forms
+ form = self._assert_form_is_valid(PortfolioMemberForm, data, user_portfolio_permission)
+ cleaned_data = form.cleaned_data
+ self.assertEqual(cleaned_data["roles"], [UserPortfolioRoleChoices.ORGANIZATION_ADMIN.value])
+ self.assertEqual(cleaned_data["additional_permissions"], [UserPortfolioPermissionChoices.EDIT_MEMBERS])
+
+ form = self._assert_form_is_valid(PortfolioInvitedMemberForm, data, portfolio_invitation)
+ cleaned_data = form.cleaned_data
+ self.assertEqual(cleaned_data["roles"], [UserPortfolioRoleChoices.ORGANIZATION_ADMIN.value])
+ self.assertEqual(cleaned_data["additional_permissions"], [UserPortfolioPermissionChoices.EDIT_MEMBERS])
+
+ data = {
+ "email": "hi@ho.com",
+ "portfolio": self.portfolio.id,
+ "role": UserPortfolioRoleChoices.ORGANIZATION_ADMIN.value,
+ "domain_request_permission_admin": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS.value,
+ "member_permission_admin": UserPortfolioPermissionChoices.EDIT_MEMBERS.value,
+ }
+
+ form = self._assert_form_is_valid(PortfolioNewMemberForm, data)
+ cleaned_data = form.cleaned_data
+ self.assertEqual(cleaned_data["roles"], [UserPortfolioRoleChoices.ORGANIZATION_ADMIN.value])
+ self.assertEqual(cleaned_data["additional_permissions"], [UserPortfolioPermissionChoices.EDIT_MEMBERS])
+
+ def test_clean_member_permission_edgecase(self):
+ """Test that the clean method correctly handles the special "no_access" value for members.
+ We'll need to add a portfolio, which in the app is handled by the view post."""
+
+ user_portfolio_permission, _ = UserPortfolioPermission.objects.get_or_create(
+ portfolio=self.portfolio, user=self.user
+ )
+ portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(portfolio=self.portfolio, email="hi@ho")
+
+ data = {
+ "role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER.value,
+ "domain_request_permission_member": "no_access", # Simulate no access permission
+ }
+
+ form = self._assert_form_is_valid(PortfolioMemberForm, data, user_portfolio_permission)
+ cleaned_data = form.cleaned_data
+ self.assertEqual(cleaned_data["domain_request_permission_member"], None)
+
+ form = self._assert_form_is_valid(PortfolioInvitedMemberForm, data, portfolio_invitation)
+ cleaned_data = form.cleaned_data
+ self.assertEqual(cleaned_data["domain_request_permission_member"], None)
+
+ def test_map_instance_to_initial_admin_role(self):
+ """Test that instance data is correctly mapped to the initial form values for an admin role."""
+ user_portfolio_permission = UserPortfolioPermission(
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ additional_permissions=[UserPortfolioPermissionChoices.VIEW_MEMBERS],
+ )
+ portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(
+ portfolio=self.portfolio,
+ email="hi@ho",
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
+ additional_permissions=[UserPortfolioPermissionChoices.VIEW_MEMBERS],
+ )
+
+ expected_initial_data = {
+ "role": UserPortfolioRoleChoices.ORGANIZATION_ADMIN,
+ "domain_request_permission_admin": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS,
+ "member_permission_admin": UserPortfolioPermissionChoices.VIEW_MEMBERS,
+ }
+ self._assert_initial_data(PortfolioMemberForm, user_portfolio_permission, expected_initial_data)
+ self._assert_initial_data(PortfolioInvitedMemberForm, portfolio_invitation, expected_initial_data)
+
+ def test_map_instance_to_initial_member_role(self):
+ """Test that instance data is correctly mapped to the initial form values for a member role."""
+ user_portfolio_permission = UserPortfolioPermission(
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
+ additional_permissions=[UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS],
+ )
+ portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(
+ portfolio=self.portfolio,
+ email="hi@ho",
+ roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
+ additional_permissions=[UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS],
+ )
+ expected_initial_data = {
+ "role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER,
+ "domain_request_permission_member": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS,
+ }
+ self._assert_initial_data(PortfolioMemberForm, user_portfolio_permission, expected_initial_data)
+ self._assert_initial_data(PortfolioInvitedMemberForm, portfolio_invitation, expected_initial_data)
+
+ def test_invalid_data_for_admin(self):
+ """Test invalid form submission for an admin role with missing permissions."""
+ data = {
+ "email": "hi@ho.com",
+ "portfolio": self.portfolio.id,
+ "role": UserPortfolioRoleChoices.ORGANIZATION_ADMIN.value,
+ "domain_request_permission_admin": "", # Missing field
+ "member_permission_admin": "", # Missing field
+ }
+ self._assert_form_has_error(PortfolioMemberForm, data, "domain_request_permission_admin")
+ self._assert_form_has_error(PortfolioInvitedMemberForm, data, "member_permission_admin")
diff --git a/src/registrar/tests/test_reports.py b/src/registrar/tests/test_reports.py
index 995782eea..4a41238c7 100644
--- a/src/registrar/tests/test_reports.py
+++ b/src/registrar/tests/test_reports.py
@@ -16,7 +16,7 @@ from registrar.utility.csv_export import (
DomainDataType,
DomainDataFederal,
DomainDataTypeUser,
- DomainRequestsDataType,
+ DomainRequestDataType,
DomainGrowth,
DomainManaged,
DomainUnmanaged,
@@ -456,11 +456,11 @@ class ExportDataTest(MockDbForIndividualTests, MockEppLib):
portfolio.delete()
def _run_domain_request_data_type_user_export(self, request):
- """Helper function to run the exporting_dr_data_to_csv function on DomainRequestsDataType"""
+ """Helper function to run the export_data_to_csv function on DomainRequestDataType"""
csv_file = StringIO()
- DomainRequestsDataType.exporting_dr_data_to_csv(csv_file, request=request)
+ DomainRequestDataType.export_data_to_csv(csv_file, request=request)
csv_file.seek(0)
@@ -773,9 +773,9 @@ class ExportDataTest(MockDbForIndividualTests, MockEppLib):
# Content
"city5.gov,Approved,Federal,Executive,,Testorg,N/A,,NY,2,,,,1,0,city1.gov,Testy,Tester,testy@town.com,"
"Chief Tester,Purpose of the site,There is more,Testy Tester testy2@town.com,,city.com,\n"
- "city2.gov,In review,Federal,Executive,Portfolio 1 Federal Agency,,N/A,,,2,,,,0,1,city1.gov,,,,,"
+ "city2.gov,In review,Federal,Executive,Portfolio 1 Federal Agency,,N/A,,NY,2,,,,0,1,city1.gov,,,,,"
"Purpose of the site,There is more,Testy Tester testy2@town.com,,city.com,\n"
- "city3.gov,Submitted,Federal,Executive,Portfolio 1 Federal Agency,,N/A,,,2,,,,0,1,"
+ "city3.gov,Submitted,Federal,Executive,Portfolio 1 Federal Agency,,N/A,,NY,2,,,,0,1,"
'"cheeseville.gov, city1.gov, igorville.gov",,,,,Purpose of the site,CISA-first-name CISA-last-name | '
'There is more,"Meow Tester24 te2@town.com, Testy1232 Tester24 te2@town.com, '
'Testy Tester testy2@town.com",'
@@ -785,7 +785,7 @@ class ExportDataTest(MockDbForIndividualTests, MockEppLib):
"Chief Tester,Purpose of the site,CISA-first-name CISA-last-name | There is more,"
"Testy Tester testy2@town.com,"
"cisaRep@igorville.gov,city.com,\n"
- "city6.gov,Submitted,Federal,Executive,Portfolio 1 Federal Agency,,N/A,,,2,,,,0,1,city1.gov,,,,,"
+ "city6.gov,Submitted,Federal,Executive,Portfolio 1 Federal Agency,,N/A,,NY,2,,,,0,1,city1.gov,,,,,"
"Purpose of the site,CISA-first-name CISA-last-name | There is more,Testy Tester testy2@town.com,"
"cisaRep@igorville.gov,city.com,\n"
)
@@ -794,6 +794,7 @@ class ExportDataTest(MockDbForIndividualTests, MockEppLib):
# spaces and leading/trailing whitespace
csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip()
expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip()
+ self.maxDiff = None
self.assertEqual(csv_content, expected_content)
diff --git a/src/registrar/tests/test_views_domain.py b/src/registrar/tests/test_views_domain.py
index ba237e1e7..aedfc41c2 100644
--- a/src/registrar/tests/test_views_domain.py
+++ b/src/registrar/tests/test_views_domain.py
@@ -4,6 +4,8 @@ from unittest.mock import MagicMock, ANY, patch
from django.conf import settings
from django.urls import reverse
from django.contrib.auth import get_user_model
+from registrar.models.portfolio_invitation import PortfolioInvitation
+from registrar.utility.email import EmailSendingError
from waffle.testutils import override_flag
from api.tests.common import less_console_noise_decorator
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
@@ -548,6 +550,7 @@ class TestDomainManagers(TestDomainOverview):
self.portfolio = Portfolio.objects.create(creator=self.user, organization_name="Ice Cream")
# Add the portfolio to the domain_information object
self.domain_information.portfolio = self.portfolio
+ self.domain_information.save()
# Add portfolio perms to the user object
self.portfolio_permission, _ = UserPortfolioPermission.objects.get_or_create(
user=self.user, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
@@ -560,6 +563,7 @@ class TestDomainManagers(TestDomainOverview):
def tearDown(self):
"""Ensure that the user has its original permissions"""
+ PortfolioInvitation.objects.all().delete()
super().tearDown()
@less_console_noise_decorator
@@ -592,7 +596,7 @@ class TestDomainManagers(TestDomainOverview):
@less_console_noise_decorator
def test_domain_user_add_form(self):
"""Adding an existing user works."""
- other_user, _ = get_user_model().objects.get_or_create(email="mayor@igorville.gov")
+ get_user_model().objects.get_or_create(email="mayor@igorville.gov")
add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
@@ -615,6 +619,148 @@ class TestDomainManagers(TestDomainOverview):
success_page = success_result.follow()
self.assertContains(success_page, "mayor@igorville.gov")
+ @boto3_mocking.patching
+ @override_flag("organization_feature", active=True)
+ @less_console_noise_decorator
+ @patch("registrar.views.domain.send_portfolio_invitation_email")
+ @patch("registrar.views.domain.send_domain_invitation_email")
+ def test_domain_user_add_form_sends_portfolio_invitation(self, mock_send_domain_email, mock_send_portfolio_email):
+ """Adding an existing user works and sends portfolio invitation when
+ user is not member of portfolio."""
+ get_user_model().objects.get_or_create(email="mayor@igorville.gov")
+ add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
+ session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
+
+ add_page.form["email"] = "mayor@igorville.gov"
+
+ self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
+
+ success_result = add_page.form.submit()
+
+ self.assertEqual(success_result.status_code, 302)
+ self.assertEqual(
+ success_result["Location"],
+ reverse("domain-users", kwargs={"pk": self.domain.id}),
+ )
+
+ # Verify that the invitation emails were sent
+ mock_send_portfolio_email.assert_called_once_with(
+ email="mayor@igorville.gov", requestor=self.user, portfolio=self.portfolio
+ )
+ mock_send_domain_email.assert_called_once()
+ call_args = mock_send_domain_email.call_args.kwargs
+ self.assertEqual(call_args["email"], "mayor@igorville.gov")
+ self.assertEqual(call_args["requestor"], self.user)
+ self.assertEqual(call_args["domain"], self.domain)
+ self.assertIsNone(call_args.get("is_member_of_different_org"))
+
+ # Assert that the PortfolioInvitation is created
+ portfolio_invitation = PortfolioInvitation.objects.filter(
+ email="mayor@igorville.gov", portfolio=self.portfolio
+ ).first()
+ self.assertIsNotNone(portfolio_invitation, "Portfolio invitation should be created.")
+ self.assertEqual(portfolio_invitation.email, "mayor@igorville.gov")
+ self.assertEqual(portfolio_invitation.portfolio, self.portfolio)
+
+ self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
+ success_page = success_result.follow()
+ self.assertContains(success_page, "mayor@igorville.gov")
+
+ @boto3_mocking.patching
+ @override_flag("organization_feature", active=True)
+ @less_console_noise_decorator
+ @patch("registrar.views.domain.send_portfolio_invitation_email")
+ @patch("registrar.views.domain.send_domain_invitation_email")
+ def test_domain_user_add_form_doesnt_send_portfolio_invitation_if_already_member(
+ self, mock_send_domain_email, mock_send_portfolio_email
+ ):
+ """Adding an existing user works and sends portfolio invitation when
+ user is not member of portfolio."""
+ other_user, _ = get_user_model().objects.get_or_create(email="mayor@igorville.gov")
+ UserPortfolioPermission.objects.get_or_create(
+ user=other_user, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
+ )
+ add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
+ session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
+
+ add_page.form["email"] = "mayor@igorville.gov"
+
+ self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
+
+ success_result = add_page.form.submit()
+
+ self.assertEqual(success_result.status_code, 302)
+ self.assertEqual(
+ success_result["Location"],
+ reverse("domain-users", kwargs={"pk": self.domain.id}),
+ )
+
+ # Verify that the invitation emails were sent
+ mock_send_portfolio_email.assert_not_called()
+ mock_send_domain_email.assert_called_once()
+ call_args = mock_send_domain_email.call_args.kwargs
+ self.assertEqual(call_args["email"], "mayor@igorville.gov")
+ self.assertEqual(call_args["requestor"], self.user)
+ self.assertEqual(call_args["domain"], self.domain)
+ self.assertIsNone(call_args.get("is_member_of_different_org"))
+
+ # Assert that no PortfolioInvitation is created
+ portfolio_invitation_exists = PortfolioInvitation.objects.filter(
+ email="mayor@igorville.gov", portfolio=self.portfolio
+ ).exists()
+ self.assertFalse(
+ portfolio_invitation_exists, "Portfolio invitation should not be created when the user is already a member."
+ )
+
+ self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
+ success_page = success_result.follow()
+ self.assertContains(success_page, "mayor@igorville.gov")
+
+ @boto3_mocking.patching
+ @override_flag("organization_feature", active=True)
+ @less_console_noise_decorator
+ @patch("registrar.views.domain.send_portfolio_invitation_email")
+ @patch("registrar.views.domain.send_domain_invitation_email")
+ def test_domain_user_add_form_sends_portfolio_invitation_raises_email_sending_error(
+ self, mock_send_domain_email, mock_send_portfolio_email
+ ):
+ """Adding an existing user works and attempts to send portfolio invitation when
+ user is not member of portfolio and send raises an error."""
+ mock_send_portfolio_email.side_effect = EmailSendingError("Failed to send email.")
+ get_user_model().objects.get_or_create(email="mayor@igorville.gov")
+ add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
+ session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
+
+ add_page.form["email"] = "mayor@igorville.gov"
+
+ self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
+
+ success_result = add_page.form.submit()
+
+ self.assertEqual(success_result.status_code, 302)
+ self.assertEqual(
+ success_result["Location"],
+ reverse("domain-users", kwargs={"pk": self.domain.id}),
+ )
+
+ # Verify that the invitation emails were sent
+ mock_send_portfolio_email.assert_called_once_with(
+ email="mayor@igorville.gov", requestor=self.user, portfolio=self.portfolio
+ )
+ mock_send_domain_email.assert_not_called()
+
+ # Assert that no PortfolioInvitation is created
+ portfolio_invitation_exists = PortfolioInvitation.objects.filter(
+ email="mayor@igorville.gov", portfolio=self.portfolio
+ ).exists()
+ self.assertFalse(
+ portfolio_invitation_exists, "Portfolio invitation should not be created when email fails to send."
+ )
+
+ self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
+ success_page = success_result.follow()
+ self.assertContains(success_page, "Could not send email invitation.")
+
@boto3_mocking.patching
@less_console_noise_decorator
def test_domain_invitation_created(self):
@@ -827,39 +973,20 @@ class TestDomainManagers(TestDomainOverview):
self.assertNotIn("Last", email_content)
self.assertNotIn("First Last", email_content)
- @boto3_mocking.patching
@less_console_noise_decorator
- def test_domain_invitation_email_displays_error_non_existent(self):
- """Inviting a non existent user sends them an email, with email as the name."""
- # make sure there is no user with this email
- email_address = "mayor@igorville.gov"
- User.objects.filter(email=email_address).delete()
-
- # Give the user who is sending the email an invalid email address
- self.user.email = ""
- self.user.save()
-
+ def test_domain_invitation_email_validation_blocks_bad_email(self):
+ """Inviting a bad email blocks at validation."""
+ email_address = "mayor"
self.domain_information, _ = DomainInformation.objects.get_or_create(creator=self.user, domain=self.domain)
- mock_client = MagicMock()
- mock_error_message = MagicMock()
- with boto3_mocking.clients.handler_for("sesv2", mock_client):
- with patch("django.contrib.messages.error") as mock_error_message:
- add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
- session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
- add_page.form["email"] = email_address
- self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
- add_page.form.submit().follow()
+ add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
+ session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
+ add_page.form["email"] = email_address
+ self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
+ response = add_page.form.submit()
- expected_message_content = "Can't send invitation email. No email is associated with your account."
+ self.assertContains(response, "Enter an email address in the required format, like name@example.com.")
- # Grab the message content
- returned_error_message = mock_error_message.call_args[0][1]
-
- # Check that the message content is what we expect
- self.assertEqual(expected_message_content, returned_error_message)
-
- @boto3_mocking.patching
@less_console_noise_decorator
def test_domain_invitation_email_displays_error(self):
"""When the requesting user has no email, an error is displayed"""
@@ -870,28 +997,25 @@ class TestDomainManagers(TestDomainOverview):
# Give the user who is sending the email an invalid email address
self.user.email = ""
+ self.user.is_staff = False
self.user.save()
self.domain_information, _ = DomainInformation.objects.get_or_create(creator=self.user, domain=self.domain)
- mock_client = MagicMock()
+ with patch("django.contrib.messages.error") as mock_error:
+ add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
+ session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
+ add_page.form["email"] = email_address
+ self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
+ add_page.form.submit()
- mock_error_message = MagicMock()
- with boto3_mocking.clients.handler_for("sesv2", mock_client):
- with patch("django.contrib.messages.error") as mock_error_message:
- add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
- session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
- add_page.form["email"] = email_address
- self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
- add_page.form.submit().follow()
+ expected_message_content = "Can't send invitation email. No email is associated with your user account."
- expected_message_content = "Can't send invitation email. No email is associated with your account."
-
- # Grab the message content
- returned_error_message = mock_error_message.call_args[0][1]
-
- # Check that the message content is what we expect
- self.assertEqual(expected_message_content, returned_error_message)
+ # Assert that the error message was called with the correct argument
+ mock_error.assert_called_once_with(
+ ANY,
+ expected_message_content,
+ )
@less_console_noise_decorator
def test_domain_invitation_cancel(self):
diff --git a/src/registrar/tests/test_views_member_domains_json.py b/src/registrar/tests/test_views_member_domains_json.py
index c9f1e38cc..091ad6151 100644
--- a/src/registrar/tests/test_views_member_domains_json.py
+++ b/src/registrar/tests/test_views_member_domains_json.py
@@ -94,6 +94,12 @@ class GetPortfolioMemberDomainsJsonTest(TestWithUser, WebTest):
DomainInvitation.objects.create(
email=cls.invited_member_email, domain=cls.domain2, status=DomainInvitation.DomainInvitationStatus.INVITED
)
+ DomainInvitation.objects.create(
+ email=cls.invited_member_email, domain=cls.domain3, status=DomainInvitation.DomainInvitationStatus.CANCELED
+ )
+ DomainInvitation.objects.create(
+ email=cls.invited_member_email, domain=cls.domain4, status=DomainInvitation.DomainInvitationStatus.RETRIEVED
+ )
@classmethod
def tearDownClass(cls):
@@ -138,7 +144,8 @@ class GetPortfolioMemberDomainsJsonTest(TestWithUser, WebTest):
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_get_portfolio_invitedmember_domains_json_authenticated(self):
- """Test that portfolio invitedmember's domains are returned properly for an authenticated user."""
+ """Test that portfolio invitedmember's domains are returned properly for an authenticated user.
+ CANCELED and RETRIEVED invites should be ignored."""
response = self.app.get(
reverse("get_member_domains_json"),
params={"portfolio": self.portfolio.id, "email": self.invited_member_email, "member_only": "true"},
diff --git a/src/registrar/tests/test_views_members_json.py b/src/registrar/tests/test_views_members_json.py
index d7b9f3a9f..8082a1a30 100644
--- a/src/registrar/tests/test_views_members_json.py
+++ b/src/registrar/tests/test_views_members_json.py
@@ -157,7 +157,7 @@ class GetPortfolioMembersJsonTest(MockEppLib, WebTest):
@override_flag("organization_members", active=True)
def test_get_portfolio_invited_json_authenticated(self):
"""Test that portfolio invitees are returned properly for an authenticated user."""
- """Also tests that reposnse is 200 when no domains"""
+ """Also tests that response is 200 when no domains"""
UserPortfolioPermission.objects.create(
user=self.user,
portfolio=self.portfolio,
@@ -258,13 +258,14 @@ class GetPortfolioMembersJsonTest(MockEppLib, WebTest):
role=UserDomainRole.Roles.MANAGER,
)
- # create domain for which user is manager and domain not in portfolio
+ # create another domain in the portfolio
domain2 = Domain.objects.create(
- name="somedomain2.com",
+ name="thissecondpermtestsmultipleperms@lets.notbreak",
)
DomainInformation.objects.create(
creator=self.user,
domain=domain2,
+ portfolio=self.portfolio,
)
UserDomainRole.objects.create(
user=self.user,
@@ -272,6 +273,20 @@ class GetPortfolioMembersJsonTest(MockEppLib, WebTest):
role=UserDomainRole.Roles.MANAGER,
)
+ # create domain for which user is manager and domain not in portfolio
+ domain3 = Domain.objects.create(
+ name="somedomain3.com",
+ )
+ DomainInformation.objects.create(
+ creator=self.user,
+ domain=domain3,
+ )
+ UserDomainRole.objects.create(
+ user=self.user,
+ domain=domain3,
+ role=UserDomainRole.Roles.MANAGER,
+ )
+
response = self.app.get(reverse("get_portfolio_members_json"), params={"portfolio": self.portfolio.id})
self.assertEqual(response.status_code, 200)
data = response.json
@@ -279,7 +294,8 @@ class GetPortfolioMembersJsonTest(MockEppLib, WebTest):
# Check if the domain appears in the response JSON and that domain2 does not
domain_names = [domain_name for member in data["members"] for domain_name in member.get("domain_names", [])]
self.assertIn("somedomain1.com", domain_names)
- self.assertNotIn("somedomain2.com", domain_names)
+ self.assertIn("thissecondpermtestsmultipleperms@lets.notbreak", domain_names)
+ self.assertNotIn("somedomain3.com", domain_names)
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@@ -318,19 +334,33 @@ class GetPortfolioMembersJsonTest(MockEppLib, WebTest):
domain=domain,
)
- # create a domain not in the portfolio
+ # create another domain in the portfolio
domain2 = Domain.objects.create(
- name="somedomain2.com",
+ name="thissecondinvitetestsasubqueryinjson@lets.notbreak",
)
DomainInformation.objects.create(
creator=self.user,
domain=domain2,
+ portfolio=self.portfolio,
)
DomainInvitation.objects.create(
email=self.email6,
domain=domain2,
)
+ # create a domain not in the portfolio
+ domain3 = Domain.objects.create(
+ name="somedomain3.com",
+ )
+ DomainInformation.objects.create(
+ creator=self.user,
+ domain=domain3,
+ )
+ DomainInvitation.objects.create(
+ email=self.email6,
+ domain=domain3,
+ )
+
response = self.app.get(reverse("get_portfolio_members_json"), params={"portfolio": self.portfolio.id})
self.assertEqual(response.status_code, 200)
data = response.json
@@ -338,7 +368,8 @@ class GetPortfolioMembersJsonTest(MockEppLib, WebTest):
# Check if the domain appears in the response JSON and domain2 does not
domain_names = [domain_name for member in data["members"] for domain_name in member.get("domain_names", [])]
self.assertIn("somedomain1.com", domain_names)
- self.assertNotIn("somedomain2.com", domain_names)
+ self.assertIn("thissecondinvitetestsasubqueryinjson@lets.notbreak", domain_names)
+ self.assertNotIn("somedomain3.com", domain_names)
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
diff --git a/src/registrar/tests/test_views_portfolio.py b/src/registrar/tests/test_views_portfolio.py
index 01383ae77..d5a708cd0 100644
--- a/src/registrar/tests/test_views_portfolio.py
+++ b/src/registrar/tests/test_views_portfolio.py
@@ -14,17 +14,21 @@ from registrar.models import (
Suborganization,
AllowedEmail,
)
+from registrar.models.domain_invitation import DomainInvitation
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user_group import UserGroup
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from registrar.tests.test_views import TestWithUser
+from registrar.utility.email import EmailSendingError
+from registrar.utility.errors import MissingEmailError
from .common import MockSESClient, completed_domain_request, create_test_user, create_user
from waffle.testutils import override_flag
from django.contrib.sessions.middleware import SessionMiddleware
import boto3_mocking # type: ignore
from django.test import Client
import logging
+import json
logger = logging.getLogger(__name__)
@@ -1927,7 +1931,7 @@ class TestPortfolioMemberDomainsView(TestWithUser, WebTest):
cls.portfolio = Portfolio.objects.create(creator=cls.user, organization_name="Test Portfolio")
# Assign permissions to the user making requests
- UserPortfolioPermission.objects.create(
+ cls.portfolio_permission = UserPortfolioPermission.objects.create(
user=cls.user,
portfolio=cls.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
@@ -2106,11 +2110,22 @@ class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView):
@classmethod
def setUpClass(cls):
super().setUpClass()
+ cls.url = reverse("member-domains-edit", kwargs={"pk": cls.portfolio_permission.pk})
@classmethod
def tearDownClass(cls):
super().tearDownClass()
+ def setUp(self):
+ super().setUp()
+ names = ["1.gov", "2.gov", "3.gov"]
+ Domain.objects.bulk_create([Domain(name=name) for name in names])
+
+ def tearDown(self):
+ super().tearDown()
+ UserDomainRole.objects.all().delete()
+ Domain.objects.all().delete()
+
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@@ -2162,16 +2177,140 @@ class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView):
# Make sure the response is not found
self.assertEqual(response.status_code, 404)
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_post_with_valid_added_domains(self):
+ """Test that domains can be successfully added."""
+ self.client.force_login(self.user)
+
+ data = {
+ "added_domains": json.dumps([1, 2, 3]), # Mock domain IDs
+ }
+ response = self.client.post(self.url, data)
+
+ # Check that the UserDomainRole objects were created
+ self.assertEqual(UserDomainRole.objects.filter(user=self.user, role=UserDomainRole.Roles.MANAGER).count(), 3)
+
+ # Check for a success message and a redirect
+ self.assertRedirects(response, reverse("member-domains", kwargs={"pk": self.portfolio_permission.pk}))
+ messages = list(response.wsgi_request._messages)
+ self.assertEqual(len(messages), 1)
+ self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.")
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_post_with_valid_removed_domains(self):
+ """Test that domains can be successfully removed."""
+ self.client.force_login(self.user)
+
+ # Create some UserDomainRole objects
+ domains = [1, 2, 3]
+ UserDomainRole.objects.bulk_create([UserDomainRole(domain_id=domain, user=self.user) for domain in domains])
+
+ data = {
+ "removed_domains": json.dumps([1, 2]),
+ }
+ response = self.client.post(self.url, data)
+
+ # Check that the UserDomainRole objects were deleted
+ self.assertEqual(UserDomainRole.objects.filter(user=self.user).count(), 1)
+ self.assertEqual(UserDomainRole.objects.filter(domain_id=3, user=self.user).count(), 1)
+
+ # Check for a success message and a redirect
+ self.assertRedirects(response, reverse("member-domains", kwargs={"pk": self.portfolio_permission.pk}))
+ messages = list(response.wsgi_request._messages)
+ self.assertEqual(len(messages), 1)
+ self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.")
+
+ UserDomainRole.objects.all().delete()
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_post_with_invalid_added_domains_data(self):
+ """Test that an error is returned for invalid added domains data."""
+ self.client.force_login(self.user)
+
+ data = {
+ "added_domains": "json-statham",
+ }
+ response = self.client.post(self.url, data)
+
+ # Check that no UserDomainRole objects were created
+ self.assertEqual(UserDomainRole.objects.filter(user=self.user).count(), 0)
+
+ # Check for an error message and a redirect
+ self.assertRedirects(response, reverse("member-domains", kwargs={"pk": self.portfolio_permission.pk}))
+ messages = list(response.wsgi_request._messages)
+ self.assertEqual(len(messages), 1)
+ self.assertEqual(
+ str(messages[0]), "Invalid data for added domains. If the issue persists, please contact help@get.gov."
+ )
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_post_with_invalid_removed_domains_data(self):
+ """Test that an error is returned for invalid removed domains data."""
+ self.client.force_login(self.user)
+
+ data = {
+ "removed_domains": "not-a-json",
+ }
+ response = self.client.post(self.url, data)
+
+ # Check that no UserDomainRole objects were deleted
+ self.assertEqual(UserDomainRole.objects.filter(user=self.user).count(), 0)
+
+ # Check for an error message and a redirect
+ self.assertRedirects(response, reverse("member-domains", kwargs={"pk": self.portfolio_permission.pk}))
+ messages = list(response.wsgi_request._messages)
+ self.assertEqual(len(messages), 1)
+ self.assertEqual(
+ str(messages[0]), "Invalid data for removed domains. If the issue persists, please contact help@get.gov."
+ )
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_post_with_no_changes(self):
+ """Test that no changes message is displayed when no changes are made."""
+ self.client.force_login(self.user)
+
+ response = self.client.post(self.url, {})
+
+ # Check that no UserDomainRole objects were created or deleted
+ self.assertEqual(UserDomainRole.objects.filter(user=self.user).count(), 0)
+
+ # Check for an info message and a redirect
+ self.assertRedirects(response, reverse("member-domains", kwargs={"pk": self.portfolio_permission.pk}))
+ messages = list(response.wsgi_request._messages)
+ self.assertEqual(len(messages), 1)
+ self.assertEqual(str(messages[0]), "No changes detected.")
+
class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomainsView):
@classmethod
def setUpClass(cls):
super().setUpClass()
+ cls.url = reverse("invitedmember-domains-edit", kwargs={"pk": cls.invitation.pk})
@classmethod
def tearDownClass(cls):
super().tearDownClass()
+ def setUp(self):
+ super().setUp()
+ names = ["1.gov", "2.gov", "3.gov"]
+ Domain.objects.bulk_create([Domain(name=name) for name in names])
+
+ def tearDown(self):
+ super().tearDown()
+ Domain.objects.all().delete()
+ DomainInvitation.objects.all().delete()
+
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@@ -2222,6 +2361,175 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain
# Make sure the response is not found
self.assertEqual(response.status_code, 404)
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_post_with_valid_added_domains(self):
+ """Test adding new domains successfully."""
+ self.client.force_login(self.user)
+
+ data = {
+ "added_domains": json.dumps([1, 2, 3]), # Mock domain IDs
+ }
+ response = self.client.post(self.url, data)
+
+ # Check that the DomainInvitation objects were created
+ self.assertEqual(
+ DomainInvitation.objects.filter(
+ email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
+ ).count(),
+ 3,
+ )
+
+ # Check for a success message and a redirect
+ self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk}))
+ messages = list(response.wsgi_request._messages)
+ self.assertEqual(len(messages), 1)
+ self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.")
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_post_with_existing_and_new_added_domains(self):
+ """Test updating existing and adding new invitations."""
+ self.client.force_login(self.user)
+
+ # Create existing invitations
+ DomainInvitation.objects.bulk_create(
+ [
+ DomainInvitation(
+ domain_id=1, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.CANCELED
+ ),
+ DomainInvitation(
+ domain_id=2, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
+ ),
+ ]
+ )
+
+ data = {
+ "added_domains": json.dumps([1, 2, 3]),
+ }
+ response = self.client.post(self.url, data)
+
+ # Check that status for domain_id=1 was updated to INVITED
+ self.assertEqual(
+ DomainInvitation.objects.get(domain_id=1, email="invited@example.com").status,
+ DomainInvitation.DomainInvitationStatus.INVITED,
+ )
+
+ # Check that domain_id=3 was created as INVITED
+ self.assertTrue(
+ DomainInvitation.objects.filter(
+ domain_id=3, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
+ ).exists()
+ )
+
+ # Check for a success message and a redirect
+ self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk}))
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_post_with_valid_removed_domains(self):
+ """Test removing domains successfully."""
+ self.client.force_login(self.user)
+
+ # Create existing invitations
+ DomainInvitation.objects.bulk_create(
+ [
+ DomainInvitation(
+ domain_id=1, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
+ ),
+ DomainInvitation(
+ domain_id=2, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
+ ),
+ ]
+ )
+
+ data = {
+ "removed_domains": json.dumps([1]),
+ }
+ response = self.client.post(self.url, data)
+
+ # Check that the status for domain_id=1 was updated to CANCELED
+ self.assertEqual(
+ DomainInvitation.objects.get(domain_id=1, email="invited@example.com").status,
+ DomainInvitation.DomainInvitationStatus.CANCELED,
+ )
+
+ # Check that domain_id=2 remains INVITED
+ self.assertEqual(
+ DomainInvitation.objects.get(domain_id=2, email="invited@example.com").status,
+ DomainInvitation.DomainInvitationStatus.INVITED,
+ )
+
+ # Check for a success message and a redirect
+ self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk}))
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_post_with_invalid_added_domains_data(self):
+ """Test handling of invalid JSON for added domains."""
+ self.client.force_login(self.user)
+
+ data = {
+ "added_domains": "not-a-json",
+ }
+ response = self.client.post(self.url, data)
+
+ # Check that no DomainInvitation objects were created
+ self.assertEqual(DomainInvitation.objects.count(), 0)
+
+ # Check for an error message and a redirect
+ self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk}))
+ messages = list(response.wsgi_request._messages)
+ self.assertEqual(len(messages), 1)
+ self.assertEqual(
+ str(messages[0]), "Invalid data for added domains. If the issue persists, please contact help@get.gov."
+ )
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_post_with_invalid_removed_domains_data(self):
+ """Test handling of invalid JSON for removed domains."""
+ self.client.force_login(self.user)
+
+ data = {
+ "removed_domains": "json-sudeikis",
+ }
+ response = self.client.post(self.url, data)
+
+ # Check that no DomainInvitation objects were updated
+ self.assertEqual(DomainInvitation.objects.count(), 0)
+
+ # Check for an error message and a redirect
+ self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk}))
+ messages = list(response.wsgi_request._messages)
+ self.assertEqual(len(messages), 1)
+ self.assertEqual(
+ str(messages[0]), "Invalid data for removed domains. If the issue persists, please contact help@get.gov."
+ )
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_post_with_no_changes(self):
+ """Test the case where no changes are made."""
+ self.client.force_login(self.user)
+
+ response = self.client.post(self.url, {})
+
+ # Check that no DomainInvitation objects were created or updated
+ self.assertEqual(DomainInvitation.objects.count(), 0)
+
+ # Check for an info message and a redirect
+ self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk}))
+ messages = list(response.wsgi_request._messages)
+ self.assertEqual(len(messages), 1)
+ self.assertEqual(str(messages[0]), "No changes detected.")
+
class TestRequestingEntity(WebTest):
"""The requesting entity page is a domain request form that only exists
@@ -2531,7 +2839,9 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest):
],
)
- cls.new_member_email = "new_user@example.com"
+ cls.new_member_email = "davekenn4242@gmail.com"
+
+ AllowedEmail.objects.get_or_create(email=cls.new_member_email)
# Assign permissions to the user making requests
UserPortfolioPermission.objects.create(
@@ -2550,8 +2860,10 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest):
UserPortfolioPermission.objects.all().delete()
Portfolio.objects.all().delete()
User.objects.all().delete()
+ AllowedEmail.objects.all().delete()
super().tearDownClass()
+ @boto3_mocking.patching
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@@ -2563,30 +2875,240 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest):
session_id = self.client.session.session_key
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
- # Simulate submission of member invite for new user
- final_response = self.client.post(
- reverse("new-member"),
- {
- "member_access_level": "basic",
- "basic_org_domain_request_permissions": "view_only",
- "email": self.new_member_email,
- },
- )
+ mock_client_class = MagicMock()
+ mock_client = mock_client_class.return_value
- # Ensure the final submission is successful
- self.assertEqual(final_response.status_code, 302) # redirects after success
+ with boto3_mocking.clients.handler_for("sesv2", mock_client_class):
+ # Simulate submission of member invite for new user
+ final_response = self.client.post(
+ reverse("new-member"),
+ {
+ "role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER.value,
+ "domain_request_permission_member": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS.value,
+ "email": self.new_member_email,
+ },
+ )
- # Validate Database Changes
- portfolio_invite = PortfolioInvitation.objects.filter(
- email=self.new_member_email, portfolio=self.portfolio
- ).first()
- self.assertIsNotNone(portfolio_invite)
- self.assertEqual(portfolio_invite.email, self.new_member_email)
+ # Ensure the final submission is successful
+ self.assertEqual(final_response.status_code, 302) # Redirects
+
+ # Validate Database Changes
+ portfolio_invite = PortfolioInvitation.objects.filter(
+ email=self.new_member_email, portfolio=self.portfolio
+ ).first()
+ self.assertIsNotNone(portfolio_invite)
+ self.assertEqual(portfolio_invite.email, self.new_member_email)
+
+ # Check that an email was sent
+ self.assertTrue(mock_client.send_email.called)
+
+ @boto3_mocking.patching
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ def test_member_invite_for_new_users_initial_ajax_call_passes(self):
+ """Tests the member invitation flow for new users."""
+ self.client.force_login(self.user)
+
+ # Simulate a session to ensure continuity
+ session_id = self.client.session.session_key
+ self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
+
+ mock_client_class = MagicMock()
+ mock_client = mock_client_class.return_value
+
+ with boto3_mocking.clients.handler_for("sesv2", mock_client_class):
+ # Simulate submission of member invite for new user
+ final_response = self.client.post(
+ reverse("new-member"),
+ {
+ "role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER.value,
+ "domain_request_permission_member": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS.value,
+ "email": self.new_member_email,
+ },
+ HTTP_X_REQUESTED_WITH="XMLHttpRequest",
+ )
+
+ # Ensure the prep ajax submission is successful
+ self.assertEqual(final_response.status_code, 200)
+
+ # Check that the response is a JSON response with is_valid
+ json_response = final_response.json()
+ self.assertIn("is_valid", json_response)
+ self.assertTrue(json_response["is_valid"])
+
+ # assert that portfolio invitation is not created
+ self.assertFalse(
+ PortfolioInvitation.objects.filter(email=self.new_member_email, portfolio=self.portfolio).exists(),
+ "Portfolio invitation should not be created when an Exception occurs.",
+ )
+
+ # Check that an email was not sent
+ self.assertFalse(mock_client.send_email.called)
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
- def test_member_invite_for_previously_invited_member(self):
+ @patch("registrar.views.portfolios.send_portfolio_invitation_email")
+ def test_member_invite_for_previously_invited_member_initial_ajax_call_fails(self, mock_send_email):
+ """Tests the initial ajax call in the member invitation flow for existing portfolio member."""
+ self.client.force_login(self.user)
+
+ # Simulate a session to ensure continuity
+ session_id = self.client.session.session_key
+ self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
+
+ invite_count_before = PortfolioInvitation.objects.count()
+
+ # Simulate submission of member invite for user who has already been invited
+ response = self.client.post(
+ reverse("new-member"),
+ {
+ "role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER.value,
+ "domain_request_permission_member": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS.value,
+ "email": self.invited_member_email,
+ },
+ HTTP_X_REQUESTED_WITH="XMLHttpRequest",
+ )
+ self.assertEqual(response.status_code, 200)
+
+ # Check that the response is a JSON response with is_valid == False
+ json_response = response.json()
+ self.assertIn("is_valid", json_response)
+ self.assertFalse(json_response["is_valid"])
+
+ # Validate Database has not changed
+ invite_count_after = PortfolioInvitation.objects.count()
+ self.assertEqual(invite_count_after, invite_count_before)
+
+ # assert that send_portfolio_invitation_email is not called
+ mock_send_email.assert_not_called()
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ @patch("registrar.views.portfolios.send_portfolio_invitation_email")
+ def test_submit_new_member_raises_email_sending_error(self, mock_send_email):
+ """Test when adding a new member and email_send method raises EmailSendingError."""
+ mock_send_email.side_effect = EmailSendingError("Failed to send email.")
+
+ self.client.force_login(self.user)
+
+ # Simulate a session to ensure continuity
+ session_id = self.client.session.session_key
+ self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
+
+ form_data = {
+ "role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER.value,
+ "domain_request_permission_member": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS.value,
+ "email": self.new_member_email,
+ }
+
+ # Act
+ with patch("django.contrib.messages.warning") as mock_warning:
+ response = self.client.post(reverse("new-member"), data=form_data)
+
+ # Assert
+ # assert that the send_portfolio_invitation_email called
+ mock_send_email.assert_called_once_with(
+ email=self.new_member_email, requestor=self.user, portfolio=self.portfolio
+ )
+ # assert that response is a redirect to reverse("members")
+ self.assertRedirects(response, reverse("members"))
+ # assert that messages contains message, "Could not send email invitation"
+ mock_warning.assert_called_once_with(response.wsgi_request, "Could not send email invitation.")
+ # assert that portfolio invitation is not created
+ self.assertFalse(
+ PortfolioInvitation.objects.filter(email=self.new_member_email, portfolio=self.portfolio).exists(),
+ "Portfolio invitation should not be created when an EmailSendingError occurs.",
+ )
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ @patch("registrar.views.portfolios.send_portfolio_invitation_email")
+ def test_submit_new_member_raises_missing_email_error(self, mock_send_email):
+ """Test when adding a new member and email_send method raises MissingEmailError."""
+ mock_send_email.side_effect = MissingEmailError()
+
+ self.client.force_login(self.user)
+
+ # Simulate a session to ensure continuity
+ session_id = self.client.session.session_key
+ self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
+
+ form_data = {
+ "role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER.value,
+ "domain_request_permission_member": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS.value,
+ "email": self.new_member_email,
+ }
+
+ # Act
+ with patch("django.contrib.messages.error") as mock_error:
+ response = self.client.post(reverse("new-member"), data=form_data)
+
+ # Assert
+ # assert that the send_portfolio_invitation_email called
+ mock_send_email.assert_called_once_with(
+ email=self.new_member_email, requestor=self.user, portfolio=self.portfolio
+ )
+ # assert that response is a redirect to reverse("members")
+ self.assertRedirects(response, reverse("members"))
+ # assert that messages contains message, "Could not send email invitation"
+ mock_error.assert_called_once_with(
+ response.wsgi_request,
+ "Can't send invitation email. No email is associated with your user account.",
+ )
+ # assert that portfolio invitation is not created
+ self.assertFalse(
+ PortfolioInvitation.objects.filter(email=self.new_member_email, portfolio=self.portfolio).exists(),
+ "Portfolio invitation should not be created when a MissingEmailError occurs.",
+ )
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ @patch("registrar.views.portfolios.send_portfolio_invitation_email")
+ def test_submit_new_member_raises_exception(self, mock_send_email):
+ """Test when adding a new member and email_send method raises Exception."""
+ mock_send_email.side_effect = Exception("Generic exception")
+
+ self.client.force_login(self.user)
+
+ # Simulate a session to ensure continuity
+ session_id = self.client.session.session_key
+ self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
+
+ form_data = {
+ "role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER.value,
+ "domain_request_permission_member": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS.value,
+ "email": self.new_member_email,
+ }
+
+ # Act
+ with patch("django.contrib.messages.warning") as mock_warning:
+ response = self.client.post(reverse("new-member"), data=form_data)
+
+ # Assert
+ # assert that the send_portfolio_invitation_email called
+ mock_send_email.assert_called_once_with(
+ email=self.new_member_email, requestor=self.user, portfolio=self.portfolio
+ )
+ # assert that response is a redirect to reverse("members")
+ self.assertRedirects(response, reverse("members"))
+ # assert that messages contains message, "Could not send email invitation"
+ mock_warning.assert_called_once_with(response.wsgi_request, "Could not send email invitation.")
+ # assert that portfolio invitation is not created
+ self.assertFalse(
+ PortfolioInvitation.objects.filter(email=self.new_member_email, portfolio=self.portfolio).exists(),
+ "Portfolio invitation should not be created when an Exception occurs.",
+ )
+
+ @less_console_noise_decorator
+ @override_flag("organization_feature", active=True)
+ @override_flag("organization_members", active=True)
+ @patch("registrar.views.portfolios.send_portfolio_invitation_email")
+ def test_member_invite_for_previously_invited_member(self, mock_send_email):
"""Tests the member invitation flow for existing portfolio member."""
self.client.force_login(self.user)
@@ -2600,23 +3122,35 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest):
response = self.client.post(
reverse("new-member"),
{
- "member_access_level": "basic",
- "basic_org_domain_request_permissions": "view_only",
+ "role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER.value,
+ "domain_request_permission_member": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS.value,
"email": self.invited_member_email,
},
)
- self.assertEqual(response.status_code, 302) # Redirects
+ self.assertEqual(response.status_code, 200)
- # TODO: verify messages
+ # verify messages
+ self.assertContains(
+ response,
+ (
+ "This user is already assigned to a portfolio invitation. "
+ "Based on current waffle flag settings, users cannot be assigned "
+ "to multiple portfolios."
+ ),
+ )
# Validate Database has not changed
invite_count_after = PortfolioInvitation.objects.count()
self.assertEqual(invite_count_after, invite_count_before)
+ # assert that send_portfolio_invitation_email is not called
+ mock_send_email.assert_not_called()
+
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
- def test_member_invite_for_existing_member(self):
+ @patch("registrar.views.portfolios.send_portfolio_invitation_email")
+ def test_member_invite_for_existing_member(self, mock_send_email):
"""Tests the member invitation flow for existing portfolio member."""
self.client.force_login(self.user)
@@ -2630,19 +3164,30 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest):
response = self.client.post(
reverse("new-member"),
{
- "member_access_level": "basic",
- "basic_org_domain_request_permissions": "view_only",
+ "role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER.value,
+ "domain_request_permission_member": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS.value,
"email": self.user.email,
},
)
- self.assertEqual(response.status_code, 302) # Redirects
+ self.assertEqual(response.status_code, 200)
- # TODO: verify messages
+ # Verify messages
+ self.assertContains(
+ response,
+ (
+ "This user is already assigned to a portfolio. "
+ "Based on current waffle flag settings, users cannot be "
+ "assigned to multiple portfolios."
+ ),
+ )
# Validate Database has not changed
invite_count_after = PortfolioInvitation.objects.count()
self.assertEqual(invite_count_after, invite_count_before)
+ # assert that send_portfolio_invitation_email is not called
+ mock_send_email.assert_not_called()
+
class TestEditPortfolioMemberView(WebTest):
"""Tests for the edit member page on portfolios"""
@@ -2783,7 +3328,13 @@ class TestEditPortfolioMemberView(WebTest):
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_admin_removing_own_admin_role(self):
- """Tests an admin removing their own admin role redirects to home."""
+ """Tests an admin removing their own admin role redirects to home.
+
+ Removing the admin role will remove both view and edit members permissions.
+ Note: The user can remove the edit members permissions but as long as they
+ stay in admin role, they will at least still have view members permissions.
+ """
+
self.client.force_login(self.user)
# Get the user's admin permission
diff --git a/src/registrar/utility/csv_export.py b/src/registrar/utility/csv_export.py
index 66809777b..af2fadeb9 100644
--- a/src/registrar/utility/csv_export.py
+++ b/src/registrar/utility/csv_export.py
@@ -538,11 +538,23 @@ class DomainExport(BaseExport):
# model objects as we export data, trying to reinstate model objects in order to grab @property
# values negatively impacts performance. Therefore, we will follow best practice and use annotations
return {
- "converted_generic_org_type": Case(
- # When portfolio is present, use its value instead
- When(portfolio__isnull=False, then=F("portfolio__organization_type")),
+ "converted_org_type": Case(
+ # When portfolio is present and is_election_board is True
+ When(
+ portfolio__isnull=False,
+ portfolio__organization_type__isnull=False,
+ is_election_board=True,
+ then=Concat(F("portfolio__organization_type"), Value("_election")),
+ ),
+ # When portfolio is present and is_election_board is False or None
+ When(
+ Q(is_election_board=False) | Q(is_election_board__isnull=True),
+ portfolio__isnull=False,
+ portfolio__organization_type__isnull=False,
+ then=F("portfolio__organization_type"),
+ ),
# Otherwise, return the natively assigned value
- default=F("generic_org_type"),
+ default=F("organization_type"),
output_field=CharField(),
),
"converted_federal_agency": Case(
@@ -573,20 +585,6 @@ class DomainExport(BaseExport):
default=F("organization_name"),
output_field=CharField(),
),
- "converted_city": Case(
- # When portfolio is present, use its value instead
- When(portfolio__isnull=False, then=F("portfolio__city")),
- # Otherwise, return the natively assigned value
- default=F("city"),
- output_field=CharField(),
- ),
- "converted_state_territory": Case(
- # When portfolio is present, use its value instead
- When(portfolio__isnull=False, then=F("portfolio__state_territory")),
- # Otherwise, return the natively assigned value
- default=F("state_territory"),
- output_field=CharField(),
- ),
"converted_so_email": Case(
# When portfolio is present, use its value instead
When(portfolio__isnull=False, then=F("portfolio__senior_official__email")),
@@ -727,7 +725,8 @@ class DomainExport(BaseExport):
first_ready_on = "(blank)"
# organization_type has organization_type AND is_election
- domain_org_type = model.get("converted_generic_org_type")
+ # domain_org_type includes "- Election" org_type variants
+ domain_org_type = model.get("converted_org_type")
human_readable_domain_org_type = DomainRequest.OrgChoicesElectionOffice.get_org_label(domain_org_type)
domain_federal_type = model.get("converted_federal_type")
human_readable_domain_federal_type = BranchChoices.get_branch_label(domain_federal_type)
@@ -772,8 +771,8 @@ class DomainExport(BaseExport):
"Domain type": model.get("domain_type"),
"Agency": model.get("converted_federal_agency"),
"Organization name": model.get("converted_organization_name"),
- "City": model.get("converted_city"),
- "State": model.get("converted_state_territory"),
+ "City": model.get("city"),
+ "State": model.get("state_territory"),
"SO": model.get("converted_so_name"),
"SO email": model.get("converted_so_email"),
"Security contact email": model.get("security_contact_email"),
@@ -908,7 +907,7 @@ class DomainDataType(DomainExport):
"""
# Coalesce is used to replace federal_type of None with ZZZZZ
return [
- "converted_generic_org_type",
+ "converted_org_type",
Coalesce("converted_federal_type", Value("ZZZZZ")),
"converted_federal_agency",
"domain__name",
@@ -987,105 +986,6 @@ class DomainDataTypeUser(DomainDataType):
return Q(domain__id__in=request.user.get_user_domain_ids(request))
-class DomainRequestsDataType:
- """
- The DomainRequestsDataType report, but filtered based on the current request user
- """
-
- @classmethod
- def get_filter_conditions(cls, request=None, **kwargs):
- if request is None or not hasattr(request, "user") or not request.user.is_authenticated:
- return Q(id__in=[])
-
- request_ids = request.user.get_user_domain_request_ids(request)
- return Q(id__in=request_ids)
-
- @classmethod
- def get_queryset(cls, request):
- return DomainRequest.objects.filter(cls.get_filter_conditions(request))
-
- def safe_get(attribute, default="N/A"):
- # Return the attribute value or default if not present
- return attribute if attribute is not None else default
-
- @classmethod
- def exporting_dr_data_to_csv(cls, response, request=None):
- import csv
-
- writer = csv.writer(response)
-
- # CSV headers
- writer.writerow(
- [
- "Domain request",
- "Region",
- "Status",
- "Election office",
- "Federal type",
- "Domain type",
- "Request additional details",
- "Creator approved domains count",
- "Creator active requests count",
- "Alternative domains",
- "Other contacts",
- "Current websites",
- "Federal agency",
- "SO first name",
- "SO last name",
- "SO email",
- "SO title/role",
- "Creator first name",
- "Creator last name",
- "Creator email",
- "Organization name",
- "City",
- "State/territory",
- "Request purpose",
- "CISA regional representative",
- "Last submitted date",
- "First submitted date",
- "Last status update",
- ]
- )
-
- queryset = cls.get_queryset(request)
- for request in queryset:
- writer.writerow(
- [
- request.requested_domain,
- cls.safe_get(getattr(request, "region_field", None)),
- request.status,
- cls.safe_get(getattr(request, "election_office", None)),
- request.converted_federal_type,
- cls.safe_get(getattr(request, "domain_type", None)),
- cls.safe_get(getattr(request, "additional_details", None)),
- cls.safe_get(getattr(request, "creator_approved_domains_count", None)),
- cls.safe_get(getattr(request, "creator_active_requests_count", None)),
- cls.safe_get(getattr(request, "all_alternative_domains", None)),
- cls.safe_get(getattr(request, "all_other_contacts", None)),
- cls.safe_get(getattr(request, "all_current_websites", None)),
- cls.safe_get(getattr(request, "converted_federal_agency", None)),
- cls.safe_get(getattr(request.converted_senior_official, "first_name", None)),
- cls.safe_get(getattr(request.converted_senior_official, "last_name", None)),
- cls.safe_get(getattr(request.converted_senior_official, "email", None)),
- cls.safe_get(getattr(request.converted_senior_official, "title", None)),
- cls.safe_get(getattr(request.creator, "first_name", None)),
- cls.safe_get(getattr(request.creator, "last_name", None)),
- cls.safe_get(getattr(request.creator, "email", None)),
- cls.safe_get(getattr(request, "converted_organization_name", None)),
- cls.safe_get(getattr(request, "converted_city", None)),
- cls.safe_get(getattr(request, "converted_state_territory", None)),
- cls.safe_get(getattr(request, "purpose", None)),
- cls.safe_get(getattr(request, "cisa_representative_email", None)),
- cls.safe_get(getattr(request, "last_submitted_date", None)),
- cls.safe_get(getattr(request, "first_submitted_date", None)),
- cls.safe_get(getattr(request, "last_status_update", None)),
- ]
- )
-
- return response
-
-
class DomainDataFull(DomainExport):
"""
Shows security contacts, filtered by state
@@ -1760,20 +1660,6 @@ class DomainRequestExport(BaseExport):
default=F("organization_name"),
output_field=CharField(),
),
- "converted_city": Case(
- # When portfolio is present, use its value instead
- When(portfolio__isnull=False, then=F("portfolio__city")),
- # Otherwise, return the natively assigned value
- default=F("city"),
- output_field=CharField(),
- ),
- "converted_state_territory": Case(
- # When portfolio is present, use its value instead
- When(portfolio__isnull=False, then=F("portfolio__state_territory")),
- # Otherwise, return the natively assigned value
- default=F("state_territory"),
- output_field=CharField(),
- ),
"converted_so_email": Case(
# When portfolio is present, use its value instead
When(portfolio__isnull=False, then=F("portfolio__senior_official__email")),
@@ -1952,8 +1838,8 @@ class DomainRequestExport(BaseExport):
"Investigator": model.get("investigator__email"),
# Untouched fields
"Organization name": model.get("converted_organization_name"),
- "City": model.get("converted_city"),
- "State/territory": model.get("converted_state_territory"),
+ "City": model.get("city"),
+ "State/territory": model.get("state_territory"),
"Request purpose": model.get("purpose"),
"CISA regional representative": model.get("cisa_representative_email"),
"Last submitted date": model.get("last_submitted_date"),
@@ -1965,6 +1851,92 @@ class DomainRequestExport(BaseExport):
return row
+class DomainRequestDataType(DomainRequestExport):
+ """
+ The DomainRequestDataType report, but filtered based on the current request user
+ """
+
+ @classmethod
+ def get_columns(cls):
+ """
+ Overrides the columns for CSV export specific to DomainRequestDataType.
+ """
+ return [
+ "Domain request",
+ "Region",
+ "Status",
+ "Election office",
+ "Federal type",
+ "Domain type",
+ "Request additional details",
+ "Creator approved domains count",
+ "Creator active requests count",
+ "Alternative domains",
+ "Other contacts",
+ "Current websites",
+ "Federal agency",
+ "SO first name",
+ "SO last name",
+ "SO email",
+ "SO title/role",
+ "Creator first name",
+ "Creator last name",
+ "Creator email",
+ "Organization name",
+ "City",
+ "State/territory",
+ "Request purpose",
+ "CISA regional representative",
+ "Last submitted date",
+ "First submitted date",
+ "Last status update",
+ ]
+
+ @classmethod
+ def get_filter_conditions(cls, request=None, **kwargs):
+ """
+ Get a Q object of filter conditions to filter when building queryset.
+ """
+ if request is None or not hasattr(request, "user") or not request.user:
+ # Return nothing
+ return Q(id__in=[])
+ else:
+ # Get all domain requests the user is associated with
+ return Q(id__in=request.user.get_user_domain_request_ids(request))
+
+ @classmethod
+ def get_select_related(cls):
+ """
+ Get a list of tables to pass to select_related when building queryset.
+ """
+ return ["creator", "senior_official", "federal_agency", "investigator", "requested_domain"]
+
+ @classmethod
+ def get_prefetch_related(cls):
+ """
+ Get a list of tables to pass to prefetch_related when building queryset.
+ """
+ return ["current_websites", "other_contacts", "alternative_domains"]
+
+ @classmethod
+ def get_related_table_fields(cls):
+ """
+ Get a list of fields from related tables.
+ """
+ return [
+ "requested_domain__name",
+ "federal_agency__agency",
+ "senior_official__first_name",
+ "senior_official__last_name",
+ "senior_official__email",
+ "senior_official__title",
+ "creator__first_name",
+ "creator__last_name",
+ "creator__email",
+ "investigator__email",
+ ]
+
+
class DomainRequestGrowth(DomainRequestExport):
"""
Shows submitted requests within a date range, sorted
diff --git a/src/registrar/utility/email_invitations.py b/src/registrar/utility/email_invitations.py
new file mode 100644
index 000000000..7171b8902
--- /dev/null
+++ b/src/registrar/utility/email_invitations.py
@@ -0,0 +1,114 @@
+from django.conf import settings
+from registrar.models import DomainInvitation
+from registrar.utility.errors import (
+ AlreadyDomainInvitedError,
+ AlreadyDomainManagerError,
+ MissingEmailError,
+ OutsideOrgMemberError,
+)
+from registrar.utility.waffle import flag_is_active_for_user
+from registrar.utility.email import send_templated_email
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def send_domain_invitation_email(email: str, requestor, domain, is_member_of_different_org):
+ """
+ Sends a domain invitation email to the specified address.
+
+ Raises exceptions for validation or email-sending issues.
+
+ Args:
+ email (str): Email address of the recipient.
+ requestor (User): The user initiating the invitation.
+ domain (Domain): The domain object for which the invitation is being sent.
+ is_member_of_different_org (bool): if an email belongs to a different org
+
+ Raises:
+ MissingEmailError: If the requestor has no email associated with their account.
+ AlreadyDomainManagerError: If the email corresponds to an existing domain manager.
+ AlreadyDomainInvitedError: If an invitation has already been sent.
+ OutsideOrgMemberError: If the requested_user is part of a different organization.
+ EmailSendingError: If there is an error while sending the email.
+ """
+ # Default email address for staff
+ requestor_email = settings.DEFAULT_FROM_EMAIL
+
+ # Check if the requestor is staff and has an email
+ if not requestor.is_staff:
+ if not requestor.email or requestor.email.strip() == "":
+ raise MissingEmailError
+ else:
+ requestor_email = requestor.email
+
+ # Check if the recipient is part of a different organization
+ # COMMENT: this does not account for multiple_portfolios flag being active
+ if (
+ flag_is_active_for_user(requestor, "organization_feature")
+ and not flag_is_active_for_user(requestor, "multiple_portfolios")
+ and is_member_of_different_org
+ ):
+ raise OutsideOrgMemberError
+
+ # Check for an existing invitation
+ try:
+ invite = DomainInvitation.objects.get(email=email, domain=domain)
+ if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED:
+ raise AlreadyDomainManagerError(email)
+ elif invite.status == DomainInvitation.DomainInvitationStatus.CANCELED:
+ invite.update_cancellation_status()
+ invite.save()
+ else:
+ raise AlreadyDomainInvitedError(email)
+ except DomainInvitation.DoesNotExist:
+ pass
+
+ # Send the email
+ send_templated_email(
+ "emails/domain_invitation.txt",
+ "emails/domain_invitation_subject.txt",
+ to_address=email,
+ context={
+ "domain": domain,
+ "requestor_email": requestor_email,
+ },
+ )
+
+
+def send_portfolio_invitation_email(email: str, requestor, portfolio):
+ """
+ Sends a portfolio member invitation email to the specified address.
+
+ Raises exceptions for validation or email-sending issues.
+
+ Args:
+ email (str): Email address of the recipient
+ requestor (User): The user initiating the invitation.
+ portfolio (Portfolio): The portfolio object for which the invitation is being sent.
+
+ Raises:
+ MissingEmailError: If the requestor has no email associated with their account.
+ EmailSendingError: If there is an error while sending the email.
+ """
+
+ # Default email address for staff
+ requestor_email = settings.DEFAULT_FROM_EMAIL
+
+ # Check if the requestor is staff and has an email
+ if not requestor.is_staff:
+ if not requestor.email or requestor.email.strip() == "":
+ raise MissingEmailError
+ else:
+ requestor_email = requestor.email
+
+ send_templated_email(
+ "emails/portfolio_invitation.txt",
+ "emails/portfolio_invitation_subject.txt",
+ to_address=email,
+ context={
+ "portfolio": portfolio,
+ "requestor_email": requestor_email,
+ "email": email,
+ },
+ )
diff --git a/src/registrar/utility/errors.py b/src/registrar/utility/errors.py
index e70f06d1e..039fb3696 100644
--- a/src/registrar/utility/errors.py
+++ b/src/registrar/utility/errors.py
@@ -23,6 +23,33 @@ class InvalidDomainError(ValueError):
pass
+class InvitationError(Exception):
+ """Base exception for invitation-related errors."""
+
+ pass
+
+
+class AlreadyDomainManagerError(InvitationError):
+ """Raised when the user is already a manager for the domain."""
+
+ def __init__(self, email):
+ super().__init__(f"{email} is already a manager for this domain.")
+
+
+class AlreadyDomainInvitedError(InvitationError):
+ """Raised when the user has already been invited to the domain."""
+
+ def __init__(self, email):
+ super().__init__(f"{email} has already been invited to this domain.")
+
+
+class MissingEmailError(InvitationError):
+ """Raised when the requestor has no email associated with their account."""
+
+ def __init__(self):
+ super().__init__("Can't send invitation email. No email is associated with your user account.")
+
+
class OutsideOrgMemberError(ValueError):
"""
Error raised when an org member tries adding a user from a different .gov org.
diff --git a/src/registrar/views/domain.py b/src/registrar/views/domain.py
index 45f1e6ad2..f544a20f7 100644
--- a/src/registrar/views/domain.py
+++ b/src/registrar/views/domain.py
@@ -25,14 +25,17 @@ from registrar.models import (
PortfolioInvitation,
User,
UserDomainRole,
- UserPortfolioPermission,
PublicContact,
)
+from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
from registrar.utility.enums import DefaultEmail
from registrar.utility.errors import (
+ AlreadyDomainInvitedError,
+ AlreadyDomainManagerError,
GenericError,
GenericErrorCodes,
+ MissingEmailError,
NameserverError,
NameserverErrorCodes as nsErrorCodes,
DsDataError,
@@ -63,6 +66,7 @@ from epplibwrapper import (
)
from ..utility.email import send_templated_email, EmailSendingError
+from ..utility.email_invitations import send_domain_invitation_email, send_portfolio_invitation_email
from .utility import DomainPermissionView, DomainInvitationPermissionCancelView
from django import forms
@@ -1100,7 +1104,10 @@ class DomainUsersView(DomainBaseView):
# If any of the PortfolioInvitations have the ORGANIZATION_ADMIN role, set the flag to True
for portfolio_invitation in portfolio_invitations:
- if UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_invitation.roles:
+ if (
+ portfolio_invitation.roles
+ and UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_invitation.roles
+ ):
has_admin_flag = True
break # Once we find one match, no need to check further
@@ -1142,170 +1149,172 @@ class DomainAddUserView(DomainFormBaseView):
def get_success_url(self):
return reverse("domain-users", kwargs={"pk": self.object.pk})
- def _domain_abs_url(self):
- """Get an absolute URL for this domain."""
- return self.request.build_absolute_uri(reverse("domain", kwargs={"pk": self.object.id}))
+ def _get_org_membership(self, requestor_org, requested_email, requested_user):
+ """
+ Verifies if an email belongs to a different organization as a member or invited member.
+ Verifies if an email belongs to this organization as a member or invited member.
+ User does not belong to any org can be deduced from the tuple returned.
- def _is_member_of_different_org(self, email, requestor, requested_user):
- """Verifies if an email belongs to a different organization as a member or invited member."""
- # Check if user is a already member of a different organization than the requestor's org
- requestor_org = UserPortfolioPermission.objects.filter(user=requestor).first().portfolio
- existing_org_permission = UserPortfolioPermission.objects.filter(user=requested_user).first()
- existing_org_invitation = PortfolioInvitation.objects.filter(email=email).first()
-
- return (existing_org_permission and existing_org_permission.portfolio != requestor_org) or (
- existing_org_invitation and existing_org_invitation.portfolio != requestor_org
- )
-
- def _check_invite_status(self, invite, email):
- """Check if invitation status is canceled or retrieved, and gives the appropiate response"""
- if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED:
- messages.warning(
- self.request,
- f"{email} is already a manager for this domain.",
- )
- return False
- elif invite.status == DomainInvitation.DomainInvitationStatus.CANCELED:
- invite.update_cancellation_status()
- invite.save()
- return True
- else:
- # else if it has been sent but not accepted
- messages.warning(self.request, f"{email} has already been invited to this domain")
- return False
-
- def _send_domain_invitation_email(self, email: str, requestor: User, requested_user=None, add_success=True):
- """Performs the sending of the domain invitation email,
- does not make a domain information object
- email: string- email to send to
- add_success: bool- default True indicates:
- adding a success message to the view if the email sending succeeds
-
- raises EmailSendingError
+ Returns a tuple (member_of_a_different_org, member_of_this_org).
"""
- # Set a default email address to send to for staff
- requestor_email = settings.DEFAULT_FROM_EMAIL
+ # COMMENT: this code does not take into account multiple portfolios flag
- # Check if the email requestor has a valid email address
- if not requestor.is_staff and requestor.email is not None and requestor.email.strip() != "":
- requestor_email = requestor.email
- elif not requestor.is_staff:
- messages.error(self.request, "Can't send invitation email. No email is associated with your account.")
- logger.error(
- f"Can't send email to '{email}' on domain '{self.object}'."
- f"No email exists for the requestor '{requestor.username}'.",
- exc_info=True,
- )
+ # COMMENT: shouldn't this code be based on the organization of the domain, not the org
+ # of the requestor? requestor could have multiple portfolios
+
+ # Check for existing permissions or invitations for the requested user
+ existing_org_permission = UserPortfolioPermission.objects.filter(user=requested_user).first()
+ existing_org_invitation = PortfolioInvitation.objects.filter(email=requested_email).first()
+
+ # Determine membership in a different organization
+ member_of_a_different_org = (
+ existing_org_permission and existing_org_permission.portfolio != requestor_org
+ ) or (existing_org_invitation and existing_org_invitation.portfolio != requestor_org)
+
+ # Determine membership in the same organization
+ member_of_this_org = (existing_org_permission and existing_org_permission.portfolio == requestor_org) or (
+ existing_org_invitation and existing_org_invitation.portfolio == requestor_org
+ )
+
+ return member_of_a_different_org, member_of_this_org
+
+ def form_valid(self, form):
+ """Add the specified user to this domain."""
+ requested_email = form.cleaned_data["email"]
+ requestor = self.request.user
+
+ # Look up a user with that email
+ requested_user = self._get_requested_user(requested_email)
+ # NOTE: This does not account for multiple portfolios flag being set to True
+ domain_org = self.object.domain_info.portfolio
+
+ # requestor can only send portfolio invitations if they are staff or if they are a member
+ # of the domain's portfolio
+ requestor_can_update_portfolio = (
+ UserPortfolioPermission.objects.filter(user=requestor, portfolio=domain_org).first() is not None
+ or requestor.is_staff
+ )
+
+ member_of_a_different_org, member_of_this_org = self._get_org_membership(
+ domain_org, requested_email, requested_user
+ )
+
+ # determine portfolio of the domain (code currently is looking at requestor's portfolio)
+ # if requested_email/user is not member or invited member of this portfolio
+ # COMMENT: this code does not take into account multiple portfolios flag
+ # send portfolio invitation email
+ # create portfolio invitation
+ # create message to view
+ if (
+ flag_is_active_for_user(requestor, "organization_feature")
+ and not flag_is_active_for_user(requestor, "multiple_portfolios")
+ and domain_org is not None
+ and requestor_can_update_portfolio
+ and not member_of_this_org
+ ):
+ try:
+ send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=domain_org)
+ PortfolioInvitation.objects.get_or_create(email=requested_email, portfolio=domain_org)
+ messages.success(self.request, f"{requested_email} has been invited to the organization: {domain_org}")
+ except Exception as e:
+ self._handle_portfolio_exceptions(e, requested_email, domain_org)
+ # If that first invite does not succeed take an early exit
+ return redirect(self.get_success_url())
+
+ try:
+ if requested_user is None:
+ self._handle_new_user_invitation(requested_email, requestor, member_of_a_different_org)
+ else:
+ self._handle_existing_user(requested_email, requestor, requested_user, member_of_a_different_org)
+ except Exception as e:
+ self._handle_exceptions(e, requested_email)
+
+ return redirect(self.get_success_url())
+
+ def _get_requested_user(self, email):
+ """Retrieve a user by email or return None if the user doesn't exist."""
+ try:
+ return User.objects.get(email=email)
+ except User.DoesNotExist:
return None
- # Check is user is a member or invited member of a different org from this domain's org
- if flag_is_active_for_user(requestor, "organization_feature") and self._is_member_of_different_org(
- email, requestor, requested_user
- ):
- add_success = False
- raise OutsideOrgMemberError
+ def _handle_new_user_invitation(self, email, requestor, member_of_different_org):
+ """Handle invitation for a new user who does not exist in the system."""
+ send_domain_invitation_email(
+ email=email,
+ requestor=requestor,
+ domain=self.object,
+ is_member_of_different_org=member_of_different_org,
+ )
+ DomainInvitation.objects.get_or_create(email=email, domain=self.object)
+ messages.success(self.request, f"{email} has been invited to the domain: {self.object}")
- # Check to see if an invite has already been sent
- try:
- invite = DomainInvitation.objects.get(email=email, domain=self.object)
- # check if the invite has already been accepted or has a canceled invite
- add_success = self._check_invite_status(invite, email)
- except Exception:
- logger.error("An error occured")
+ def _handle_existing_user(self, email, requestor, requested_user, member_of_different_org):
+ """Handle adding an existing user to the domain."""
+ send_domain_invitation_email(
+ email=email,
+ requestor=requestor,
+ domain=self.object,
+ is_member_of_different_org=member_of_different_org,
+ )
+ UserDomainRole.objects.create(
+ user=requested_user,
+ domain=self.object,
+ role=UserDomainRole.Roles.MANAGER,
+ )
+ messages.success(self.request, f"Added user {email}.")
- try:
- send_templated_email(
- "emails/domain_invitation.txt",
- "emails/domain_invitation_subject.txt",
- to_address=email,
- context={
- "domain_url": self._domain_abs_url(),
- "domain": self.object,
- "requestor_email": requestor_email,
- },
- )
- except EmailSendingError as exc:
- logger.warn(
- "Could not sent email invitation to %s for domain %s",
+ def _handle_exceptions(self, exception, email):
+ """Handle exceptions raised during the process."""
+ if isinstance(exception, EmailSendingError):
+ logger.warning(
+ "Could not send email invitation to %s for domain %s (EmailSendingError)",
email,
self.object,
exc_info=True,
)
- logger.info(exc)
- raise EmailSendingError("Could not send email invitation.") from exc
- else:
- if add_success:
- messages.success(self.request, f"{email} has been invited to this domain.")
-
- def _make_invitation(self, email_address: str, requestor: User):
- """Make a Domain invitation for this email and redirect with a message."""
- try:
- self._send_domain_invitation_email(email=email_address, requestor=requestor)
- except EmailSendingError:
messages.warning(self.request, "Could not send email invitation.")
+ elif isinstance(exception, OutsideOrgMemberError):
+ logger.warning(
+ "Could not send email. Can not invite member of a .gov organization to a different organization.",
+ self.object,
+ exc_info=True,
+ )
+ messages.error(
+ self.request,
+ f"{email} is already a member of another .gov organization.",
+ )
+ elif isinstance(exception, AlreadyDomainManagerError):
+ messages.warning(self.request, str(exception))
+ elif isinstance(exception, AlreadyDomainInvitedError):
+ messages.warning(self.request, str(exception))
+ elif isinstance(exception, MissingEmailError):
+ messages.error(self.request, str(exception))
+ logger.error(
+ f"Can't send email to '{email}' on domain '{self.object}'. No email exists for the requestor.",
+ exc_info=True,
+ )
+ elif isinstance(exception, IntegrityError):
+ messages.warning(self.request, f"{email} is already a manager for this domain")
else:
- # (NOTE: only create a domainInvitation if the e-mail sends correctly)
- DomainInvitation.objects.get_or_create(email=email_address, domain=self.object)
- return redirect(self.get_success_url())
+ logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
+ messages.warning(self.request, "Could not send email invitation.")
- def form_valid(self, form):
- """Add the specified user on this domain.
- Throws EmailSendingError."""
- requested_email = form.cleaned_data["email"]
- requestor = self.request.user
- email_success = False
- # look up a user with that email
- try:
- requested_user = User.objects.get(email=requested_email)
- except User.DoesNotExist:
- # no matching user, go make an invitation
- email_success = True
- return self._make_invitation(requested_email, requestor)
+ def _handle_portfolio_exceptions(self, exception, email, portfolio):
+ """Handle exceptions raised during the process."""
+ if isinstance(exception, EmailSendingError):
+ logger.warning("Could not send email invitation (EmailSendingError)", exc_info=True)
+ messages.warning(self.request, "Could not send email invitation.")
+ elif isinstance(exception, MissingEmailError):
+ messages.error(self.request, str(exception))
+ logger.error(
+ f"Can't send email to '{email}' for portfolio '{portfolio}'. No email exists for the requestor.",
+ exc_info=True,
+ )
else:
- # if user already exists then just send an email
- try:
- self._send_domain_invitation_email(
- requested_email, requestor, requested_user=requested_user, add_success=False
- )
- email_success = True
- except EmailSendingError:
- logger.warn(
- "Could not send email invitation (EmailSendingError)",
- self.object,
- exc_info=True,
- )
- messages.warning(self.request, "Could not send email invitation.")
- email_success = True
- except OutsideOrgMemberError:
- logger.warn(
- "Could not send email. Can not invite member of a .gov organization to a different organization.",
- self.object,
- exc_info=True,
- )
- messages.error(
- self.request,
- f"{requested_email} is already a member of another .gov organization.",
- )
- except Exception:
- logger.warn(
- "Could not send email invitation (Other Exception)",
- self.object,
- exc_info=True,
- )
- messages.warning(self.request, "Could not send email invitation.")
- if email_success:
- try:
- UserDomainRole.objects.create(
- user=requested_user,
- domain=self.object,
- role=UserDomainRole.Roles.MANAGER,
- )
- messages.success(self.request, f"Added user {requested_email}.")
- except IntegrityError:
- messages.warning(self.request, f"{requested_email} is already a manager for this domain")
-
- return redirect(self.get_success_url())
+ logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
+ messages.warning(self.request, "Could not send email invitation.")
class DomainInvitationCancelView(SuccessMessageMixin, DomainInvitationPermissionCancelView):
diff --git a/src/registrar/views/member_domains_json.py b/src/registrar/views/member_domains_json.py
index 125059692..3d24336bb 100644
--- a/src/registrar/views/member_domains_json.py
+++ b/src/registrar/views/member_domains_json.py
@@ -90,7 +90,9 @@ class PortfolioMemberDomainsJson(PortfolioMemberDomainsPermission, View):
domain_info_ids = DomainInformation.objects.filter(portfolio=portfolio).values_list(
"domain_id", flat=True
)
- domain_invitations = DomainInvitation.objects.filter(email=email).values_list("domain_id", flat=True)
+ domain_invitations = DomainInvitation.objects.filter(
+ email=email, status=DomainInvitation.DomainInvitationStatus.INVITED
+ ).values_list("domain_id", flat=True)
return domain_info_ids.intersection(domain_invitations)
else:
domain_infos = DomainInformation.objects.filter(portfolio=portfolio)
diff --git a/src/registrar/views/portfolio_members_json.py b/src/registrar/views/portfolio_members_json.py
index b5c608eab..11e58e112 100644
--- a/src/registrar/views/portfolio_members_json.py
+++ b/src/registrar/views/portfolio_members_json.py
@@ -12,6 +12,7 @@ from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from registrar.views.utility.mixins import PortfolioMembersPermission
from registrar.models.utility.orm_helper import ArrayRemoveNull
+from django.contrib.postgres.aggregates import StringAgg
class PortfolioMembersJson(PortfolioMembersPermission, View):
@@ -119,11 +120,22 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
def initial_invitations_search(self, portfolio):
"""Perform initial invitations search and get related DomainInvitation data based on the email."""
- # Get DomainInvitation query for matching email and for the portfolio
- domain_invitations = DomainInvitation.objects.filter(
- email=OuterRef("email"), # Check if email matches the OuterRef("email")
- domain__domain_info__portfolio=portfolio, # Check if the domain's portfolio matches the given portfolio
- ).annotate(domain_info=Concat(F("domain__id"), Value(":"), F("domain__name"), output_field=CharField()))
+
+ # Subquery to get concatenated domain information for each email
+ domain_invitations = (
+ DomainInvitation.objects.filter(email=OuterRef("email"), domain__domain_info__portfolio=portfolio)
+ .annotate(
+ concatenated_info=Concat(F("domain__id"), Value(":"), F("domain__name"), output_field=CharField())
+ )
+ .values("concatenated_info")
+ )
+
+ concatenated_domain_info = (
+ domain_invitations.values("email")
+ .annotate(domain_info=StringAgg("concatenated_info", delimiter=", "))
+ .values("domain_info")
+ )
+
# PortfolioInvitation query
invitations = PortfolioInvitation.objects.filter(portfolio=portfolio)
invitations = invitations.annotate(
@@ -136,7 +148,12 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
# Use ArrayRemove to return an empty list when no domain invitations are found
domain_info=ArrayRemoveNull(
ArrayAgg(
- Subquery(domain_invitations.values("domain_info")),
+ # We've pre-concatenated the domain infos to limit the subquery to return a single virtual 'row',
+ # otherwise we'll trigger a "more than one row returned by a subquery used as an expression"
+ # when an email matches multiple domain invitations.
+ # We'll take care when processing the list of one single concatenated items item
+ # in serialize_members.
+ Subquery(concatenated_domain_info),
distinct=True,
)
),
@@ -153,6 +170,7 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
"domain_info",
"type",
)
+
return invitations
def apply_search_term(self, queryset, request):
@@ -190,10 +208,19 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
is_admin = UserPortfolioRoleChoices.ORGANIZATION_ADMIN in (item.get("roles") or [])
action_url = reverse(item["type"], kwargs={"pk": item["id"]})
+ item_type = item.get("type", "")
+
+ # Ensure domain_info is properly processed for invites -
+ # we need to un-concatenate the subquery
+ domain_info_list = item.get("domain_info", [])
+ if item_type == "invitedmember" and isinstance(domain_info_list, list) and domain_info_list:
+ # Split the first item in the list if it exists
+ domain_info_list = domain_info_list[0].split(", ")
+
# Serialize member data
member_json = {
"id": item.get("id", ""), # id is id of UserPortfolioPermission or PortfolioInvitation
- "type": item.get("type", ""), # source is member or invitedmember
+ "type": item_type, # source is member or invitedmember
"name": " ".join(filter(None, [item.get("first_name", ""), item.get("last_name", "")])),
"email": item.get("email_display", ""),
"member_display": item.get("member_display", ""),
@@ -203,9 +230,9 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
),
# split domain_info array values into ids to form urls, and names
"domain_urls": [
- reverse("domain", kwargs={"pk": domain_info.split(":")[0]}) for domain_info in item.get("domain_info")
+ reverse("domain", kwargs={"pk": domain_info.split(":")[0]}) for domain_info in domain_info_list
],
- "domain_names": [domain_info.split(":")[1] for domain_info in item.get("domain_info")],
+ "domain_names": [domain_info.split(":")[1] for domain_info in domain_info_list],
"is_admin": is_admin,
"last_active": item.get("last_active"),
"action_url": action_url,
diff --git a/src/registrar/views/portfolios.py b/src/registrar/views/portfolios.py
index 8e1df48f3..751e28d85 100644
--- a/src/registrar/views/portfolios.py
+++ b/src/registrar/views/portfolios.py
@@ -1,5 +1,5 @@
+import json
import logging
-from django.conf import settings
from django.http import Http404, JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
@@ -8,10 +8,15 @@ from django.utils.safestring import mark_safe
from django.contrib import messages
from registrar.forms import portfolio as portfolioForms
from registrar.models import Portfolio, User
+from registrar.models.domain_invitation import DomainInvitation
from registrar.models.portfolio_invitation import PortfolioInvitation
+from registrar.models.user_domain_role import UserDomainRole
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from registrar.utility.email import EmailSendingError
+from registrar.utility.email_invitations import send_portfolio_invitation_email
+from registrar.utility.errors import MissingEmailError
+from registrar.utility.enums import DefaultUserValues
from registrar.views.utility.mixins import PortfolioMemberPermission
from registrar.views.utility.permission_views import (
PortfolioDomainRequestsPermissionView,
@@ -26,6 +31,7 @@ from registrar.views.utility.permission_views import (
)
from django.views.generic import View
from django.views.generic.edit import FormMixin
+from django.db import IntegrityError
logger = logging.getLogger(__name__)
@@ -145,7 +151,7 @@ class PortfolioMemberDeleteView(PortfolioMemberPermission, View):
class PortfolioMemberEditView(PortfolioMemberEditPermissionView, View):
template_name = "portfolio_member_permissions.html"
- form_class = portfolioForms.BasePortfolioMemberForm
+ form_class = portfolioForms.PortfolioMemberForm
def get(self, request, pk):
portfolio_permission = get_object_or_404(UserPortfolioPermission, pk=pk)
@@ -164,13 +170,14 @@ class PortfolioMemberEditView(PortfolioMemberEditPermissionView, View):
def post(self, request, pk):
portfolio_permission = get_object_or_404(UserPortfolioPermission, pk=pk)
+ user_initially_is_admin = UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_permission.roles
user = portfolio_permission.user
form = self.form_class(request.POST, instance=portfolio_permission)
if form.is_valid():
# Check if user is removing their own admin or edit role
removing_admin_role_on_self = (
request.user == user
- and UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_permission.roles
+ and user_initially_is_admin
and UserPortfolioRoleChoices.ORGANIZATION_ADMIN not in form.cleaned_data.get("role", [])
)
form.save()
@@ -222,6 +229,86 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V
},
)
+ def post(self, request, pk):
+ """
+ Handles adding and removing domains for a portfolio member.
+ """
+ added_domains = request.POST.get("added_domains")
+ removed_domains = request.POST.get("removed_domains")
+ portfolio_permission = get_object_or_404(UserPortfolioPermission, pk=pk)
+ member = portfolio_permission.user
+
+ added_domain_ids = self._parse_domain_ids(added_domains, "added domains")
+ if added_domain_ids is None:
+ return redirect(reverse("member-domains", kwargs={"pk": pk}))
+
+ removed_domain_ids = self._parse_domain_ids(removed_domains, "removed domains")
+ if removed_domain_ids is None:
+ return redirect(reverse("member-domains", kwargs={"pk": pk}))
+
+ if added_domain_ids or removed_domain_ids:
+ try:
+ self._process_added_domains(added_domain_ids, member)
+ self._process_removed_domains(removed_domain_ids, member)
+ messages.success(request, "The domain assignment changes have been saved.")
+ return redirect(reverse("member-domains", kwargs={"pk": pk}))
+ except IntegrityError:
+ messages.error(
+ request,
+ "A database error occurred while saving changes. If the issue persists, "
+ f"please contact {DefaultUserValues.HELP_EMAIL}.",
+ )
+ logger.error("A database error occurred while saving changes.")
+ return redirect(reverse("member-domains-edit", kwargs={"pk": pk}))
+ except Exception as e:
+ messages.error(
+ request,
+ "An unexpected error occurred: {str(e)}. If the issue persists, "
+ f"please contact {DefaultUserValues.HELP_EMAIL}.",
+ )
+ logger.error(f"An unexpected error occurred: {str(e)}")
+ return redirect(reverse("member-domains-edit", kwargs={"pk": pk}))
+ else:
+ messages.info(request, "No changes detected.")
+ return redirect(reverse("member-domains", kwargs={"pk": pk}))
+
+ def _parse_domain_ids(self, domain_data, domain_type):
+ """
+ Parses the domain IDs from the request and handles JSON errors.
+ """
+ try:
+ return json.loads(domain_data) if domain_data else []
+ except json.JSONDecodeError:
+ messages.error(
+ self.request,
+ f"Invalid data for {domain_type}. If the issue persists, "
+ f"please contact {DefaultUserValues.HELP_EMAIL}.",
+ )
+ logger.error(f"Invalid data for {domain_type}")
+ return None
+
+ def _process_added_domains(self, added_domain_ids, member):
+ """
+ Processes added domains by bulk creating UserDomainRole instances.
+ """
+ if added_domain_ids:
+ # Bulk create UserDomainRole instances for added domains
+ UserDomainRole.objects.bulk_create(
+ [
+ UserDomainRole(domain_id=domain_id, user=member, role=UserDomainRole.Roles.MANAGER)
+ for domain_id in added_domain_ids
+ ],
+ ignore_conflicts=True, # Avoid duplicate entries
+ )
+
+ def _process_removed_domains(self, removed_domain_ids, member):
+ """
+ Processes removed domains by deleting corresponding UserDomainRole instances.
+ """
+ if removed_domain_ids:
+ # Delete UserDomainRole instances for removed domains
+ UserDomainRole.objects.filter(domain_id__in=removed_domain_ids, user=member).delete()
+
class PortfolioInvitedMemberView(PortfolioMemberPermissionView, View):
@@ -284,7 +371,7 @@ class PortfolioInvitedMemberDeleteView(PortfolioMemberPermission, View):
class PortfolioInvitedMemberEditView(PortfolioMemberEditPermissionView, View):
template_name = "portfolio_member_permissions.html"
- form_class = portfolioForms.BasePortfolioMemberForm
+ form_class = portfolioForms.PortfolioInvitedMemberForm
def get(self, request, pk):
portfolio_invitation = get_object_or_404(PortfolioInvitation, pk=pk)
@@ -348,6 +435,106 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission
},
)
+ def post(self, request, pk):
+ """
+ Handles adding and removing domains for a portfolio invitee.
+ """
+ added_domains = request.POST.get("added_domains")
+ removed_domains = request.POST.get("removed_domains")
+ portfolio_invitation = get_object_or_404(PortfolioInvitation, pk=pk)
+ email = portfolio_invitation.email
+
+ added_domain_ids = self._parse_domain_ids(added_domains, "added domains")
+ if added_domain_ids is None:
+ return redirect(reverse("invitedmember-domains", kwargs={"pk": pk}))
+
+ removed_domain_ids = self._parse_domain_ids(removed_domains, "removed domains")
+ if removed_domain_ids is None:
+ return redirect(reverse("invitedmember-domains", kwargs={"pk": pk}))
+
+ if added_domain_ids or removed_domain_ids:
+ try:
+ self._process_added_domains(added_domain_ids, email)
+ self._process_removed_domains(removed_domain_ids, email)
+ messages.success(request, "The domain assignment changes have been saved.")
+ return redirect(reverse("invitedmember-domains", kwargs={"pk": pk}))
+ except IntegrityError:
+ messages.error(
+ request,
+ "A database error occurred while saving changes. If the issue persists, "
+ f"please contact {DefaultUserValues.HELP_EMAIL}.",
+ )
+ logger.error("A database error occurred while saving changes.")
+ return redirect(reverse("invitedmember-domains-edit", kwargs={"pk": pk}))
+ except Exception as e:
+ messages.error(
+ request,
+ "An unexpected error occurred: {str(e)}. If the issue persists, "
+ f"please contact {DefaultUserValues.HELP_EMAIL}.",
+ )
+ logger.error(f"An unexpected error occurred: {str(e)}.")
+ return redirect(reverse("invitedmember-domains-edit", kwargs={"pk": pk}))
+ else:
+ messages.info(request, "No changes detected.")
+ return redirect(reverse("invitedmember-domains", kwargs={"pk": pk}))
+
+ def _parse_domain_ids(self, domain_data, domain_type):
+ """
+ Parses the domain IDs from the request and handles JSON errors.
+ """
+ try:
+ return json.loads(domain_data) if domain_data else []
+ except json.JSONDecodeError:
+ messages.error(
+ self.request,
+ f"Invalid data for {domain_type}. If the issue persists, "
+ f"please contact {DefaultUserValues.HELP_EMAIL}.",
+ )
+ logger.error(f"Invalid data for {domain_type}.")
+ return None
+
+ def _process_added_domains(self, added_domain_ids, email):
+ """
+ Processes added domain invitations by updating existing invitations
+ or creating new ones.
+ """
+ if not added_domain_ids:
+ return
+
+ # Update existing invitations from CANCELED to INVITED
+ existing_invitations = DomainInvitation.objects.filter(domain_id__in=added_domain_ids, email=email)
+ existing_invitations.update(status=DomainInvitation.DomainInvitationStatus.INVITED)
+
+ # Determine which domains need new invitations
+ existing_domain_ids = existing_invitations.values_list("domain_id", flat=True)
+ new_domain_ids = set(added_domain_ids) - set(existing_domain_ids)
+
+ # Bulk create new invitations
+ DomainInvitation.objects.bulk_create(
+ [
+ DomainInvitation(
+ domain_id=domain_id,
+ email=email,
+ status=DomainInvitation.DomainInvitationStatus.INVITED,
+ )
+ for domain_id in new_domain_ids
+ ]
+ )
+
+ def _process_removed_domains(self, removed_domain_ids, email):
+ """
+ Processes removed domain invitations by updating their status to CANCELED.
+ """
+ if not removed_domain_ids:
+ return
+
+ # Update invitations from INVITED to CANCELED
+ DomainInvitation.objects.filter(
+ domain_id__in=removed_domain_ids,
+ email=email,
+ status=DomainInvitation.DomainInvitationStatus.INVITED,
+ ).update(status=DomainInvitation.DomainInvitationStatus.CANCELED)
+
class PortfolioNoDomainsView(NoPortfolioDomainsPermissionView, View):
"""Some users have access to the underlying portfolio, but not any domains.
@@ -509,34 +696,27 @@ class PortfolioMembersView(PortfolioMembersPermissionView, View):
return render(request, "portfolio_members.html")
-class NewMemberView(PortfolioMembersPermissionView, FormMixin):
+class PortfolioAddMemberView(PortfolioMembersPermissionView, FormMixin):
template_name = "portfolio_members_add_new.html"
- form_class = portfolioForms.NewMemberForm
-
- def get_object(self, queryset=None):
- """Get the portfolio object based on the session."""
- portfolio = self.request.session.get("portfolio")
- if portfolio is None:
- raise Http404("No organization found for this user")
- return portfolio
-
- def get_form_kwargs(self):
- """Include the instance in the form kwargs."""
- kwargs = super().get_form_kwargs()
- kwargs["instance"] = self.get_object()
- return kwargs
+ form_class = portfolioForms.PortfolioNewMemberForm
def get(self, request, *args, **kwargs):
"""Handle GET requests to display the form."""
- self.object = self.get_object()
+ self.object = None # No existing PortfolioInvitation instance
form = self.get_form()
return self.render_to_response(self.get_context_data(form=form))
def post(self, request, *args, **kwargs):
"""Handle POST requests to process form submission."""
- self.object = self.get_object()
- form = self.get_form()
+ self.object = None # For a new invitation, there's no existing model instance
+
+ # portfolio not submitted with form, so override the value
+ data = request.POST.copy()
+ if not data.get("portfolio"):
+ data["portfolio"] = self.request.session.get("portfolio").id
+ # Pass the modified data to the form
+ form = portfolioForms.PortfolioNewMemberForm(data)
if form.is_valid():
return self.form_valid(form)
@@ -553,7 +733,7 @@ class NewMemberView(PortfolioMembersPermissionView, FormMixin):
return super().form_invalid(form) # Handle non-AJAX requests normally
def form_valid(self, form):
-
+ super().form_valid(form)
if self.is_ajax():
return JsonResponse({"is_valid": True}) # Return a JSON response
else:
@@ -563,108 +743,42 @@ class NewMemberView(PortfolioMembersPermissionView, FormMixin):
"""Redirect to members table."""
return reverse("members")
- def _send_portfolio_invitation_email(self, email: str, requestor: User, add_success=True):
- """Performs the sending of the member invitation email
- email: string- email to send to
- add_success: bool- default True indicates:
- adding a success message to the view if the email sending succeeds
-
- raises EmailSendingError
- """
-
- # Set a default email address to send to for staff
- requestor_email = settings.DEFAULT_FROM_EMAIL
-
- # Check if the email requestor has a valid email address
- if not requestor.is_staff and requestor.email is not None and requestor.email.strip() != "":
- requestor_email = requestor.email
- elif not requestor.is_staff:
- messages.error(self.request, "Can't send invitation email. No email is associated with your account.")
- logger.error(
- f"Can't send email to '{email}' on domain '{self.object}'."
- f"No email exists for the requestor '{requestor.username}'.",
- exc_info=True,
- )
- return None
-
- # Check to see if an invite has already been sent
- try:
- invite = PortfolioInvitation.objects.get(email=email, portfolio=self.object)
- if invite: # We have an existin invite
- # check if the invite has already been accepted
- if invite.status == PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED:
- add_success = False
- messages.warning(
- self.request,
- f"{email} is already a manager for this portfolio.",
- )
- else:
- add_success = False
- # it has been sent but not accepted
- messages.warning(self.request, f"{email} has already been invited to this portfolio")
- return
- except Exception as err:
- logger.error(f"_send_portfolio_invitation_email() => An error occured: {err}")
-
- try:
- logger.debug("requestor email: " + requestor_email)
-
- # send_templated_email(
- # "emails/portfolio_invitation.txt",
- # "emails/portfolio_invitation_subject.txt",
- # to_address=email,
- # context={
- # "portfolio": self.object,
- # "requestor_email": requestor_email,
- # },
- # )
- except EmailSendingError as exc:
- logger.warn(
- "Could not sent email invitation to %s for domain %s",
- email,
- self.object,
- exc_info=True,
- )
- raise EmailSendingError("Could not send email invitation.") from exc
- else:
- if add_success:
- messages.success(self.request, f"{email} has been invited.")
-
- def _make_invitation(self, email_address: str, requestor: User, add_success=True):
- """Make a Member invitation for this email and redirect with a message."""
- try:
- self._send_portfolio_invitation_email(email=email_address, requestor=requestor, add_success=add_success)
- except EmailSendingError:
- logger.warn(
- "Could not send email invitation (EmailSendingError)",
- self.object,
- exc_info=True,
- )
- messages.warning(self.request, "Could not send email invitation.")
- except Exception:
- logger.warn(
- "Could not send email invitation (Other Exception)",
- self.object,
- exc_info=True,
- )
- messages.warning(self.request, "Could not send email invitation.")
- else:
- # (NOTE: only create a MemberInvitation if the e-mail sends correctly)
- PortfolioInvitation.objects.get_or_create(email=email_address, portfolio=self.object)
- return redirect(self.get_success_url())
-
def submit_new_member(self, form):
- """Add the specified user as a member
- for this portfolio.
- Throws EmailSendingError."""
+ """Add the specified user as a member for this portfolio."""
requested_email = form.cleaned_data["email"]
requestor = self.request.user
+ portfolio = form.cleaned_data["portfolio"]
requested_user = User.objects.filter(email=requested_email).first()
- permission_exists = UserPortfolioPermission.objects.filter(user=requested_user, portfolio=self.object).exists()
- if not requested_user or not permission_exists:
- return self._make_invitation(requested_email, requestor)
- else:
- if permission_exists:
- messages.warning(self.request, "User is already a member of this portfolio.")
+ permission_exists = UserPortfolioPermission.objects.filter(user=requested_user, portfolio=portfolio).exists()
+ try:
+ if not requested_user or not permission_exists:
+ send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=portfolio)
+ form.save()
+ messages.success(self.request, f"{requested_email} has been invited.")
+ else:
+ if permission_exists:
+ messages.warning(self.request, "User is already a member of this portfolio.")
+ except Exception as e:
+ self._handle_exceptions(e, portfolio, requested_email)
return redirect(self.get_success_url())
+
+ def _handle_exceptions(self, exception, portfolio, email):
+ """Handle exceptions raised during the process."""
+ if isinstance(exception, EmailSendingError):
+ logger.warning(
+ "Could not sent email invitation to %s for portfolio %s (EmailSendingError)",
+ email,
+ portfolio,
+ exc_info=True,
+ )
+ messages.warning(self.request, "Could not send email invitation.")
+ elif isinstance(exception, MissingEmailError):
+ messages.error(self.request, str(exception))
+ logger.error(
+ f"Can't send email to '{email}' for portfolio '{portfolio}'. No email exists for the requestor.",
+ exc_info=True,
+ )
+ else:
+ logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
+ messages.warning(self.request, "Could not send email invitation.")
diff --git a/src/registrar/views/report_views.py b/src/registrar/views/report_views.py
index 1b1798d69..694d1e205 100644
--- a/src/registrar/views/report_views.py
+++ b/src/registrar/views/report_views.py
@@ -203,7 +203,7 @@ class ExportDataTypeRequests(View):
def get(self, request, *args, **kwargs):
response = HttpResponse(content_type="text/csv")
response["Content-Disposition"] = 'attachment; filename="domain-requests.csv"'
- csv_export.DomainRequestsDataType.exporting_dr_data_to_csv(response, request=request)
+ csv_export.DomainRequestDataType.export_data_to_csv(response, request=request)
return response