Merge remote-tracking branch 'origin/main' into nl/3275-slowness-admin-tables

This commit is contained in:
CocoByte 2025-01-15 15:18:58 -07:00
commit f2e284c71e
No known key found for this signature in database
GPG key ID: BBFAA2526384C97F
15 changed files with 2021 additions and 572 deletions

View file

@ -14,6 +14,7 @@ from django.db.models import (
from django.db.models.functions import Concat, Coalesce
from django.http import HttpResponseRedirect
from registrar.models.federal_agency import FederalAgency
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.utility.admin_helpers import (
AutocompleteSelectWithPlaceholder,
get_action_needed_reason_default_email,
@ -27,8 +28,12 @@ from django.shortcuts import redirect
from django_fsm import get_available_FIELD_transitions, FSMField
from registrar.models import DomainInformation, Portfolio, UserPortfolioPermission, DomainInvitation
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from registrar.utility.email import EmailSendingError
from registrar.utility.email_invitations import send_portfolio_invitation_email
from registrar.utility.email_invitations import send_domain_invitation_email, send_portfolio_invitation_email
from registrar.views.utility.invitation_helper import (
get_org_membership,
get_requested_user,
handle_invitation_exceptions,
)
from waffle.decorators import flag_is_active
from django.contrib import admin, messages
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
@ -41,7 +46,7 @@ from waffle.admin import FlagAdmin
from waffle.models import Sample, Switch
from registrar.models import Contact, Domain, DomainRequest, DraftDomain, User, Website, SeniorOfficial
from registrar.utility.constants import BranchChoices
from registrar.utility.errors import FSMDomainRequestError, FSMErrorCodes, MissingEmailError
from registrar.utility.errors import FSMDomainRequestError, FSMErrorCodes
from registrar.utility.waffle import flag_is_active_for_user
from registrar.views.utility.mixins import OrderableFieldsMixin
from django.contrib.admin.views.main import ORDER_VAR
@ -1389,7 +1394,78 @@ class UserDomainRoleAdmin(ListHeaderAdmin, ImportExportModelAdmin):
return super().changeform_view(request, object_id, form_url, extra_context=extra_context)
class DomainInvitationAdmin(ListHeaderAdmin):
class BaseInvitationAdmin(ListHeaderAdmin):
"""Base class for admin classes which will customize save_model and send email invitations
on model adds, and require custom handling of forms and form errors."""
def response_add(self, request, obj, post_url_continue=None):
"""
Override response_add to handle rendering when exceptions are raised during add model.
Normal flow on successful save_model on add is to redirect to changelist_view.
If there are errors, flow is modified to instead render change form.
"""
# store current messages from request so that they are preserved throughout the method
storage = get_messages(request)
# Check if there are any error or warning messages in the `messages` framework
has_errors = any(message.level_tag in ["error", "warning"] for message in storage)
if has_errors:
# Re-render the change form if there are errors or warnings
# Prepare context for rendering the change form
# Get the model form
ModelForm = self.get_form(request, obj=obj)
form = ModelForm(instance=obj)
# Create an AdminForm instance
admin_form = AdminForm(
form,
list(self.get_fieldsets(request, obj)),
self.get_prepopulated_fields(request, obj),
self.get_readonly_fields(request, obj),
model_admin=self,
)
media = self.media + form.media
opts = obj._meta
change_form_context = {
**self.admin_site.each_context(request), # Add admin context
"title": f"Add {opts.verbose_name}",
"opts": opts,
"original": obj,
"save_as": self.save_as,
"has_change_permission": self.has_change_permission(request, obj),
"add": True, # Indicate this is an "Add" form
"change": False, # Indicate this is not a "Change" form
"is_popup": False,
"inline_admin_formsets": [],
"save_on_top": self.save_on_top,
"show_delete": self.has_delete_permission(request, obj),
"obj": obj,
"adminform": admin_form, # Pass the AdminForm instance
"media": media,
"errors": None,
}
return self.render_change_form(
request,
context=change_form_context,
add=True,
change=False,
obj=obj,
)
response = super().response_add(request, obj, post_url_continue)
# Re-add all messages from storage after `super().response_add`
# as super().response_add resets the success messages in request
for message in storage:
messages.add_message(request, message.level, message.message)
return response
class DomainInvitationAdmin(BaseInvitationAdmin):
"""Custom domain invitation admin class."""
class Meta:
@ -1442,14 +1518,60 @@ class DomainInvitationAdmin(ListHeaderAdmin):
which will be successful if a single User exists for that email; otherwise, will
just continue to create the invitation.
"""
if not change and User.objects.filter(email=obj.email).count() == 1:
if not change:
domain = obj.domain
domain_org = getattr(domain.domain_info, "portfolio", None)
requested_email = obj.email
# Look up a user with that email
requested_user = get_requested_user(requested_email)
requestor = request.user
member_of_a_different_org, member_of_this_org = get_org_membership(
domain_org, requested_email, requested_user
)
try:
if (
flag_is_active(request, "organization_feature")
and not flag_is_active(request, "multiple_portfolios")
and domain_org is not None
and not member_of_this_org
and not member_of_a_different_org
):
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=domain_org)
portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(
email=requested_email,
portfolio=domain_org,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
# if user exists for email, immediately retrieve portfolio invitation upon creation
if requested_user is not None:
portfolio_invitation.retrieve()
portfolio_invitation.save()
messages.success(request, f"{requested_email} has been invited to the organization: {domain_org}")
send_domain_invitation_email(
email=requested_email,
requestor=requestor,
domains=domain,
is_member_of_different_org=member_of_a_different_org,
requested_user=requested_user,
)
if requested_user is not None:
# Domain Invitation creation for an existing User
obj.retrieve()
# Call the parent save method to save the object
super().save_model(request, obj, form, change)
messages.success(request, f"{requested_email} has been invited to the domain: {domain}")
except Exception as e:
handle_invitation_exceptions(request, e, requested_email)
return
else:
# Call the parent save method to save the object
super().save_model(request, obj, form, change)
class PortfolioInvitationAdmin(ListHeaderAdmin):
class PortfolioInvitationAdmin(BaseInvitationAdmin):
"""Custom portfolio invitation admin class."""
form = PortfolioInvitationAdminForm
@ -1472,7 +1594,7 @@ class PortfolioInvitationAdmin(ListHeaderAdmin):
# Search
search_fields = [
"email",
"portfolio__name",
"portfolio__organization_name",
]
# Filters
@ -1510,6 +1632,8 @@ class PortfolioInvitationAdmin(ListHeaderAdmin):
portfolio = obj.portfolio
requested_email = obj.email
requestor = request.user
# Look up a user with that email
requested_user = get_requested_user(requested_email)
permission_exists = UserPortfolioPermission.objects.filter(
user__email=requested_email, portfolio=portfolio, user__email__isnull=False
@ -1518,98 +1642,19 @@ class PortfolioInvitationAdmin(ListHeaderAdmin):
if not permission_exists:
# if permission does not exist for a user with requested_email, send email
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=portfolio)
# if user exists for email, immediately retrieve portfolio invitation upon creation
if requested_user is not None:
obj.retrieve()
messages.success(request, f"{requested_email} has been invited.")
else:
messages.warning(request, "User is already a member of this portfolio.")
except Exception as e:
# when exception is raised, handle and do not save the model
self._handle_exceptions(e, request, obj)
handle_invitation_exceptions(request, e, requested_email)
return
# Call the parent save method to save the object
super().save_model(request, obj, form, change)
def _handle_exceptions(self, exception, request, obj):
"""Handle exceptions raised during the process.
Log warnings / errors, and message errors to the user.
"""
if isinstance(exception, EmailSendingError):
logger.warning(
"Could not sent email invitation to %s for portfolio %s (EmailSendingError)",
obj.email,
obj.portfolio,
exc_info=True,
)
messages.error(request, "Could not send email invitation. Portfolio invitation not saved.")
elif isinstance(exception, MissingEmailError):
messages.error(request, str(exception))
logger.error(
f"Can't send email to '{obj.email}' for portfolio '{obj.portfolio}'. "
f"No email exists for the requestor.",
exc_info=True,
)
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.error(request, "Could not send email invitation. Portfolio invitation not saved.")
def response_add(self, request, obj, post_url_continue=None):
"""
Override response_add to handle rendering when exceptions are raised during add model.
Normal flow on successful save_model on add is to redirect to changelist_view.
If there are errors, flow is modified to instead render change form.
"""
# Check if there are any error or warning messages in the `messages` framework
storage = get_messages(request)
has_errors = any(message.level_tag in ["error", "warning"] for message in storage)
if has_errors:
# Re-render the change form if there are errors or warnings
# Prepare context for rendering the change form
# Get the model form
ModelForm = self.get_form(request, obj=obj)
form = ModelForm(instance=obj)
# Create an AdminForm instance
admin_form = AdminForm(
form,
list(self.get_fieldsets(request, obj)),
self.get_prepopulated_fields(request, obj),
self.get_readonly_fields(request, obj),
model_admin=self,
)
media = self.media + form.media
opts = obj._meta
change_form_context = {
**self.admin_site.each_context(request), # Add admin context
"title": f"Add {opts.verbose_name}",
"opts": opts,
"original": obj,
"save_as": self.save_as,
"has_change_permission": self.has_change_permission(request, obj),
"add": True, # Indicate this is an "Add" form
"change": False, # Indicate this is not a "Change" form
"is_popup": False,
"inline_admin_formsets": [],
"save_on_top": self.save_on_top,
"show_delete": self.has_delete_permission(request, obj),
"obj": obj,
"adminform": admin_form, # Pass the AdminForm instance
"media": media,
"errors": None,
}
return self.render_change_form(
request,
context=change_form_context,
add=True,
change=False,
obj=obj,
)
return super().response_add(request, obj, post_url_continue)
class DomainInformationResource(resources.ModelResource):
"""defines how each field in the referenced model should be mapped to the corresponding fields in the

View file

@ -66,9 +66,9 @@
text-align: center;
font-size: inherit; //inherit tooltip fontsize of .93rem
max-width: fit-content;
display: block;
@include at-media('desktop') {
width: 70vw;
}
display: block;
}
}

