Merge pull request #3428 from cisagov/hotgov/3402-invitation-updates

#3402: Portfolio and domain invitation updates [HOTGOV]
This commit is contained in:
Rachid Mrad 2025-02-03 15:01:19 -05:00 committed by GitHub
commit 14c68c1f1f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 262 additions and 46 deletions

View file

@ -1407,10 +1407,13 @@ class BaseInvitationAdmin(ListHeaderAdmin):
Normal flow on successful save_model on add is to redirect to changelist_view.
If there are errors, flow is modified to instead render change form.
"""
# store current messages from request so that they are preserved throughout the method
# store current messages from request in storage so that they are preserved throughout the
# method, as some flows remove and replace all messages, and so we store here to retrieve
# later
storage = get_messages(request)
# Check if there are any error or warning messages in the `messages` framework
has_errors = any(message.level_tag in ["error", "warning"] for message in storage)
# Check if there are any error messages in the `messages` framework
# error messages stop the workflow; other message levels allow flow to continue as normal
has_errors = any(message.level_tag in ["error"] for message in storage)
if has_errors:
# Re-render the change form if there are errors or warnings
@ -1552,13 +1555,14 @@ class DomainInvitationAdmin(BaseInvitationAdmin):
portfolio_invitation.save()
messages.success(request, f"{requested_email} has been invited to the organization: {domain_org}")
send_domain_invitation_email(
if not send_domain_invitation_email(
email=requested_email,
requestor=requestor,
domains=domain,
is_member_of_different_org=member_of_a_different_org,
requested_user=requested_user,
)
):
messages.warning(request, "Could not send email confirmation to existing domain managers.")
if requested_user is not None:
# Domain Invitation creation for an existing User
obj.retrieve()

View file

@ -25,6 +25,7 @@ from typing import Final
from botocore.config import Config
import json
import logging
import traceback
from django.utils.log import ServerFormatter
# # # ###
@ -471,7 +472,11 @@ class JsonFormatter(logging.Formatter):
"lineno": record.lineno,
"message": record.getMessage(),
}
return json.dumps(log_record)
# Capture exception info if it exists
if record.exc_info:
log_record["exception"] = "".join(traceback.format_exception(*record.exc_info))
return json.dumps(log_record, ensure_ascii=False)
class JsonServerFormatter(ServerFormatter):

View file

@ -1427,7 +1427,7 @@ class TestPortfolioInvitationAdmin(TestCase):
@less_console_noise_decorator
@patch("registrar.admin.send_portfolio_invitation_email")
@patch("django.contrib.messages.warning") # Mock the `messages.error` call
@patch("django.contrib.messages.error") # Mock the `messages.error` call
def test_save_exception_generic_error(self, mock_messages_error, mock_send_email):
"""Handle generic exceptions correctly during portfolio invitation."""
self.client.force_login(self.superuser)

View file

@ -1,8 +1,11 @@
import unittest
from unittest.mock import patch, MagicMock
from datetime import date
from registrar.models.domain import Domain
from registrar.models.user import User
from registrar.models.user_domain_role import UserDomainRole
from registrar.utility.email import EmailSendingError
from registrar.utility.email_invitations import send_domain_invitation_email
from registrar.utility.email_invitations import send_domain_invitation_email, send_emails_to_domain_managers
from api.tests.common import less_console_noise_decorator
@ -290,16 +293,16 @@ class DomainInvitationEmail(unittest.TestCase):
email = "invitee@example.com"
is_member_of_different_org = False
mock_send_domain_manager_emails.side_effect = EmailSendingError("Error sending email")
# Change the return value to False for mock_send_domain_manager_emails
mock_send_domain_manager_emails.return_value = False
# Call and assert exception
with self.assertRaises(EmailSendingError) as context:
send_domain_invitation_email(
email=email,
requestor=mock_requestor,
domains=mock_domain,
is_member_of_different_org=is_member_of_different_org,
)
# Call and assert that send_domain_invitation_email returns False
result = send_domain_invitation_email(
email=email,
requestor=mock_requestor,
domains=mock_domain,
is_member_of_different_org=is_member_of_different_org,
)
# Assertions
mock_normalize_domains.assert_called_once_with(mock_domain)
@ -308,4 +311,161 @@ class DomainInvitationEmail(unittest.TestCase):
email, None, [mock_domain], mock_requestor, is_member_of_different_org
)
mock_send_invitation_email.assert_called_once_with(email, mock_requestor_email, [mock_domain], None)
self.assertEqual(str(context.exception), "Error sending email")
# Assert that the result is False
self.assertFalse(result)
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.send_templated_email")
@patch("registrar.models.UserDomainRole.objects.filter")
def test_send_emails_to_domain_managers_all_emails_sent_successfully(self, mock_filter, mock_send_templated_email):
"""Test when all emails are sent successfully."""
# Setup mocks
mock_domain = MagicMock(spec=Domain)
mock_requestor_email = "requestor@example.com"
mock_email = "invitee@example.com"
# Create mock user and UserDomainRole
mock_user = MagicMock(spec=User)
mock_user.email = "manager@example.com"
mock_user_domain_role = MagicMock(spec=UserDomainRole, user=mock_user)
# Mock the filter method to return a list of mock UserDomainRole objects
mock_filter.return_value = [mock_user_domain_role]
# Mock successful email sending
mock_send_templated_email.return_value = None # No exception means success
# Call function
result = send_emails_to_domain_managers(mock_email, mock_requestor_email, mock_domain)
# Assertions
self.assertTrue(result) # All emails should be successfully sent
mock_send_templated_email.assert_called_once_with(
"emails/domain_manager_notification.txt",
"emails/domain_manager_notification_subject.txt",
to_address="manager@example.com",
context={
"domain": mock_domain,
"requestor_email": mock_requestor_email,
"invited_email_address": mock_email,
"domain_manager": mock_user,
"date": date.today(),
},
)
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.send_templated_email")
@patch("registrar.models.UserDomainRole.objects.filter")
def test_send_emails_to_domain_managers_email_send_fails(self, mock_filter, mock_send_templated_email):
"""Test when sending an email fails (raises EmailSendingError)."""
# Setup mocks
mock_domain = MagicMock(spec=Domain)
mock_requestor_email = "requestor@example.com"
mock_email = "invitee@example.com"
# Create mock user and UserDomainRole
mock_user = MagicMock(spec=User)
mock_user.email = "manager@example.com"
mock_user_domain_role = MagicMock(spec=UserDomainRole, user=mock_user)
# Mock the filter method to return a list of mock UserDomainRole objects
mock_filter.return_value = [mock_user_domain_role]
# Mock sending email to raise an EmailSendingError
mock_send_templated_email.side_effect = EmailSendingError("Email sending failed")
# Call function
result = send_emails_to_domain_managers(mock_email, mock_requestor_email, mock_domain)
# Assertions
self.assertFalse(result) # The result should be False as email sending failed
mock_send_templated_email.assert_called_once_with(
"emails/domain_manager_notification.txt",
"emails/domain_manager_notification_subject.txt",
to_address="manager@example.com",
context={
"domain": mock_domain,
"requestor_email": mock_requestor_email,
"invited_email_address": mock_email,
"domain_manager": mock_user,
"date": date.today(),
},
)
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.send_templated_email")
@patch("registrar.models.UserDomainRole.objects.filter")
def test_send_emails_to_domain_managers_no_domain_managers(self, mock_filter, mock_send_templated_email):
"""Test when there are no domain managers."""
# Setup mocks
mock_domain = MagicMock(spec=Domain)
mock_requestor_email = "requestor@example.com"
mock_email = "invitee@example.com"
# Mock no domain managers (empty UserDomainRole queryset)
mock_filter.return_value = []
# Call function
result = send_emails_to_domain_managers(mock_email, mock_requestor_email, mock_domain)
# Assertions
self.assertTrue(result) # No emails to send, so it should return True
mock_send_templated_email.assert_not_called() # No emails should be sent
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.send_templated_email")
@patch("registrar.models.UserDomainRole.objects.filter")
def test_send_emails_to_domain_managers_some_emails_fail(self, mock_filter, mock_send_templated_email):
"""Test when some emails fail to send."""
# Setup mocks
mock_domain = MagicMock(spec=Domain)
mock_requestor_email = "requestor@example.com"
mock_email = "invitee@example.com"
# Create mock users and UserDomainRoles
mock_user_1 = MagicMock(spec=User)
mock_user_1.email = "manager1@example.com"
mock_user_2 = MagicMock(spec=User)
mock_user_2.email = "manager2@example.com"
mock_user_domain_role_1 = MagicMock(spec=UserDomainRole, user=mock_user_1)
mock_user_domain_role_2 = MagicMock(spec=UserDomainRole, user=mock_user_2)
mock_filter.return_value = [mock_user_domain_role_1, mock_user_domain_role_2]
# Mock first email success and second email failure
mock_send_templated_email.side_effect = [None, EmailSendingError("Failed to send email")]
# Call function
result = send_emails_to_domain_managers(mock_email, mock_requestor_email, mock_domain)
# Assertions
self.assertFalse(result) # One email failed, so result should be False
mock_send_templated_email.assert_any_call(
"emails/domain_manager_notification.txt",
"emails/domain_manager_notification_subject.txt",
to_address="manager1@example.com",
context={
"domain": mock_domain,
"requestor_email": mock_requestor_email,
"invited_email_address": mock_email,
"domain_manager": mock_user_1,
"date": date.today(),
},
)
mock_send_templated_email.assert_any_call(
"emails/domain_manager_notification.txt",
"emails/domain_manager_notification_subject.txt",
to_address="manager2@example.com",
context={
"domain": mock_domain,
"requestor_email": mock_requestor_email,
"invited_email_address": mock_email,
"domain_manager": mock_user_2,
"date": date.today(),
},
)

View file

@ -925,6 +925,40 @@ class TestDomainManagers(TestDomainOverview):
success_page = success_result.follow()
self.assertContains(success_page, "notauser@igorville.gov")
@override_flag("organization_feature", active=True)
@less_console_noise_decorator
@patch("registrar.views.domain.send_portfolio_invitation_email")
@patch("registrar.views.domain.send_domain_invitation_email")
def test_domain_user_add_form_fails_to_send_to_some_managers(
self, mock_send_domain_email, mock_send_portfolio_email
):
"""Adding an email not associated with a user works and sends portfolio invitation,
and when domain managers email(s) fail to send, assert proper warning displayed."""
add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
add_page.form["email"] = "notauser@igorville.gov"
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
mock_send_domain_email.return_value = False
success_result = add_page.form.submit()
self.assertEqual(success_result.status_code, 302)
self.assertEqual(
success_result["Location"],
reverse("domain-users", kwargs={"pk": self.domain.id}),
)
# Verify that the invitation emails were sent
mock_send_portfolio_email.assert_called_once()
mock_send_domain_email.assert_called_once()
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = success_result.follow()
self.assertContains(success_page, "Could not send email confirmation to existing domain managers.")
@boto3_mocking.patching
@override_flag("organization_feature", active=True)
@less_console_noise_decorator

View file

@ -3254,7 +3254,7 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest):
# assert that response is a redirect to reverse("members")
self.assertRedirects(response, reverse("members"))
# assert that messages contains message, "Could not send email invitation"
mock_warning.assert_called_once_with(response.wsgi_request, "Could not send email invitation.")
mock_warning.assert_called_once_with(response.wsgi_request, "Could not send portfolio email invitation.")
# assert that portfolio invitation is not created
self.assertFalse(
PortfolioInvitation.objects.filter(email=self.new_member_email, portfolio=self.portfolio).exists(),
@ -3339,7 +3339,7 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest):
# assert that response is a redirect to reverse("members")
self.assertRedirects(response, reverse("members"))
# assert that messages contains message, "Could not send email invitation"
mock_warning.assert_called_once_with(response.wsgi_request, "Could not send email invitation.")
mock_warning.assert_called_once_with(response.wsgi_request, "Could not send portfolio email invitation.")
# assert that portfolio invitation is not created
self.assertFalse(
PortfolioInvitation.objects.filter(email=self.new_member_email, portfolio=self.portfolio).exists(),

View file

@ -27,6 +27,9 @@ def send_domain_invitation_email(
is_member_of_different_org (bool): if an email belongs to a different org
requested_user (User | None): The recipient if the email belongs to a user in the registrar
Returns:
Boolean indicating if all messages were sent successfully.
Raises:
MissingEmailError: If the requestor has no email associated with their account.
AlreadyDomainManagerError: If the email corresponds to an existing domain manager.
@ -41,22 +44,28 @@ def send_domain_invitation_email(
send_invitation_email(email, requestor_email, domains, requested_user)
all_manager_emails_sent = True
# send emails to domain managers
for domain in domains:
send_emails_to_domain_managers(
if not send_emails_to_domain_managers(
email=email,
requestor_email=requestor_email,
domain=domain,
requested_user=requested_user,
)
):
all_manager_emails_sent = False
return all_manager_emails_sent
def send_emails_to_domain_managers(email: str, requestor_email, domain: Domain, requested_user=None):
"""
Notifies all domain managers of the provided domain of a change
Raises:
EmailSendingError
Returns:
Boolean indicating if all messages were sent successfully.
"""
all_emails_sent = True
# Get each domain manager from list
user_domain_roles = UserDomainRole.objects.filter(domain=domain)
for user_domain_role in user_domain_roles:
@ -75,10 +84,12 @@ def send_emails_to_domain_managers(email: str, requestor_email, domain: Domain,
"date": date.today(),
},
)
except EmailSendingError as err:
raise EmailSendingError(
f"Could not send email manager notification to {user.email} for domain: {domain.name}"
) from err
except EmailSendingError:
logger.warning(
f"Could not send email manager notification to {user.email} for domain: {domain.name}", exc_info=True
)
all_emails_sent = False
return all_emails_sent
def normalize_domains(domains: Domain | list[Domain]) -> list[Domain]:

View file

@ -1255,24 +1255,26 @@ class DomainAddUserView(DomainFormBaseView):
def _handle_new_user_invitation(self, email, requestor, member_of_different_org):
"""Handle invitation for a new user who does not exist in the system."""
send_domain_invitation_email(
if not send_domain_invitation_email(
email=email,
requestor=requestor,
domains=self.object,
is_member_of_different_org=member_of_different_org,
)
):
messages.warning(self.request, "Could not send email confirmation to existing domain managers.")
DomainInvitation.objects.get_or_create(email=email, domain=self.object)
messages.success(self.request, f"{email} has been invited to the domain: {self.object}")
def _handle_existing_user(self, email, requestor, requested_user, member_of_different_org):
"""Handle adding an existing user to the domain."""
send_domain_invitation_email(
if not send_domain_invitation_email(
email=email,
requestor=requestor,
domains=self.object,
is_member_of_different_org=member_of_different_org,
requested_user=requested_user,
)
):
messages.warning(self.request, "Could not send email confirmation to existing domain managers.")
UserDomainRole.objects.create(
user=requested_user,
domain=self.object,

View file

@ -303,13 +303,14 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V
# get added_domains from ids to pass to send email method and bulk create
added_domains = Domain.objects.filter(id__in=added_domain_ids)
member_of_a_different_org, _ = get_org_membership(portfolio, member.email, member)
send_domain_invitation_email(
if not send_domain_invitation_email(
email=member.email,
requestor=requestor,
domains=added_domains,
is_member_of_different_org=member_of_a_different_org,
requested_user=member,
)
):
messages.warning(self.request, "Could not send email confirmation to existing domain managers.")
# Bulk create UserDomainRole instances for added domains
UserDomainRole.objects.bulk_create(
[
@ -525,12 +526,13 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission
# get added_domains from ids to pass to send email method and bulk create
added_domains = Domain.objects.filter(id__in=added_domain_ids)
member_of_a_different_org, _ = get_org_membership(portfolio, email, None)
send_domain_invitation_email(
if not send_domain_invitation_email(
email=email,
requestor=requestor,
domains=added_domains,
is_member_of_different_org=member_of_a_different_org,
)
):
messages.warning(self.request, "Could not send email confirmation to existing domain managers.")
# Update existing invitations from CANCELED to INVITED
existing_invitations = DomainInvitation.objects.filter(domain__in=added_domains, email=email)
@ -807,7 +809,7 @@ class PortfolioAddMemberView(PortfolioMembersPermissionView, FormMixin):
portfolio,
exc_info=True,
)
messages.warning(self.request, "Could not send email invitation.")
messages.warning(self.request, "Could not send portfolio email invitation.")
elif isinstance(exception, MissingEmailError):
messages.error(self.request, str(exception))
logger.error(
@ -816,4 +818,4 @@ class PortfolioAddMemberView(PortfolioMembersPermissionView, FormMixin):
)
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.warning(self.request, "Could not send email invitation.")
messages.warning(self.request, "Could not send portfolio email invitation.")

View file

@ -3,7 +3,6 @@ from django.db import IntegrityError
from registrar.models import PortfolioInvitation, User, UserPortfolioPermission
from registrar.utility.email import EmailSendingError
import logging
from registrar.utility.errors import (
AlreadyDomainInvitedError,
AlreadyDomainManagerError,
@ -61,20 +60,19 @@ def get_requested_user(email):
def handle_invitation_exceptions(request, exception, email):
"""Handle exceptions raised during the process."""
if isinstance(exception, EmailSendingError):
logger.warning(str(exception), exc_info=True)
logger.warning(exception, exc_info=True)
messages.error(request, str(exception))
elif isinstance(exception, MissingEmailError):
messages.error(request, str(exception))
logger.error(str(exception), exc_info=True)
logger.error(exception, exc_info=True)
elif isinstance(exception, OutsideOrgMemberError):
messages.error(request, str(exception))
logger.warning(str(exception), exc_info=True)
elif isinstance(exception, AlreadyDomainManagerError):
messages.warning(request, str(exception))
messages.error(request, str(exception))
elif isinstance(exception, AlreadyDomainInvitedError):
messages.warning(request, str(exception))
messages.error(request, str(exception))
elif isinstance(exception, IntegrityError):
messages.warning(request, f"{email} is already a manager for this domain")
messages.error(request, f"{email} is already a manager for this domain")
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.warning(request, "Could not send email invitation.")
messages.error(request, "Could not send email invitation.")