View file

@ -153,7 +153,9 @@ def validate_user_portfolio_permission(user_portfolio_permission):
"Based on current waffle flag settings, users cannot be assigned to multiple portfolios."
)
existing_invitations = PortfolioInvitation.objects.filter(email=user_portfolio_permission.user.email)
existing_invitations = PortfolioInvitation.objects.exclude(
portfolio=user_portfolio_permission.portfolio
).filter(email=user_portfolio_permission.user.email)
if existing_invitations.exists():
raise ValidationError(
"This user is already assigned to a portfolio invitation. "

View file

@ -1,36 +1,40 @@
{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
Hi.
Hi,{% if requested_user and requested_user.first_name %} {{ requested_user.first_name }}.{% endif %}
{{ requestor_email }} has added you as a manager on {{ domain.name }}.
You can manage this domain on the .gov registrar <https://manage.get.gov>.
{{ requestor_email }} has invited you to manage:
{% for domain in domains %}{{ domain.name }}
{% endfor %}
To manage domain information, visit the .gov registrar <https://manage.get.gov>.
----------------------------------------------------------------
{% if not requested_user %}
YOU NEED A LOGIN.GOV ACCOUNT
Youll need a Login.gov account to manage your .gov domain. Login.gov provides
a simple and secure process for signing in to many government services with one
account.
Youll need a Login.gov account to access the .gov registrar. That account needs to be
associated with the following email address: {{ invitee_email_address }}
If you dont already have one, follow these steps to create your
Login.gov account <https://login.gov/help/get-started/create-your-account/>.
Login.gov provides a simple and secure process for signing in to many government
services with one account. If you dont already have one, follow these steps to create
your Login.gov account <https://login.gov/help/get-started/create-your-account/>.
{% endif %}
DOMAIN MANAGEMENT
As a .gov domain manager, you can add or update information about your domain.
Youll also serve as a contact for your .gov domain. Please keep your contact
As a .gov domain manager, you can add or update information like name servers. Youll
also serve as a contact for the domains you manage. Please keep your contact
information updated.
Learn more about domain management <https://get.gov/help/domain-management>.
SOMETHING WRONG?
If youre not affiliated with {{ domain.name }} or think you received this
message in error, reply to this email.
If youre not affiliated with the .gov domains mentioned in this invitation or think you
received this message in error, reply to this email.
THANK YOU
.Gov helps the public identify official, trusted information. Thank you for using a .gov domain.
.Gov helps the public identify official, trusted information. Thank you for using a .gov
domain.
----------------------------------------------------------------
@ -38,5 +42,6 @@ The .gov team
Contact us: <https://get.gov/contact/>
Learn about .gov <https://get.gov>
The .gov registry is a part of the Cybersecurity and Infrastructure Security Agency (CISA) <https://cisa.gov/>
The .gov registry is a part of the Cybersecurity and Infrastructure Security Agency
(CISA) <https://cisa.gov/>
{% endautoescape %}

View file

@ -1 +1 @@
Youve been added to a .gov domain
You've been invited to manage {% if domains|length > 1 %}.gov domains{% else %}{{ domains.0.name }}{% endif %}

File diff suppressed because it is too large Load diff

View file

@ -28,6 +28,7 @@ from registrar.models.verified_by_staff import VerifiedByStaff # type: ignore
from .common import (
MockSESClient,
completed_domain_request,
create_superuser,
create_test_user,
)
from waffle.testutils import override_flag
@ -155,6 +156,7 @@ class TestPortfolioInvitations(TestCase):
roles=[self.portfolio_role_base, self.portfolio_role_admin],
additional_permissions=[self.portfolio_permission_1, self.portfolio_permission_2],
)
self.superuser = create_superuser()
def tearDown(self):
super().tearDown()
@ -294,10 +296,158 @@ class TestPortfolioInvitations(TestCase):
# Verify
self.assertEquals(self.invitation.get_portfolio_permissions(), perm_list)
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=False)
def test_clean_multiple_portfolios_inactive(self):
"""Tests that users cannot have multiple portfolios or invitations when flag is inactive"""
# Create the first portfolio permission
UserPortfolioPermission.objects.create(
user=self.superuser, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
# Test a second portfolio permission object (should fail)
second_portfolio = Portfolio.objects.create(organization_name="Second Portfolio", creator=self.superuser)
second_permission = UserPortfolioPermission(
user=self.superuser, portfolio=second_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
with self.assertRaises(ValidationError) as err:
second_permission.clean()
self.assertIn("users cannot be assigned to multiple portfolios", str(err.exception))
# Test that adding a new portfolio invitation also fails
third_portfolio = Portfolio.objects.create(organization_name="Third Portfolio", creator=self.superuser)
invitation = PortfolioInvitation(
email=self.superuser.email, portfolio=third_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
with self.assertRaises(ValidationError) as err:
invitation.clean()
self.assertIn("users cannot be assigned to multiple portfolios", str(err.exception))
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=True)
def test_clean_multiple_portfolios_active(self):
"""Tests that users can have multiple portfolios and invitations when flag is active"""
# Create first portfolio permission
UserPortfolioPermission.objects.create(
user=self.superuser, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
# Second portfolio permission should succeed
second_portfolio = Portfolio.objects.create(organization_name="Second Portfolio", creator=self.superuser)
second_permission = UserPortfolioPermission(
user=self.superuser, portfolio=second_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
second_permission.clean()
second_permission.save()
# Verify both permissions exist
user_permissions = UserPortfolioPermission.objects.filter(user=self.superuser)
self.assertEqual(user_permissions.count(), 2)
# Portfolio invitation should also succeed
third_portfolio = Portfolio.objects.create(organization_name="Third Portfolio", creator=self.superuser)
invitation = PortfolioInvitation(
email=self.superuser.email, portfolio=third_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
invitation.clean()
invitation.save()
# Verify invitation exists
self.assertTrue(
PortfolioInvitation.objects.filter(
email=self.superuser.email,
portfolio=third_portfolio,
).exists()
)
@less_console_noise_decorator
def test_clean_portfolio_invitation(self):
"""Tests validation of portfolio invitation permissions"""
# Test validation fails when portfolio missing but permissions present
invitation = PortfolioInvitation(email="test@example.com", roles=["organization_admin"], portfolio=None)
with self.assertRaises(ValidationError) as err:
invitation.clean()
self.assertEqual(
str(err.exception),
"When portfolio roles or additional permissions are assigned, portfolio is required.",
)
# Test validation fails when portfolio present but no permissions
invitation = PortfolioInvitation(email="test@example.com", roles=None, portfolio=self.portfolio)
with self.assertRaises(ValidationError) as err:
invitation.clean()
self.assertEqual(
str(err.exception),
"When portfolio is assigned, portfolio roles or additional permissions are required.",
)
# Test validation fails with forbidden permissions
forbidden_member_roles = UserPortfolioPermission.FORBIDDEN_PORTFOLIO_ROLE_PERMISSIONS.get(
UserPortfolioRoleChoices.ORGANIZATION_MEMBER
)
invitation = PortfolioInvitation(
email="test@example.com",
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
additional_permissions=forbidden_member_roles,
portfolio=self.portfolio,
)
with self.assertRaises(ValidationError) as err:
invitation.clean()
self.assertEqual(
str(err.exception),
"These permissions cannot be assigned to Member: "
"<View all domains and domain reports, Create and edit members, View members>",
)
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=False)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_off_and_duplicate_permission(self):
"""MISSING TEST: Test validation of multiple_portfolios flag.
Scenario 1: Flag is inactive, and the user has existing portfolio permissions
NOTE: Refer to the same test under TestUserPortfolioPermission"""
pass
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=False)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_off_and_existing_invitation(self):
"""MISSING TEST: Test validation of multiple_portfolios flag.
Scenario 2: Flag is inactive, and the user has existing portfolio invitation to another portfolio
NOTE: Refer to the same test under TestUserPortfolioPermission"""
pass
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=True)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_on_and_duplicate_permission(self):
"""MISSING TEST: Test validation of multiple_portfolios flag.
Scenario 3: Flag is active, and the user has existing portfolio invitation
NOTE: Refer to the same test under TestUserPortfolioPermission"""
pass
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=True)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_on_and_existing_invitation(self):
"""MISSING TEST: Test validation of multiple_portfolios flag.
Scenario 4: Flag is active, and the user has existing portfolio invitation to another portfolio
NOTE: Refer to the same test under TestUserPortfolioPermission"""
pass
class TestUserPortfolioPermission(TestCase):
@less_console_noise_decorator
def setUp(self):
self.superuser = create_superuser()
self.portfolio = Portfolio.objects.create(organization_name="Test Portfolio", creator=self.superuser)
self.user, _ = User.objects.get_or_create(email="mayor@igorville.gov")
self.user2, _ = User.objects.get_or_create(email="user2@igorville.gov", username="user2")
super().setUp()
@ -311,6 +461,7 @@ class TestUserPortfolioPermission(TestCase):
Portfolio.objects.all().delete()
User.objects.all().delete()
UserDomainRole.objects.all().delete()
PortfolioInvitation.objects.all().delete()
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=True)
@ -427,6 +578,178 @@ class TestUserPortfolioPermission(TestCase):
# Assert
self.assertEqual(portfolio_permission.get_managed_domains_count(), 1)
@less_console_noise_decorator
def test_clean_user_portfolio_permission(self):
"""Tests validation of user portfolio permission"""
# Test validation fails when portfolio missing but permissions are present
permission = UserPortfolioPermission(user=self.superuser, roles=["organization_admin"], portfolio=None)
with self.assertRaises(ValidationError) as err:
permission.clean()
self.assertEqual(
str(err.exception),
"When portfolio roles or additional permissions are assigned, portfolio is required.",
)
# Test validation fails when portfolio present but no permissions are present
permission = UserPortfolioPermission(user=self.superuser, roles=None, portfolio=self.portfolio)
with self.assertRaises(ValidationError) as err:
permission.clean()
self.assertEqual(
str(err.exception),
"When portfolio is assigned, portfolio roles or additional permissions are required.",
)
# Test validation fails with forbidden permissions for single role
forbidden_member_roles = UserPortfolioPermission.FORBIDDEN_PORTFOLIO_ROLE_PERMISSIONS.get(
UserPortfolioRoleChoices.ORGANIZATION_MEMBER
)
permission = UserPortfolioPermission(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
additional_permissions=forbidden_member_roles,
portfolio=self.portfolio,
)
with self.assertRaises(ValidationError) as err:
permission.clean()
self.assertEqual(
str(err.exception),
"These permissions cannot be assigned to Member: "
"<Create and edit members, View all domains and domain reports, View members>",
)
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=False)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_off_and_duplicate_permission(self):
"""Test validation of multiple_portfolios flag.
Scenario 1: Flag is inactive, and the user has existing portfolio permissions"""
# existing permission
UserPortfolioPermission.objects.create(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
portfolio=self.portfolio,
)
permission = UserPortfolioPermission(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
portfolio=self.portfolio,
)
with self.assertRaises(ValidationError) as err:
permission.clean()
self.assertEqual(
str(err.exception.messages[0]),
"This user is already assigned to a portfolio. "
"Based on current waffle flag settings, users cannot be assigned to multiple portfolios.",
)
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=False)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_off_and_existing_invitation(self):
"""Test validation of multiple_portfolios flag.
Scenario 2: Flag is inactive, and the user has existing portfolio invitation to another portfolio"""
portfolio2 = Portfolio.objects.create(creator=self.superuser, organization_name="Joey go away")
PortfolioInvitation.objects.create(
email=self.superuser.email, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], portfolio=portfolio2
)
permission = UserPortfolioPermission(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
portfolio=self.portfolio,
)
with self.assertRaises(ValidationError) as err:
permission.clean()
self.assertEqual(
str(err.exception.messages[0]),
"This user is already assigned to a portfolio invitation. "
"Based on current waffle flag settings, users cannot be assigned to multiple portfolios.",
)
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=True)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_on_and_duplicate_permission(self):
"""Test validation of multiple_portfolios flag.
Scenario 3: Flag is active, and the user has existing portfolio invitation"""
# existing permission
UserPortfolioPermission.objects.create(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
portfolio=self.portfolio,
)
permission = UserPortfolioPermission(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
portfolio=self.portfolio,
)
# Should not raise any exceptions
try:
permission.clean()
except ValidationError:
self.fail("ValidationError was raised unexpectedly when flag is active.")
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=True)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_on_and_existing_invitation(self):
"""Test validation of multiple_portfolios flag.
Scenario 4: Flag is active, and the user has existing portfolio invitation to another portfolio"""
portfolio2 = Portfolio.objects.create(creator=self.superuser, organization_name="Joey go away")
PortfolioInvitation.objects.create(
email=self.superuser.email, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], portfolio=portfolio2
)
permission = UserPortfolioPermission(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
portfolio=self.portfolio,
)
# Should not raise any exceptions
try:
permission.clean()
except ValidationError:
self.fail("ValidationError was raised unexpectedly when flag is active.")
@less_console_noise_decorator
def test_get_forbidden_permissions_with_multiple_roles(self):
"""Tests that forbidden permissions are properly handled when a user has multiple roles"""
# Get forbidden permissions for member role
member_forbidden = UserPortfolioPermission.FORBIDDEN_PORTFOLIO_ROLE_PERMISSIONS.get(
UserPortfolioRoleChoices.ORGANIZATION_MEMBER
)
# Test with both admin and member roles
roles = [UserPortfolioRoleChoices.ORGANIZATION_ADMIN, UserPortfolioRoleChoices.ORGANIZATION_MEMBER]
# These permissions would be forbidden for member alone, but should be allowed
# when combined with admin role
permissions = UserPortfolioPermission.get_forbidden_permissions(
roles=roles, additional_permissions=member_forbidden
)
# Should return empty set since no permissions are commonly forbidden between admin and member
self.assertEqual(permissions, set())
# Verify the same permissions are forbidden when only member role is present
member_only_permissions = UserPortfolioPermission.get_forbidden_permissions(
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], additional_permissions=member_forbidden
)
# Should return the forbidden permissions for member role
self.assertEqual(member_only_permissions, set(member_forbidden))
class TestUser(TestCase):
"""Test actions that occur on user login,

View file

@ -900,6 +900,7 @@ class MemberExportTest(MockDbForIndividualTests, MockEppLib):
# spaces and leading/trailing whitespace
csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip()
expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip()
self.maxDiff = None
self.assertEqual(csv_content, expected_content)

View file

@ -720,6 +720,8 @@ class TestDomainManagers(TestDomainOverview):
def tearDown(self):
"""Ensure that the user has its original permissions"""
PortfolioInvitation.objects.all().delete()
UserPortfolioPermission.objects.all().delete()
User.objects.exclude(id=self.user.id).delete()
super().tearDown()
@less_console_noise_decorator
@ -807,21 +809,76 @@ class TestDomainManagers(TestDomainOverview):
call_args = mock_send_domain_email.call_args.kwargs
self.assertEqual(call_args["email"], "mayor@igorville.gov")
self.assertEqual(call_args["requestor"], self.user)
self.assertEqual(call_args["domain"], self.domain)
self.assertEqual(call_args["domains"], self.domain)
self.assertIsNone(call_args.get("is_member_of_different_org"))
# Assert that the PortfolioInvitation is created
# Assert that the PortfolioInvitation is created and retrieved
portfolio_invitation = PortfolioInvitation.objects.filter(
email="mayor@igorville.gov", portfolio=self.portfolio
).first()
self.assertIsNotNone(portfolio_invitation, "Portfolio invitation should be created.")
self.assertEqual(portfolio_invitation.email, "mayor@igorville.gov")
self.assertEqual(portfolio_invitation.portfolio, self.portfolio)
self.assertEqual(portfolio_invitation.status, PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED)
# Assert that the UserPortfolioPermission is created
user_portfolio_permission = UserPortfolioPermission.objects.filter(
user=self.user, portfolio=self.portfolio
).first()
self.assertIsNotNone(user_portfolio_permission, "User portfolio permission should be created")
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = success_result.follow()
self.assertContains(success_page, "mayor@igorville.gov")
@boto3_mocking.patching
@override_flag("organization_feature", active=True)
@less_console_noise_decorator
@patch("registrar.views.domain.send_portfolio_invitation_email")
@patch("registrar.views.domain.send_domain_invitation_email")
def test_domain_user_add_form_sends_portfolio_invitation_to_new_email(
self, mock_send_domain_email, mock_send_portfolio_email
):
"""Adding an email not associated with a user works and sends portfolio invitation."""
add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
add_page.form["email"] = "notauser@igorville.gov"
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_result = add_page.form.submit()
self.assertEqual(success_result.status_code, 302)
self.assertEqual(
success_result["Location"],
reverse("domain-users", kwargs={"pk": self.domain.id}),
)
# Verify that the invitation emails were sent
mock_send_portfolio_email.assert_called_once_with(
email="notauser@igorville.gov", requestor=self.user, portfolio=self.portfolio
)
mock_send_domain_email.assert_called_once()
call_args = mock_send_domain_email.call_args.kwargs
self.assertEqual(call_args["email"], "notauser@igorville.gov")
self.assertEqual(call_args["requestor"], self.user)
self.assertEqual(call_args["domains"], self.domain)
self.assertIsNone(call_args.get("is_member_of_different_org"))
# Assert that the PortfolioInvitation is created
portfolio_invitation = PortfolioInvitation.objects.filter(
email="notauser@igorville.gov", portfolio=self.portfolio
).first()
self.assertIsNotNone(portfolio_invitation, "Portfolio invitation should be created.")
self.assertEqual(portfolio_invitation.email, "notauser@igorville.gov")
self.assertEqual(portfolio_invitation.portfolio, self.portfolio)
self.assertEqual(portfolio_invitation.status, PortfolioInvitation.PortfolioInvitationStatus.INVITED)
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = success_result.follow()
self.assertContains(success_page, "notauser@igorville.gov")
@boto3_mocking.patching
@override_flag("organization_feature", active=True)
@less_console_noise_decorator
@ -857,7 +914,7 @@ class TestDomainManagers(TestDomainOverview):
call_args = mock_send_domain_email.call_args.kwargs
self.assertEqual(call_args["email"], "mayor@igorville.gov")
self.assertEqual(call_args["requestor"], self.user)
self.assertEqual(call_args["domain"], self.domain)
self.assertEqual(call_args["domains"], self.domain)
self.assertIsNone(call_args.get("is_member_of_different_org"))
# Assert that no PortfolioInvitation is created
@ -915,7 +972,7 @@ class TestDomainManagers(TestDomainOverview):
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = success_result.follow()
self.assertContains(success_page, "Could not send email invitation.")
self.assertContains(success_page, "Failed to send email.")
@boto3_mocking.patching
@less_console_noise_decorator

View file

@ -2106,25 +2106,75 @@ class TestPortfolioInvitedMemberDomainsView(TestWithUser, WebTest):
self.assertEqual(response.status_code, 404)
class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView):
class TestPortfolioMemberDomainsEditView(TestWithUser, WebTest):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.url = reverse("member-domains-edit", kwargs={"pk": cls.portfolio_permission.pk})
# Create Portfolio
cls.portfolio = Portfolio.objects.create(creator=cls.user, organization_name="Test Portfolio")
# Create domains for testing
cls.domain1 = Domain.objects.create(name="1.gov")
cls.domain2 = Domain.objects.create(name="2.gov")
cls.domain3 = Domain.objects.create(name="3.gov")
@classmethod
def tearDownClass(cls):
super().tearDownClass()
Portfolio.objects.all().delete()
User.objects.all().delete()
Domain.objects.all().delete()
def setUp(self):
super().setUp()
names = ["1.gov", "2.gov", "3.gov"]
Domain.objects.bulk_create([Domain(name=name) for name in names])
# Create test member
self.user_member = User.objects.create(
username="test_member",
first_name="Second",
last_name="User",
email="second@example.com",
phone="8003112345",
title="Member",
)
# Create test user with no perms
self.user_no_perms = User.objects.create(
username="test_user_no_perms",
first_name="No",
last_name="Permissions",
email="user_no_perms@example.com",
phone="8003112345",
title="No Permissions",
)
# Assign permissions to the user making requests
self.portfolio_permission = UserPortfolioPermission.objects.create(
user=self.user,
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_MEMBERS,
UserPortfolioPermissionChoices.EDIT_MEMBERS,
],
)
# Assign permissions to test member
self.permission = UserPortfolioPermission.objects.create(
user=self.user_member,
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_MEMBERS,
UserPortfolioPermissionChoices.EDIT_MEMBERS,
],
)
# Create url to be used in all tests
self.url = reverse("member-domains-edit", kwargs={"pk": self.portfolio_permission.pk})
def tearDown(self):
super().tearDown()
UserDomainRole.objects.all().delete()
Domain.objects.all().delete()
DomainInvitation.objects.all().delete()
UserPortfolioPermission.objects.all().delete()
PortfolioInvitation.objects.all().delete()
Portfolio.objects.exclude(id=self.portfolio.id).delete()
User.objects.exclude(id=self.user.id).delete()
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@ -2180,12 +2230,13 @@ class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView):
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_post_with_valid_added_domains(self):
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_with_valid_added_domains(self, mock_send_domain_email):
"""Test that domains can be successfully added."""
self.client.force_login(self.user)
data = {
"added_domains": json.dumps([1, 2, 3]), # Mock domain IDs
"added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]), # Mock domain IDs
}
response = self.client.post(self.url, data)
@ -2198,31 +2249,43 @@ class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView):
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.")
expected_domains = [self.domain1, self.domain2, self.domain3]
# Verify that the invitation email was sent
mock_send_domain_email.assert_called_once()
call_args = mock_send_domain_email.call_args.kwargs
self.assertEqual(call_args["email"], "info@example.com")
self.assertEqual(call_args["requestor"], self.user)
self.assertEqual(list(call_args["domains"]), list(expected_domains))
self.assertIsNone(call_args.get("is_member_of_different_org"))
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_post_with_valid_removed_domains(self):
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_with_valid_removed_domains(self, mock_send_domain_email):
"""Test that domains can be successfully removed."""
self.client.force_login(self.user)
# Create some UserDomainRole objects
domains = [1, 2, 3]
UserDomainRole.objects.bulk_create([UserDomainRole(domain_id=domain, user=self.user) for domain in domains])
domains = [self.domain1, self.domain2, self.domain3]
UserDomainRole.objects.bulk_create([UserDomainRole(domain=domain, user=self.user) for domain in domains])
data = {
"removed_domains": json.dumps([1, 2]),
"removed_domains": json.dumps([self.domain1.id, self.domain2.id]),
}
response = self.client.post(self.url, data)
# Check that the UserDomainRole objects were deleted
self.assertEqual(UserDomainRole.objects.filter(user=self.user).count(), 1)
self.assertEqual(UserDomainRole.objects.filter(domain_id=3, user=self.user).count(), 1)
self.assertEqual(UserDomainRole.objects.filter(domain=self.domain3, user=self.user).count(), 1)
# Check for a success message and a redirect
self.assertRedirects(response, reverse("member-domains", kwargs={"pk": self.portfolio_permission.pk}))
messages = list(response.wsgi_request._messages)
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.")
# assert that send_domain_invitation_email is not called
mock_send_domain_email.assert_not_called()
UserDomainRole.objects.all().delete()
@ -2290,26 +2353,93 @@ class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView):
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), "No changes detected.")
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_when_send_domain_email_raises_exception(self, mock_send_domain_email):
"""Test attempt to add new domains when an EmailSendingError raised."""
self.client.force_login(self.user)
class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomainsView):
data = {
"added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]), # Mock domain IDs
}
mock_send_domain_email.side_effect = EmailSendingError("Failed to send email")
response = self.client.post(self.url, data)
# Check that the UserDomainRole objects were not created
self.assertEqual(UserDomainRole.objects.filter(user=self.user, role=UserDomainRole.Roles.MANAGER).count(), 0)
# Check for an error message and a redirect to edit form
self.assertRedirects(response, reverse("member-domains-edit", kwargs={"pk": self.portfolio_permission.pk}))
messages = list(response.wsgi_request._messages)
self.assertEqual(len(messages), 1)
self.assertEqual(
str(messages[0]),
"An unexpected error occurred: Failed to send email. If the issue persists, please contact help@get.gov.",
)
class TestPortfolioInvitedMemberEditDomainsView(TestWithUser, WebTest):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.url = reverse("invitedmember-domains-edit", kwargs={"pk": cls.invitation.pk})
# Create Portfolio
cls.portfolio = Portfolio.objects.create(creator=cls.user, organization_name="Test Portfolio")
# Create domains for testing
cls.domain1 = Domain.objects.create(name="1.gov")
cls.domain2 = Domain.objects.create(name="2.gov")
cls.domain3 = Domain.objects.create(name="3.gov")
@classmethod
def tearDownClass(cls):
super().tearDownClass()
Portfolio.objects.all().delete()
User.objects.all().delete()
Domain.objects.all().delete()
def setUp(self):
super().setUp()
names = ["1.gov", "2.gov", "3.gov"]
Domain.objects.bulk_create([Domain(name=name) for name in names])
# Add a user with no permissions
self.user_no_perms = User.objects.create(
username="test_user_no_perms",
first_name="No",
last_name="Permissions",
email="user_no_perms@example.com",
phone="8003112345",
title="No Permissions",
)
# Add an invited member who has been invited to manage domains
self.invited_member_email = "invited@example.com"
self.invitation = PortfolioInvitation.objects.create(
email=self.invited_member_email,
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_MEMBERS,
],
)
# Assign permissions to the user making requests
UserPortfolioPermission.objects.create(
user=self.user,
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_MEMBERS,
UserPortfolioPermissionChoices.EDIT_MEMBERS,
],
)
self.url = reverse("invitedmember-domains-edit", kwargs={"pk": self.invitation.pk})
def tearDown(self):
super().tearDown()
Domain.objects.all().delete()
DomainInvitation.objects.all().delete()
UserPortfolioPermission.objects.all().delete()
PortfolioInvitation.objects.all().delete()
Portfolio.objects.exclude(id=self.portfolio.id).delete()
User.objects.exclude(id=self.user.id).delete()
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@ -2364,12 +2494,13 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_post_with_valid_added_domains(self):
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_with_valid_added_domains(self, mock_send_domain_email):
"""Test adding new domains successfully."""
self.client.force_login(self.user)
data = {
"added_domains": json.dumps([1, 2, 3]), # Mock domain IDs
"added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]),
}
response = self.client.post(self.url, data)
@ -2387,10 +2518,20 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.")
expected_domains = [self.domain1, self.domain2, self.domain3]
# Verify that the invitation email was sent
mock_send_domain_email.assert_called_once()
call_args = mock_send_domain_email.call_args.kwargs
self.assertEqual(call_args["email"], "invited@example.com")
self.assertEqual(call_args["requestor"], self.user)
self.assertEqual(list(call_args["domains"]), list(expected_domains))
self.assertFalse(call_args.get("is_member_of_different_org"))
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_post_with_existing_and_new_added_domains(self):
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_with_existing_and_new_added_domains(self, _):
"""Test updating existing and adding new invitations."""
self.client.force_login(self.user)
@ -2398,29 +2539,33 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain
DomainInvitation.objects.bulk_create(
[
DomainInvitation(
domain_id=1, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.CANCELED
domain=self.domain1,
email="invited@example.com",
status=DomainInvitation.DomainInvitationStatus.CANCELED,
),
DomainInvitation(
domain_id=2, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
domain=self.domain2,
email="invited@example.com",
status=DomainInvitation.DomainInvitationStatus.INVITED,
),
]
)
data = {
"added_domains": json.dumps([1, 2, 3]),
"added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]),
}
response = self.client.post(self.url, data)
# Check that status for domain_id=1 was updated to INVITED
self.assertEqual(
DomainInvitation.objects.get(domain_id=1, email="invited@example.com").status,
DomainInvitation.objects.get(domain=self.domain1, email="invited@example.com").status,
DomainInvitation.DomainInvitationStatus.INVITED,
)
# Check that domain_id=3 was created as INVITED
self.assertTrue(
DomainInvitation.objects.filter(
domain_id=3, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
domain=self.domain3, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
).exists()
)
@ -2430,7 +2575,8 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_post_with_valid_removed_domains(self):
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_with_valid_removed_domains(self, mock_send_domain_email):
"""Test removing domains successfully."""
self.client.force_login(self.user)
@ -2438,33 +2584,39 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain
DomainInvitation.objects.bulk_create(
[
DomainInvitation(
domain_id=1, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
domain=self.domain1,
email="invited@example.com",
status=DomainInvitation.DomainInvitationStatus.INVITED,
),
DomainInvitation(
domain_id=2, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
domain=self.domain2,
email="invited@example.com",
status=DomainInvitation.DomainInvitationStatus.INVITED,
),
]
)
data = {
"removed_domains": json.dumps([1]),
"removed_domains": json.dumps([self.domain1.id]),
}
response = self.client.post(self.url, data)
# Check that the status for domain_id=1 was updated to CANCELED
self.assertEqual(
DomainInvitation.objects.get(domain_id=1, email="invited@example.com").status,
DomainInvitation.objects.get(domain=self.domain1, email="invited@example.com").status,
DomainInvitation.DomainInvitationStatus.CANCELED,
)
# Check that domain_id=2 remains INVITED
self.assertEqual(
DomainInvitation.objects.get(domain_id=2, email="invited@example.com").status,
DomainInvitation.objects.get(domain=self.domain2, email="invited@example.com").status,
DomainInvitation.DomainInvitationStatus.INVITED,
)
# Check for a success message and a redirect
self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk}))
# assert that send_domain_invitation_email is not called
mock_send_domain_email.assert_not_called()
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@ -2530,6 +2682,37 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), "No changes detected.")
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_when_send_domain_email_raises_exception(self, mock_send_domain_email):
"""Test attempt to add new domains when an EmailSendingError raised."""
self.client.force_login(self.user)
data = {
"added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]),
}
mock_send_domain_email.side_effect = EmailSendingError("Failed to send email")
response = self.client.post(self.url, data)
# Check that the DomainInvitation objects were not created
self.assertEqual(
DomainInvitation.objects.filter(
email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
).count(),
0,
)
# Check for an error message and a redirect to edit form
self.assertRedirects(response, reverse("invitedmember-domains-edit", kwargs={"pk": self.invitation.pk}))
messages = list(response.wsgi_request._messages)
self.assertEqual(len(messages), 1)
self.assertEqual(
str(messages[0]),
"An unexpected error occurred: Failed to send email. If the issue persists, please contact help@get.gov.",
)
class TestRequestingEntity(WebTest):
"""The requesting entity page is a domain request form that only exists
@ -2879,7 +3062,7 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest):
],
)
cls.new_member_email = "davekenn4242@gmail.com"
cls.new_member_email = "newmember@example.com"
AllowedEmail.objects.get_or_create(email=cls.new_member_email)
@ -2933,11 +3116,13 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest):
self.assertEqual(final_response.status_code, 302) # Redirects
# Validate Database Changes
# Validate that portfolio invitation was created but not retrieved
portfolio_invite = PortfolioInvitation.objects.filter(
email=self.new_member_email, portfolio=self.portfolio
).first()
self.assertIsNotNone(portfolio_invite)
self.assertEqual(portfolio_invite.email, self.new_member_email)
self.assertEqual(portfolio_invite.status, PortfolioInvitation.PortfolioInvitationStatus.INVITED)
# Check that an email was sent
self.assertTrue(mock_client.send_email.called)
@ -3228,6 +3413,52 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest):
# assert that send_portfolio_invitation_email is not called
mock_send_email.assert_not_called()
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_invitation_email")
def test_member_invite_for_existing_user_who_is_not_a_member(self, mock_send_email):
"""Tests the member invitation flow for existing user who is not a portfolio member."""
self.client.force_login(self.user)
# Simulate a session to ensure continuity
session_id = self.client.session.session_key
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
new_user = User.objects.create(email="newuser@example.com")
# Simulate submission of member invite for the newly created user
response = self.client.post(
reverse("new-member"),
{
"role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER.value,
"domain_request_permission_member": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS.value,
"email": "newuser@example.com",
},
)
self.assertEqual(response.status_code, 302)
# Validate Database Changes
# Validate that portfolio invitation was created and retrieved
portfolio_invite = PortfolioInvitation.objects.filter(
email="newuser@example.com", portfolio=self.portfolio
).first()
self.assertIsNotNone(portfolio_invite)
self.assertEqual(portfolio_invite.email, "newuser@example.com")
self.assertEqual(portfolio_invite.status, PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED)
# Validate UserPortfolioPermission
user_portfolio_permission = UserPortfolioPermission.objects.filter(
user=new_user, portfolio=self.portfolio
).first()
self.assertIsNotNone(user_portfolio_permission)
# assert that send_portfolio_invitation_email is called
mock_send_email.assert_called_once()
call_args = mock_send_email.call_args.kwargs
self.assertEqual(call_args["email"], "newuser@example.com")
self.assertEqual(call_args["requestor"], self.user)
self.assertIsNone(call_args.get("is_member_of_different_org"))
class TestEditPortfolioMemberView(WebTest):
"""Tests for the edit member page on portfolios"""

View file

@ -1,5 +1,6 @@
from django.conf import settings
from registrar.models import DomainInvitation
from registrar.models.domain import Domain
from registrar.utility.errors import (
AlreadyDomainInvitedError,
AlreadyDomainManagerError,
@ -7,23 +8,24 @@ from registrar.utility.errors import (
OutsideOrgMemberError,
)
from registrar.utility.waffle import flag_is_active_for_user
from registrar.utility.email import send_templated_email
from registrar.utility.email import EmailSendingError, send_templated_email
import logging
logger = logging.getLogger(__name__)
def send_domain_invitation_email(email: str, requestor, domain, is_member_of_different_org):
def send_domain_invitation_email(
email: str, requestor, domains: Domain | list[Domain], is_member_of_different_org, requested_user=None
):
"""
Sends a domain invitation email to the specified address.
Raises exceptions for validation or email-sending issues.
Args:
email (str): Email address of the recipient.
requestor (User): The user initiating the invitation.
domain (Domain): The domain object for which the invitation is being sent.
domains (Domain or list of Domain): The domain objects for which the invitation is being sent.
is_member_of_different_org (bool): if an email belongs to a different org
requested_user (User | None): The recipient if the email belongs to a user in the registrar
Raises:
MissingEmailError: If the requestor has no email associated with their account.
@ -32,26 +34,54 @@ def send_domain_invitation_email(email: str, requestor, domain, is_member_of_dif
OutsideOrgMemberError: If the requested_user is part of a different organization.
EmailSendingError: If there is an error while sending the email.
"""
# Default email address for staff
requestor_email = settings.DEFAULT_FROM_EMAIL
domains = normalize_domains(domains)
requestor_email = get_requestor_email(requestor, domains)
validate_invitation(email, domains, requestor, is_member_of_different_org)
send_invitation_email(email, requestor_email, domains, requested_user)
def normalize_domains(domains):
"""Ensures domains is always a list."""
return [domains] if isinstance(domains, Domain) else domains
def get_requestor_email(requestor, domains):
"""Get the requestor's email or raise an error if it's missing.
If the requestor is staff, default email is returned.
"""
if requestor.is_staff:
return settings.DEFAULT_FROM_EMAIL
# Check if the requestor is staff and has an email
if not requestor.is_staff:
if not requestor.email or requestor.email.strip() == "":
raise MissingEmailError
else:
requestor_email = requestor.email
domain_names = ", ".join([domain.name for domain in domains])
raise MissingEmailError(email=requestor.email, domain=domain_names)
# Check if the recipient is part of a different organization
# COMMENT: this does not account for multiple_portfolios flag being active
return requestor.email
def validate_invitation(email, domains, requestor, is_member_of_different_org):
"""Validate the invitation conditions."""
check_outside_org_membership(email, requestor, is_member_of_different_org)
for domain in domains:
validate_existing_invitation(email, domain)
def check_outside_org_membership(email, requestor, is_member_of_different_org):
"""Raise an error if the email belongs to a different organization."""
if (
flag_is_active_for_user(requestor, "organization_feature")
and not flag_is_active_for_user(requestor, "multiple_portfolios")
and is_member_of_different_org
):
raise OutsideOrgMemberError
raise OutsideOrgMemberError(email=email)
# Check for an existing invitation
def validate_existing_invitation(email, domain):
"""Check for existing invitations and handle their status."""
try:
invite = DomainInvitation.objects.get(email=email, domain=domain)
if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED:
@ -64,16 +94,24 @@ def send_domain_invitation_email(email: str, requestor, domain, is_member_of_dif
except DomainInvitation.DoesNotExist:
pass
# Send the email
def send_invitation_email(email, requestor_email, domains, requested_user):
"""Send the invitation email."""
try:
send_templated_email(
"emails/domain_invitation.txt",
"emails/domain_invitation_subject.txt",
to_address=email,
context={
"domain": domain,
"domains": domains,
"requestor_email": requestor_email,
"invitee_email_address": email,
"requested_user": requested_user,
},
)
except EmailSendingError as err:
domain_names = ", ".join([domain.name for domain in domains])
raise EmailSendingError(f"Could not send email invitation to {email} for domains: {domain_names}") from err
def send_portfolio_invitation_email(email: str, requestor, portfolio):
@ -98,10 +136,11 @@ def send_portfolio_invitation_email(email: str, requestor, portfolio):
# Check if the requestor is staff and has an email
if not requestor.is_staff:
if not requestor.email or requestor.email.strip() == "":
raise MissingEmailError
raise MissingEmailError(email=email, portfolio=portfolio)
else:
requestor_email = requestor.email
try:
send_templated_email(
"emails/portfolio_invitation.txt",
"emails/portfolio_invitation_subject.txt",
@ -112,3 +151,7 @@ def send_portfolio_invitation_email(email: str, requestor, portfolio):
"email": email,
},
)
except EmailSendingError as err:
raise EmailSendingError(
f"Could not sent email invitation to {email} for portfolio {portfolio}. Portfolio invitation not saved."
) from err

View file

@ -46,8 +46,17 @@ class AlreadyDomainInvitedError(InvitationError):
class MissingEmailError(InvitationError):
"""Raised when the requestor has no email associated with their account."""
def __init__(self):
super().__init__("Can't send invitation email. No email is associated with your user account.")
def __init__(self, email=None, domain=None, portfolio=None):
# Default message if no additional info is provided
message = "Can't send invitation email. No email is associated with your user account."
# Customize message based on provided arguments
if email and domain:
message = f"Can't send email to '{email}' on domain '{domain}'. No email exists for the requestor."
elif email and portfolio:
message = f"Can't send email to '{email}' for portfolio '{portfolio}'. No email exists for the requestor."
super().__init__(message)
class OutsideOrgMemberError(ValueError):

View file

@ -10,7 +10,6 @@ import logging
import requests
from django.contrib import messages
from django.contrib.messages.views import SuccessMessageMixin
from django.db import IntegrityError
from django.http import HttpResponseRedirect
from django.shortcuts import redirect, render, get_object_or_404
from django.urls import reverse
@ -31,22 +30,23 @@ from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
from registrar.utility.enums import DefaultEmail
from registrar.utility.errors import (
AlreadyDomainInvitedError,
AlreadyDomainManagerError,
GenericError,
GenericErrorCodes,
MissingEmailError,
NameserverError,
NameserverErrorCodes as nsErrorCodes,
DsDataError,
DsDataErrorCodes,
SecurityEmailError,
SecurityEmailErrorCodes,
OutsideOrgMemberError,
)
from registrar.models.utility.contact_error import ContactError
from registrar.views.utility.permission_views import UserDomainRolePermissionDeleteView
from registrar.utility.waffle import flag_is_active_for_user
from registrar.views.utility.invitation_helper import (
get_org_membership,
get_requested_user,
handle_invitation_exceptions,
)
from ..forms import (
SeniorOfficialContactForm,
@ -1190,43 +1190,13 @@ class DomainAddUserView(DomainFormBaseView):
def get_success_url(self):
return reverse("domain-users", kwargs={"pk": self.object.pk})
def _get_org_membership(self, requestor_org, requested_email, requested_user):
"""
Verifies if an email belongs to a different organization as a member or invited member.
Verifies if an email belongs to this organization as a member or invited member.
User does not belong to any org can be deduced from the tuple returned.
Returns a tuple (member_of_a_different_org, member_of_this_org).
"""
# COMMENT: this code does not take into account multiple portfolios flag
# COMMENT: shouldn't this code be based on the organization of the domain, not the org
# of the requestor? requestor could have multiple portfolios
# Check for existing permissions or invitations for the requested user
existing_org_permission = UserPortfolioPermission.objects.filter(user=requested_user).first()
existing_org_invitation = PortfolioInvitation.objects.filter(email=requested_email).first()
# Determine membership in a different organization
member_of_a_different_org = (
existing_org_permission and existing_org_permission.portfolio != requestor_org
) or (existing_org_invitation and existing_org_invitation.portfolio != requestor_org)
# Determine membership in the same organization
member_of_this_org = (existing_org_permission and existing_org_permission.portfolio == requestor_org) or (
existing_org_invitation and existing_org_invitation.portfolio == requestor_org
)
return member_of_a_different_org, member_of_this_org
def form_valid(self, form):
"""Add the specified user to this domain."""
requested_email = form.cleaned_data["email"]
requestor = self.request.user
# Look up a user with that email
requested_user = self._get_requested_user(requested_email)
requested_user = get_requested_user(requested_email)
# NOTE: This does not account for multiple portfolios flag being set to True
domain_org = self.object.domain_info.portfolio
@ -1237,13 +1207,12 @@ class DomainAddUserView(DomainFormBaseView):
or requestor.is_staff
)
member_of_a_different_org, member_of_this_org = self._get_org_membership(
domain_org, requested_email, requested_user
)
member_of_a_different_org, member_of_this_org = get_org_membership(domain_org, requested_email, requested_user)
try:
# COMMENT: this code does not take into account multiple portfolios flag being set to TRUE
# determine portfolio of the domain (code currently is looking at requestor's portfolio)
# if requested_email/user is not member or invited member of this portfolio
# COMMENT: this code does not take into account multiple portfolios flag
# send portfolio invitation email
# create portfolio invitation
# create message to view
@ -1254,38 +1223,31 @@ class DomainAddUserView(DomainFormBaseView):
and requestor_can_update_portfolio
and not member_of_this_org
):
try:
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=domain_org)
PortfolioInvitation.objects.get_or_create(email=requested_email, portfolio=domain_org)
portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(
email=requested_email, portfolio=domain_org, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER]
)
# if user exists for email, immediately retrieve portfolio invitation upon creation
if requested_user is not None:
portfolio_invitation.retrieve()
portfolio_invitation.save()
messages.success(self.request, f"{requested_email} has been invited to the organization: {domain_org}")
except Exception as e:
self._handle_portfolio_exceptions(e, requested_email, domain_org)
# If that first invite does not succeed take an early exit
return redirect(self.get_success_url())
try:
if requested_user is None:
self._handle_new_user_invitation(requested_email, requestor, member_of_a_different_org)
else:
self._handle_existing_user(requested_email, requestor, requested_user, member_of_a_different_org)
except Exception as e:
self._handle_exceptions(e, requested_email)
handle_invitation_exceptions(self.request, e, requested_email)
return redirect(self.get_success_url())
def _get_requested_user(self, email):
"""Retrieve a user by email or return None if the user doesn't exist."""
try:
return User.objects.get(email=email)
except User.DoesNotExist:
return None
def _handle_new_user_invitation(self, email, requestor, member_of_different_org):
"""Handle invitation for a new user who does not exist in the system."""
send_domain_invitation_email(
email=email,
requestor=requestor,
domain=self.object,
domains=self.object,
is_member_of_different_org=member_of_different_org,
)
DomainInvitation.objects.get_or_create(email=email, domain=self.object)
@ -1296,8 +1258,9 @@ class DomainAddUserView(DomainFormBaseView):
send_domain_invitation_email(
email=email,
requestor=requestor,
domain=self.object,
domains=self.object,
is_member_of_different_org=member_of_different_org,
requested_user=requested_user,
)
UserDomainRole.objects.create(
user=requested_user,
@ -1306,57 +1269,6 @@ class DomainAddUserView(DomainFormBaseView):
)
messages.success(self.request, f"Added user {email}.")
def _handle_exceptions(self, exception, email):
"""Handle exceptions raised during the process."""
if isinstance(exception, EmailSendingError):
logger.warning(
"Could not send email invitation to %s for domain %s (EmailSendingError)",
email,
self.object,
exc_info=True,
)
messages.warning(self.request, "Could not send email invitation.")
elif isinstance(exception, OutsideOrgMemberError):
logger.warning(
"Could not send email. Can not invite member of a .gov organization to a different organization.",
self.object,
exc_info=True,
)
messages.error(
self.request,
f"{email} is already a member of another .gov organization.",
)
elif isinstance(exception, AlreadyDomainManagerError):
messages.warning(self.request, str(exception))
elif isinstance(exception, AlreadyDomainInvitedError):
messages.warning(self.request, str(exception))
elif isinstance(exception, MissingEmailError):
messages.error(self.request, str(exception))
logger.error(
f"Can't send email to '{email}' on domain '{self.object}'. No email exists for the requestor.",
exc_info=True,
)
elif isinstance(exception, IntegrityError):
messages.warning(self.request, f"{email} is already a manager for this domain")
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.warning(self.request, "Could not send email invitation.")
def _handle_portfolio_exceptions(self, exception, email, portfolio):
"""Handle exceptions raised during the process."""
if isinstance(exception, EmailSendingError):
logger.warning("Could not send email invitation (EmailSendingError)", exc_info=True)
messages.warning(self.request, "Could not send email invitation.")
elif isinstance(exception, MissingEmailError):
messages.error(self.request, str(exception))
logger.error(
f"Can't send email to '{email}' for portfolio '{portfolio}'. No email exists for the requestor.",
exc_info=True,
)
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.warning(self.request, "Could not send email invitation.")
class DomainInvitationCancelView(SuccessMessageMixin, DomainInvitationPermissionCancelView):
object: DomainInvitation

View file

@ -8,13 +8,14 @@ from django.utils.safestring import mark_safe
from django.contrib import messages
from registrar.forms import portfolio as portfolioForms
from registrar.models import Portfolio, User
from registrar.models.domain import Domain
from registrar.models.domain_invitation import DomainInvitation
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user_domain_role import UserDomainRole
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from registrar.utility.email import EmailSendingError
from registrar.utility.email_invitations import send_portfolio_invitation_email
from registrar.utility.email_invitations import send_domain_invitation_email, send_portfolio_invitation_email
from registrar.utility.errors import MissingEmailError
from registrar.utility.enums import DefaultUserValues
from registrar.views.utility.mixins import PortfolioMemberPermission
@ -33,6 +34,8 @@ from django.views.generic import View
from django.views.generic.edit import FormMixin
from django.db import IntegrityError
from registrar.views.utility.invitation_helper import get_org_membership
logger = logging.getLogger(__name__)
@ -237,6 +240,7 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V
removed_domains = request.POST.get("removed_domains")
portfolio_permission = get_object_or_404(UserPortfolioPermission, pk=pk)
member = portfolio_permission.user
portfolio = portfolio_permission.portfolio
added_domain_ids = self._parse_domain_ids(added_domains, "added domains")
if added_domain_ids is None:
@ -248,7 +252,7 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V
if added_domain_ids or removed_domain_ids:
try:
self._process_added_domains(added_domain_ids, member)
self._process_added_domains(added_domain_ids, member, request.user, portfolio)
self._process_removed_domains(removed_domain_ids, member)
messages.success(request, "The domain assignment changes have been saved.")
return redirect(reverse("member-domains", kwargs={"pk": pk}))
@ -258,15 +262,15 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V
"A database error occurred while saving changes. If the issue persists, "
f"please contact {DefaultUserValues.HELP_EMAIL}.",
)
logger.error("A database error occurred while saving changes.")
logger.error("A database error occurred while saving changes.", exc_info=True)
return redirect(reverse("member-domains-edit", kwargs={"pk": pk}))
except Exception as e:
messages.error(
request,
"An unexpected error occurred: {str(e)}. If the issue persists, "
f"An unexpected error occurred: {str(e)}. If the issue persists, "
f"please contact {DefaultUserValues.HELP_EMAIL}.",
)
logger.error(f"An unexpected error occurred: {str(e)}")
logger.error(f"An unexpected error occurred: {str(e)}", exc_info=True)
return redirect(reverse("member-domains-edit", kwargs={"pk": pk}))
else:
messages.info(request, "No changes detected.")
@ -287,16 +291,26 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V
logger.error(f"Invalid data for {domain_type}")
return None
def _process_added_domains(self, added_domain_ids, member):
def _process_added_domains(self, added_domain_ids, member, requestor, portfolio):
"""
Processes added domains by bulk creating UserDomainRole instances.
"""
if added_domain_ids:
# get added_domains from ids to pass to send email method and bulk create
added_domains = Domain.objects.filter(id__in=added_domain_ids)
member_of_a_different_org, _ = get_org_membership(portfolio, member.email, member)
send_domain_invitation_email(
email=member.email,
requestor=requestor,
domains=added_domains,
is_member_of_different_org=member_of_a_different_org,
requested_user=member,
)
# Bulk create UserDomainRole instances for added domains
UserDomainRole.objects.bulk_create(
[
UserDomainRole(domain_id=domain_id, user=member, role=UserDomainRole.Roles.MANAGER)
for domain_id in added_domain_ids
UserDomainRole(domain=domain, user=member, role=UserDomainRole.Roles.MANAGER)
for domain in added_domains
],
ignore_conflicts=True, # Avoid duplicate entries
)
@ -443,6 +457,7 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission
removed_domains = request.POST.get("removed_domains")
portfolio_invitation = get_object_or_404(PortfolioInvitation, pk=pk)
email = portfolio_invitation.email
portfolio = portfolio_invitation.portfolio
added_domain_ids = self._parse_domain_ids(added_domains, "added domains")
if added_domain_ids is None:
@ -454,7 +469,7 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission
if added_domain_ids or removed_domain_ids:
try:
self._process_added_domains(added_domain_ids, email)
self._process_added_domains(added_domain_ids, email, request.user, portfolio)
self._process_removed_domains(removed_domain_ids, email)
messages.success(request, "The domain assignment changes have been saved.")
return redirect(reverse("invitedmember-domains", kwargs={"pk": pk}))
@ -464,15 +479,15 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission
"A database error occurred while saving changes. If the issue persists, "
f"please contact {DefaultUserValues.HELP_EMAIL}.",
)
logger.error("A database error occurred while saving changes.")
logger.error("A database error occurred while saving changes.", exc_info=True)
return redirect(reverse("invitedmember-domains-edit", kwargs={"pk": pk}))
except Exception as e:
messages.error(
request,
"An unexpected error occurred: {str(e)}. If the issue persists, "
f"An unexpected error occurred: {str(e)}. If the issue persists, "
f"please contact {DefaultUserValues.HELP_EMAIL}.",
)
logger.error(f"An unexpected error occurred: {str(e)}.")
logger.error(f"An unexpected error occurred: {str(e)}.", exc_info=True)
return redirect(reverse("invitedmember-domains-edit", kwargs={"pk": pk}))
else:
messages.info(request, "No changes detected.")
@ -493,16 +508,24 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission
logger.error(f"Invalid data for {domain_type}.")
return None
def _process_added_domains(self, added_domain_ids, email):
def _process_added_domains(self, added_domain_ids, email, requestor, portfolio):
"""
Processes added domain invitations by updating existing invitations
or creating new ones.
"""
if not added_domain_ids:
return
if added_domain_ids:
# get added_domains from ids to pass to send email method and bulk create
added_domains = Domain.objects.filter(id__in=added_domain_ids)
member_of_a_different_org, _ = get_org_membership(portfolio, email, None)
send_domain_invitation_email(
email=email,
requestor=requestor,
domains=added_domains,
is_member_of_different_org=member_of_a_different_org,
)
# Update existing invitations from CANCELED to INVITED
existing_invitations = DomainInvitation.objects.filter(domain_id__in=added_domain_ids, email=email)
existing_invitations = DomainInvitation.objects.filter(domain__in=added_domains, email=email)
existing_invitations.update(status=DomainInvitation.DomainInvitationStatus.INVITED)
# Determine which domains need new invitations
@ -754,7 +777,11 @@ class PortfolioAddMemberView(PortfolioMembersPermissionView, FormMixin):
try:
if not requested_user or not permission_exists:
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=portfolio)
form.save()
portfolio_invitation = form.save()
# if user exists for email, immediately retrieve portfolio invitation upon creation
if requested_user is not None:
portfolio_invitation.retrieve()
portfolio_invitation.save()
messages.success(self.request, f"{requested_email} has been invited.")
else:
if permission_exists:

View file

@ -0,0 +1,86 @@
from django.contrib import messages
from django.db import IntegrityError
from registrar.models import PortfolioInvitation, User, UserPortfolioPermission
from registrar.utility.email import EmailSendingError
import logging
from registrar.utility.errors import (
AlreadyDomainInvitedError,
AlreadyDomainManagerError,
MissingEmailError,
OutsideOrgMemberError,
)
logger = logging.getLogger(__name__)
# These methods are used by multiple views which share similar logic and function
# when creating invitations and sending associated emails. These can be reused in
# any view, and were initially developed for domain.py, portfolios.py and admin.py
def get_org_membership(org, email, user):
"""
Determines if an email/user belongs to a different organization or this organization
as either a member or an invited member.
This function returns a tuple (member_of_a_different_org, member_of_this_org),
which provides:
- member_of_a_different_org: True if the user/email is associated with an organization other than the given org.
- member_of_this_org: True if the user/email is associated with the given org.
Note: This implementation assumes single portfolio ownership for a user.
If the "multiple portfolios" feature is enabled, this logic may not account for
situations where a user or email belongs to multiple organizations.
"""
# Check for existing permissions or invitations for the user
existing_org_permission = UserPortfolioPermission.objects.filter(user=user).first()
existing_org_invitation = PortfolioInvitation.objects.filter(email=email).first()
# Determine membership in a different organization
member_of_a_different_org = (existing_org_permission and existing_org_permission.portfolio != org) or (
existing_org_invitation and existing_org_invitation.portfolio != org
)
# Determine membership in the same organization
member_of_this_org = (existing_org_permission and existing_org_permission.portfolio == org) or (
existing_org_invitation and existing_org_invitation.portfolio == org
)
return member_of_a_different_org, member_of_this_org
def get_requested_user(email):
"""Retrieve a user by email or return None if the user doesn't exist."""
try:
return User.objects.get(email=email)
except User.DoesNotExist:
return None
def handle_invitation_exceptions(request, exception, email):
"""Handle exceptions raised during the process."""
if isinstance(exception, EmailSendingError):
logger.warning(str(exception), exc_info=True)
messages.error(request, str(exception))
elif isinstance(exception, MissingEmailError):
messages.error(request, str(exception))
logger.error(str(exception), exc_info=True)
elif isinstance(exception, OutsideOrgMemberError):
logger.warning(
"Could not send email. Can not invite member of a .gov organization to a different organization.",
exc_info=True,
)
messages.error(
request,
f"{email} is already a member of another .gov organization.",
)
elif isinstance(exception, AlreadyDomainManagerError):
messages.warning(request, str(exception))
elif isinstance(exception, AlreadyDomainInvitedError):
messages.warning(request, str(exception))
elif isinstance(exception, IntegrityError):
messages.warning(request, f"{email} is already a manager for this domain")
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.warning(request, "Could not send email invitation.")