diff --git a/src/registrar/admin.py b/src/registrar/admin.py index 1b6e2de6a..e89147b11 100644 --- a/src/registrar/admin.py +++ b/src/registrar/admin.py @@ -1367,6 +1367,8 @@ class UserDomainRoleAdmin(ListHeaderAdmin, ImportExportModelAdmin): autocomplete_fields = ["user", "domain"] + change_form_template = "django/admin/user_domain_role_change_form.html" + # Fixes a bug where non-superusers are redirected to the main page def delete_view(self, request, object_id, extra_context=None): """Custom delete_view implementation that specifies redirect behaviour""" @@ -1500,7 +1502,7 @@ class DomainInvitationAdmin(BaseInvitationAdmin): autocomplete_fields = ["domain"] - change_form_template = "django/admin/email_clipboard_change_form.html" + change_form_template = "django/admin/domain_invitation_change_form.html" # Select domain invitations to change -> Domain invitations def changelist_view(self, request, extra_context=None): diff --git a/src/registrar/templates/django/admin/domain_invitation_change_form.html b/src/registrar/templates/django/admin/domain_invitation_change_form.html new file mode 100644 index 000000000..6ce6ed0d1 --- /dev/null +++ b/src/registrar/templates/django/admin/domain_invitation_change_form.html @@ -0,0 +1,14 @@ +{% extends 'django/admin/email_clipboard_change_form.html' %} +{% load custom_filters %} +{% load i18n static %} + +{% block content_subtitle %} +
+
+

+ If you add someone to a domain here, it will trigger emails to the invitee and all managers of the domain when you click "save." If you don't want to trigger those emails, use the User domain roles permissions table instead. +

+
+
+ {{ block.super }} +{% endblock %} diff --git a/src/registrar/templates/django/admin/user_domain_role_change_form.html b/src/registrar/templates/django/admin/user_domain_role_change_form.html new file mode 100644 index 000000000..d8c298bc1 --- /dev/null +++ b/src/registrar/templates/django/admin/user_domain_role_change_form.html @@ -0,0 +1,14 @@ +{% extends 'django/admin/email_clipboard_change_form.html' %} +{% load custom_filters %} +{% load i18n static %} + +{% block content_subtitle %} +
+
+

+ If you add someone to a domain here, it will not trigger any emails. To trigger emails, use the User Domain Role invitations table instead. +

+
+
+ {{ block.super }} +{% endblock %} diff --git a/src/registrar/templates/emails/domain_manager_notification.txt b/src/registrar/templates/emails/domain_manager_notification.txt new file mode 100644 index 000000000..aa8c6bf34 --- /dev/null +++ b/src/registrar/templates/emails/domain_manager_notification.txt @@ -0,0 +1,43 @@ +{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #} +Hi,{% if domain_manager and domain_manager.first_name %} {{ domain_manager.first_name }}.{% endif %} + +A domain manager was invited to {{ domain.name }}. +DOMAIN: {{ domain.name }} +INVITED BY: {{ requestor_email }} +INVITED ON: {{date}} +MANAGER INVITED: {{ invited_email_address }} + + +---------------------------------------------------------------- + + +NEXT STEPS + +The person who received the invitation will become a domain manager once they log in to the +.gov registrar. They'll need to access the registrar using a Login.gov account that's +associated with the invited email address. + +If you need to cancel this invitation or remove the domain manager (because they've already +logged in), you can do that by going to this domain in the .gov registrar . + + +WHY DID YOU RECEIVE THIS EMAIL? + +You’re listed as a domain manager for {{ domain.name }}, so you’ll receive a notification whenever +someone is invited to manage that domain. + +If you have questions or concerns, reach out to the person who sent the invitation or reply to this email. + + +THANK YOU +.Gov helps the public identify official, trusted information. Thank you for using a .gov domain. + +---------------------------------------------------------------- + +The .gov team +Contact us: +Learn about .gov + +The .gov registry is a part of the Cybersecurity and Infrastructure Security Agency +(CISA) +{% endautoescape %} diff --git a/src/registrar/templates/emails/domain_manager_notification_subject.txt b/src/registrar/templates/emails/domain_manager_notification_subject.txt new file mode 100644 index 000000000..0e9918de0 --- /dev/null +++ b/src/registrar/templates/emails/domain_manager_notification_subject.txt @@ -0,0 +1 @@ +A domain manager was invited to {{ domain.name }} \ No newline at end of file diff --git a/src/registrar/tests/test_admin.py b/src/registrar/tests/test_admin.py index 2a7a52a13..036e35a50 100644 --- a/src/registrar/tests/test_admin.py +++ b/src/registrar/tests/test_admin.py @@ -166,6 +166,29 @@ class TestDomainInvitationAdmin(TestCase): ) self.assertContains(response, "Show more") + @less_console_noise_decorator + def test_has_change_form_description(self): + """Tests if this model has a model description on the change form view""" + self.client.force_login(self.superuser) + + domain, _ = Domain.objects.get_or_create(name="systemofadown.com") + + domain_invitation, _ = DomainInvitation.objects.get_or_create(email="toxicity@systemofadown.com", domain=domain) + + response = self.client.get( + "/admin/registrar/domaininvitation/{}/change/".format(domain_invitation.pk), + follow=True, + ) + + # Make sure that the page is loaded correctly + self.assertEqual(response.status_code, 200) + + # Test for a description snippet + self.assertContains( + response, + "If you add someone to a domain here, it will trigger emails to the invitee and all managers of the domain", + ) + @less_console_noise_decorator def test_get_filters(self): """Ensures that our filters are displaying correctly""" @@ -1957,6 +1980,31 @@ class TestUserDomainRoleAdmin(TestCase): ) self.assertContains(response, "Show more") + @less_console_noise_decorator + def test_has_change_form_description(self): + """Tests if this model has a model description on the change form view""" + self.client.force_login(self.superuser) + + domain, _ = Domain.objects.get_or_create(name="systemofadown.com") + + user_domain_role, _ = UserDomainRole.objects.get_or_create( + user=self.superuser, domain=domain, role=[UserDomainRole.Roles.MANAGER] + ) + + response = self.client.get( + "/admin/registrar/userdomainrole/{}/change/".format(user_domain_role.pk), + follow=True, + ) + + # Make sure that the page is loaded correctly + self.assertEqual(response.status_code, 200) + + # Test for a description snippet + self.assertContains( + response, + "If you add someone to a domain here, it will not trigger any emails.", + ) + def test_domain_sortable(self): """Tests if the UserDomainrole sorts by domain correctly""" with less_console_noise(): @@ -3442,7 +3490,7 @@ class TestTransferUser(WebTest): @less_console_noise_decorator def test_transfer_user_transfers_user_portfolio_roles_no_error_when_duplicates(self): - """Assert that duplicate portfolio user roles do not throw errorsd""" + """Assert that duplicate portfolio user roles do not throw errors""" portfolio1 = Portfolio.objects.create(organization_name="Hotel California", creator=self.user2) UserPortfolioPermission.objects.create( user=self.user1, portfolio=portfolio1, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] @@ -3574,7 +3622,7 @@ class TestTransferUser(WebTest): with self.assertRaises(User.DoesNotExist): self.user2.refresh_from_db() - @less_console_noise_decorator + # @less_console_noise_decorator def test_transfer_user_throws_transfer_and_delete_success_messages(self): """Test that success messages for data transfer and user deletion are displayed.""" # Ensure the setup for VerifiedByStaff @@ -3592,11 +3640,13 @@ class TestTransferUser(WebTest): self.assertContains(after_submit, "

Change user

") + print(mock_success_message.call_args_list) + mock_success_message.assert_any_call( ANY, ( "Data transferred successfully for the following objects: ['Changed requestor " - + 'from "Furiosa Jabassa " to "Max Rokatanski " on immortan.joe@citadel.com\']' + + "from Furiosa Jabassa to Max Rokatanski on immortan.joe@citadel.com']" ), ) @@ -3606,7 +3656,7 @@ class TestTransferUser(WebTest): def test_transfer_user_throws_error_message(self): """Test that an error message is thrown if the transfer fails.""" with patch( - "registrar.views.TransferUserView.transfer_user_fields_and_log", side_effect=Exception("Simulated Error") + "registrar.views.TransferUserView.transfer_related_fields_and_log", side_effect=Exception("Simulated Error") ): with patch("django.contrib.messages.error") as mock_error: # Access the transfer user page diff --git a/src/registrar/tests/test_email_invitations.py b/src/registrar/tests/test_email_invitations.py new file mode 100644 index 000000000..87384d3be --- /dev/null +++ b/src/registrar/tests/test_email_invitations.py @@ -0,0 +1,311 @@ +import unittest +from unittest.mock import patch, MagicMock +from datetime import date +from registrar.utility.email import EmailSendingError +from registrar.utility.email_invitations import send_domain_invitation_email + +from api.tests.common import less_console_noise_decorator + + +class DomainInvitationEmail(unittest.TestCase): + + @less_console_noise_decorator + @patch("registrar.utility.email_invitations.send_templated_email") + @patch("registrar.utility.email_invitations.UserDomainRole.objects.filter") + @patch("registrar.utility.email_invitations.validate_invitation") + @patch("registrar.utility.email_invitations.get_requestor_email") + @patch("registrar.utility.email_invitations.send_invitation_email") + @patch("registrar.utility.email_invitations.normalize_domains") + def test_send_domain_invitation_email( + self, + mock_normalize_domains, + mock_send_invitation_email, + mock_get_requestor_email, + mock_validate_invitation, + mock_user_domain_role_filter, + mock_send_templated_email, + ): + """Test sending domain invitation email for one domain. + Should also send emails to manager of that domain. + """ + # Setup + mock_domain = MagicMock(name="domain1") + mock_domain.name = "example.com" + mock_normalize_domains.return_value = [mock_domain] + + mock_requestor = MagicMock() + mock_requestor_email = "requestor@example.com" + mock_get_requestor_email.return_value = mock_requestor_email + + mock_user1 = MagicMock() + mock_user1.email = "manager1@example.com" + + mock_user_domain_role_filter.return_value = [MagicMock(user=mock_user1)] + + email = "invitee@example.com" + is_member_of_different_org = False + + # Call the function + send_domain_invitation_email( + email=email, + requestor=mock_requestor, + domains=mock_domain, + is_member_of_different_org=is_member_of_different_org, + ) + + # Assertions + mock_normalize_domains.assert_called_once_with(mock_domain) + mock_get_requestor_email.assert_called_once_with(mock_requestor, [mock_domain]) + mock_validate_invitation.assert_called_once_with( + email, [mock_domain], mock_requestor, is_member_of_different_org + ) + mock_send_invitation_email.assert_called_once_with(email, mock_requestor_email, [mock_domain], None) + mock_user_domain_role_filter.assert_called_once_with(domain=mock_domain) + mock_send_templated_email.assert_called_once_with( + "emails/domain_manager_notification.txt", + "emails/domain_manager_notification_subject.txt", + to_address=mock_user1.email, + context={ + "domain": mock_domain, + "requestor_email": mock_requestor_email, + "invited_email_address": email, + "domain_manager": mock_user1, + "date": date.today(), + }, + ) + + @less_console_noise_decorator + @patch("registrar.utility.email_invitations.send_templated_email") + @patch("registrar.utility.email_invitations.UserDomainRole.objects.filter") + @patch("registrar.utility.email_invitations.validate_invitation") + @patch("registrar.utility.email_invitations.get_requestor_email") + @patch("registrar.utility.email_invitations.send_invitation_email") + @patch("registrar.utility.email_invitations.normalize_domains") + def test_send_domain_invitation_email_multiple_domains( + self, + mock_normalize_domains, + mock_send_invitation_email, + mock_get_requestor_email, + mock_validate_invitation, + mock_user_domain_role_filter, + mock_send_templated_email, + ): + """Test sending domain invitation email for multiple domains. + Should also send emails to managers of each domain. + """ + # Setup + # Create multiple mock domains + mock_domain1 = MagicMock(name="domain1") + mock_domain1.name = "example.com" + mock_domain2 = MagicMock(name="domain2") + mock_domain2.name = "example.org" + + mock_normalize_domains.return_value = [mock_domain1, mock_domain2] + + mock_requestor = MagicMock() + mock_requestor_email = "requestor@example.com" + mock_get_requestor_email.return_value = mock_requestor_email + + mock_user1 = MagicMock() + mock_user1.email = "manager1@example.com" + mock_user2 = MagicMock() + mock_user2.email = "manager2@example.com" + + # Configure domain roles for each domain + def filter_side_effect(domain): + if domain == mock_domain1: + return [MagicMock(user=mock_user1)] + elif domain == mock_domain2: + return [MagicMock(user=mock_user2)] + return [] + + mock_user_domain_role_filter.side_effect = filter_side_effect + + email = "invitee@example.com" + is_member_of_different_org = False + + # Call the function + send_domain_invitation_email( + email=email, + requestor=mock_requestor, + domains=[mock_domain1, mock_domain2], + is_member_of_different_org=is_member_of_different_org, + ) + + # Assertions + mock_normalize_domains.assert_called_once_with([mock_domain1, mock_domain2]) + mock_get_requestor_email.assert_called_once_with(mock_requestor, [mock_domain1, mock_domain2]) + mock_validate_invitation.assert_called_once_with( + email, [mock_domain1, mock_domain2], mock_requestor, is_member_of_different_org + ) + mock_send_invitation_email.assert_called_once_with( + email, mock_requestor_email, [mock_domain1, mock_domain2], None + ) + + # Check that domain manager emails were sent for both domains + mock_user_domain_role_filter.assert_any_call(domain=mock_domain1) + mock_user_domain_role_filter.assert_any_call(domain=mock_domain2) + + mock_send_templated_email.assert_any_call( + "emails/domain_manager_notification.txt", + "emails/domain_manager_notification_subject.txt", + to_address=mock_user1.email, + context={ + "domain": mock_domain1, + "requestor_email": mock_requestor_email, + "invited_email_address": email, + "domain_manager": mock_user1, + "date": date.today(), + }, + ) + mock_send_templated_email.assert_any_call( + "emails/domain_manager_notification.txt", + "emails/domain_manager_notification_subject.txt", + to_address=mock_user2.email, + context={ + "domain": mock_domain2, + "requestor_email": mock_requestor_email, + "invited_email_address": email, + "domain_manager": mock_user2, + "date": date.today(), + }, + ) + + # Verify the total number of calls to send_templated_email + self.assertEqual(mock_send_templated_email.call_count, 2) + + @less_console_noise_decorator + @patch("registrar.utility.email_invitations.validate_invitation") + def test_send_domain_invitation_email_raises_invite_validation_exception(self, mock_validate_invitation): + """Test sending domain invitation email for one domain and assert exception + when invite validation fails. + """ + # Setup + mock_validate_invitation.side_effect = ValueError("Validation failed") + email = "invitee@example.com" + requestor = MagicMock() + domain = MagicMock() + + # Call and assert exception + with self.assertRaises(ValueError) as context: + send_domain_invitation_email(email, requestor, domain, is_member_of_different_org=False) + + self.assertEqual(str(context.exception), "Validation failed") + mock_validate_invitation.assert_called_once() + + @less_console_noise_decorator + @patch("registrar.utility.email_invitations.get_requestor_email") + def test_send_domain_invitation_email_raises_get_requestor_email_exception(self, mock_get_requestor_email): + """Test sending domain invitation email for one domain and assert exception + when get_requestor_email fails. + """ + # Setup + mock_get_requestor_email.side_effect = ValueError("Validation failed") + email = "invitee@example.com" + requestor = MagicMock() + domain = MagicMock() + + # Call and assert exception + with self.assertRaises(ValueError) as context: + send_domain_invitation_email(email, requestor, domain, is_member_of_different_org=False) + + self.assertEqual(str(context.exception), "Validation failed") + mock_get_requestor_email.assert_called_once() + + @less_console_noise_decorator + @patch("registrar.utility.email_invitations.validate_invitation") + @patch("registrar.utility.email_invitations.get_requestor_email") + @patch("registrar.utility.email_invitations.send_invitation_email") + @patch("registrar.utility.email_invitations.normalize_domains") + def test_send_domain_invitation_email_raises_sending_email_exception( + self, + mock_normalize_domains, + mock_send_invitation_email, + mock_get_requestor_email, + mock_validate_invitation, + ): + """Test sending domain invitation email for one domain and assert exception + when send_invitation_email fails. + """ + # Setup + mock_domain = MagicMock(name="domain1") + mock_domain.name = "example.com" + mock_normalize_domains.return_value = [mock_domain] + + mock_requestor = MagicMock() + mock_requestor_email = "requestor@example.com" + mock_get_requestor_email.return_value = mock_requestor_email + + mock_user1 = MagicMock() + mock_user1.email = "manager1@example.com" + + email = "invitee@example.com" + is_member_of_different_org = False + + mock_send_invitation_email.side_effect = EmailSendingError("Error sending email") + + # Call and assert exception + with self.assertRaises(EmailSendingError) as context: + send_domain_invitation_email( + email=email, + requestor=mock_requestor, + domains=mock_domain, + is_member_of_different_org=is_member_of_different_org, + ) + + # Assertions + mock_normalize_domains.assert_called_once_with(mock_domain) + mock_get_requestor_email.assert_called_once_with(mock_requestor, [mock_domain]) + mock_validate_invitation.assert_called_once_with( + email, [mock_domain], mock_requestor, is_member_of_different_org + ) + self.assertEqual(str(context.exception), "Error sending email") + + @less_console_noise_decorator + @patch("registrar.utility.email_invitations.send_emails_to_domain_managers") + @patch("registrar.utility.email_invitations.validate_invitation") + @patch("registrar.utility.email_invitations.get_requestor_email") + @patch("registrar.utility.email_invitations.send_invitation_email") + @patch("registrar.utility.email_invitations.normalize_domains") + def test_send_domain_invitation_email_manager_emails_send_mail_exception( + self, + mock_normalize_domains, + mock_send_invitation_email, + mock_get_requestor_email, + mock_validate_invitation, + mock_send_domain_manager_emails, + ): + """Test sending domain invitation email for one domain and assert exception + when send_emails_to_domain_managers fails. + """ + # Setup + mock_domain = MagicMock(name="domain1") + mock_domain.name = "example.com" + mock_normalize_domains.return_value = [mock_domain] + + mock_requestor = MagicMock() + mock_requestor_email = "requestor@example.com" + mock_get_requestor_email.return_value = mock_requestor_email + + email = "invitee@example.com" + is_member_of_different_org = False + + mock_send_domain_manager_emails.side_effect = EmailSendingError("Error sending email") + + # Call and assert exception + with self.assertRaises(EmailSendingError) as context: + send_domain_invitation_email( + email=email, + requestor=mock_requestor, + domains=mock_domain, + is_member_of_different_org=is_member_of_different_org, + ) + + # Assertions + mock_normalize_domains.assert_called_once_with(mock_domain) + mock_get_requestor_email.assert_called_once_with(mock_requestor, [mock_domain]) + mock_validate_invitation.assert_called_once_with( + email, [mock_domain], mock_requestor, is_member_of_different_org + ) + mock_send_invitation_email.assert_called_once_with(email, mock_requestor_email, [mock_domain], None) + self.assertEqual(str(context.exception), "Error sending email") diff --git a/src/registrar/tests/test_views_domain.py b/src/registrar/tests/test_views_domain.py index f13490312..45758e502 100644 --- a/src/registrar/tests/test_views_domain.py +++ b/src/registrar/tests/test_views_domain.py @@ -9,7 +9,7 @@ from registrar.utility.email import EmailSendingError from waffle.testutils import override_flag from api.tests.common import less_console_noise_decorator from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices -from .common import MockEppLib, MockSESClient, create_user # type: ignore +from .common import MockEppLib, create_user # type: ignore from django_webtest import WebTest # type: ignore import boto3_mocking # type: ignore @@ -750,11 +750,12 @@ class TestDomainManagers(TestDomainOverview): response = self.client.get(reverse("domain-users-add", kwargs={"pk": self.domain.id})) self.assertContains(response, "Add a domain manager") - @boto3_mocking.patching @less_console_noise_decorator - def test_domain_user_add_form(self): + @patch("registrar.views.domain.send_domain_invitation_email") + def test_domain_user_add_form(self, mock_send_domain_email): """Adding an existing user works.""" get_user_model().objects.get_or_create(email="mayor@igorville.gov") + user = User.objects.filter(email="mayor@igorville.gov").first() add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id})) session_id = self.app.cookies[settings.SESSION_COOKIE_NAME] @@ -762,10 +763,15 @@ class TestDomainManagers(TestDomainOverview): self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) - mock_client = MockSESClient() - with boto3_mocking.clients.handler_for("sesv2", mock_client): - with less_console_noise(): - success_result = add_page.form.submit() + success_result = add_page.form.submit() + + mock_send_domain_email.assert_called_once_with( + email="mayor@igorville.gov", + requestor=self.user, + domains=self.domain, + is_member_of_different_org=None, + requested_user=user, + ) self.assertEqual(success_result.status_code, 302) self.assertEqual( @@ -974,13 +980,13 @@ class TestDomainManagers(TestDomainOverview): success_page = success_result.follow() self.assertContains(success_page, "Failed to send email.") - @boto3_mocking.patching @less_console_noise_decorator - def test_domain_invitation_created(self): + @patch("registrar.views.domain.send_domain_invitation_email") + def test_domain_invitation_created(self, mock_send_domain_email): """Add user on a nonexistent email creates an invitation. Adding a non-existent user sends an email as a side-effect, so mock - out the boto3 SES email sending here. + out send_domain_invitation_email here. """ # make sure there is no user with this email email_address = "mayor@igorville.gov" @@ -993,10 +999,11 @@ class TestDomainManagers(TestDomainOverview): add_page.form["email"] = email_address self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) - mock_client = MockSESClient() - with boto3_mocking.clients.handler_for("sesv2", mock_client): - with less_console_noise(): - success_result = add_page.form.submit() + success_result = add_page.form.submit() + + mock_send_domain_email.assert_called_once_with( + email="mayor@igorville.gov", requestor=self.user, domains=self.domain, is_member_of_different_org=None + ) self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) success_page = success_result.follow() @@ -1005,13 +1012,13 @@ class TestDomainManagers(TestDomainOverview): self.assertContains(success_page, "Cancel") # link to cancel invitation self.assertTrue(DomainInvitation.objects.filter(email=email_address).exists()) - @boto3_mocking.patching @less_console_noise_decorator - def test_domain_invitation_created_for_caps_email(self): + @patch("registrar.views.domain.send_domain_invitation_email") + def test_domain_invitation_created_for_caps_email(self, mock_send_domain_email): """Add user on a nonexistent email with CAPS creates an invitation to lowercase email. Adding a non-existent user sends an email as a side-effect, so mock - out the boto3 SES email sending here. + out send_domain_invitation_email here. """ # make sure there is no user with this email email_address = "mayor@igorville.gov" @@ -1025,9 +1032,11 @@ class TestDomainManagers(TestDomainOverview): add_page.form["email"] = caps_email_address self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) - mock_client = MockSESClient() - with boto3_mocking.clients.handler_for("sesv2", mock_client): - success_result = add_page.form.submit() + success_result = add_page.form.submit() + + mock_send_domain_email.assert_called_once_with( + email="mayor@igorville.gov", requestor=self.user, domains=self.domain, is_member_of_different_org=None + ) self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) success_page = success_result.follow() diff --git a/src/registrar/utility/db_helpers.py b/src/registrar/utility/db_helpers.py new file mode 100644 index 000000000..5b7e0392c --- /dev/null +++ b/src/registrar/utility/db_helpers.py @@ -0,0 +1,20 @@ +from contextlib import contextmanager +from django.db import transaction, IntegrityError +from psycopg2 import errorcodes + + +@contextmanager +def ignore_unique_violation(): + """ + Execute within an atomic transaction so that if a unique constraint violation occurs, + the individual transaction is rolled back without invalidating any larger transaction. + """ + with transaction.atomic(): + try: + yield + except IntegrityError as e: + if e.__cause__.pgcode == errorcodes.UNIQUE_VIOLATION: + # roll back to the savepoint, effectively ignoring this transaction + pass + else: + raise e diff --git a/src/registrar/utility/email_invitations.py b/src/registrar/utility/email_invitations.py index 48c796340..25e9db0f3 100644 --- a/src/registrar/utility/email_invitations.py +++ b/src/registrar/utility/email_invitations.py @@ -1,6 +1,6 @@ +from datetime import date from django.conf import settings -from registrar.models import DomainInvitation -from registrar.models.domain import Domain +from registrar.models import Domain, DomainInvitation, UserDomainRole from registrar.utility.errors import ( AlreadyDomainInvitedError, AlreadyDomainManagerError, @@ -41,8 +41,47 @@ def send_domain_invitation_email( send_invitation_email(email, requestor_email, domains, requested_user) + # send emails to domain managers + for domain in domains: + send_emails_to_domain_managers( + email=email, + requestor_email=requestor_email, + domain=domain, + requested_user=requested_user, + ) -def normalize_domains(domains): + +def send_emails_to_domain_managers(email: str, requestor_email, domain: Domain, requested_user=None): + """ + Notifies all domain managers of the provided domain of a change + Raises: + EmailSendingError + """ + # Get each domain manager from list + user_domain_roles = UserDomainRole.objects.filter(domain=domain) + for user_domain_role in user_domain_roles: + # Send email to each domain manager + user = user_domain_role.user + try: + send_templated_email( + "emails/domain_manager_notification.txt", + "emails/domain_manager_notification_subject.txt", + to_address=user.email, + context={ + "domain": domain, + "requestor_email": requestor_email, + "invited_email_address": email, + "domain_manager": user, + "date": date.today(), + }, + ) + except EmailSendingError as err: + raise EmailSendingError( + f"Could not send email manager notification to {user.email} for domain: {domain.name}" + ) from err + + +def normalize_domains(domains: Domain | list[Domain]) -> list[Domain]: """Ensures domains is always a list.""" return [domains] if isinstance(domains, Domain) else domains @@ -69,6 +108,8 @@ def validate_invitation(email, domains, requestor, is_member_of_different_org): for domain in domains: validate_existing_invitation(email, domain) + # NOTE: should we also be validating against existing user_domain_roles + def check_outside_org_membership(email, requestor, is_member_of_different_org): """Raise an error if the email belongs to a different organization.""" diff --git a/src/registrar/views/transfer_user.py b/src/registrar/views/transfer_user.py index 69a0f4997..f574b76d9 100644 --- a/src/registrar/views/transfer_user.py +++ b/src/registrar/views/transfer_user.py @@ -1,19 +1,19 @@ import logging +from django.db import transaction +from django.db.models import ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel, ManyToManyRel, OneToOneRel from django.shortcuts import render, get_object_or_404, redirect from django.views import View from registrar.models.domain import Domain -from registrar.models.domain_information import DomainInformation from registrar.models.domain_request import DomainRequest -from registrar.models.portfolio import Portfolio from registrar.models.user import User from django.contrib.admin import site from django.contrib import messages from registrar.models.user_domain_role import UserDomainRole from registrar.models.user_portfolio_permission import UserPortfolioPermission -from registrar.models.verified_by_staff import VerifiedByStaff -from typing import Any, List + +from registrar.utility.db_helpers import ignore_unique_violation logger = logging.getLogger(__name__) @@ -21,22 +21,8 @@ logger = logging.getLogger(__name__) class TransferUserView(View): """Transfer user methods that set up the transfer_user template and handle the forms on it.""" - JOINS = [ - (DomainRequest, "creator"), - (DomainInformation, "creator"), - (Portfolio, "creator"), - (DomainRequest, "investigator"), - (UserDomainRole, "user"), - (VerifiedByStaff, "requestor"), - (UserPortfolioPermission, "user"), - ] - - # Future-proofing in case joined fields get added on the user model side - # This was tested in the first portfolio model iteration and works - USER_FIELDS: List[Any] = [] - def get(self, request, user_id): - """current_user referes to the 'source' user where the button that redirects to this view was clicked. + """current_user refers to the 'source' user where the button that redirects to this view was clicked. other_users exclude current_user and populate a dropdown, selected_user is the selection in the dropdown. This also querries the relevant domains and domain requests, and the admin context needed for the sidenav.""" @@ -70,86 +56,122 @@ class TransferUserView(View): return render(request, "admin/transfer_user.html", context) def post(self, request, user_id): - """This handles the transfer from selected_user to current_user then deletes selected_user. - - NOTE: We have a ticket to refactor this into a more solid lookup for related fields in #2645""" - + """This handles the transfer from selected_user to current_user then deletes selected_user.""" current_user = get_object_or_404(User, pk=user_id) selected_user_id = request.POST.get("selected_user") selected_user = get_object_or_404(User, pk=selected_user_id) try: - change_logs = [] + # Make this atomic so that we don't get any partial transfers + with transaction.atomic(): + change_logs = [] - # Transfer specific fields - self.transfer_user_fields_and_log(selected_user, current_user, change_logs) + # Dynamically handle related fields + self.transfer_related_fields_and_log(selected_user, current_user, change_logs) - # Perform the updates and log the changes - for model_class, field_name in self.JOINS: - self.update_joins_and_log(model_class, field_name, selected_user, current_user, change_logs) - - # Success message if any related objects were updated - if change_logs: - success_message = f"Data transferred successfully for the following objects: {change_logs}" - messages.success(request, success_message) - - selected_user.delete() - messages.success(request, f"Deleted {selected_user} {selected_user.username}") + # Success message if any related objects were updated + if change_logs: + success_message = f"Data transferred successfully for the following objects: {change_logs}" + messages.success(request, success_message) + selected_user.delete() + messages.success(request, f"Deleted {selected_user} {selected_user.username}") except Exception as e: messages.error(request, f"An error occurred during the transfer: {e}") + logger.error(f"An error occurred during the transfer: {e}", exc_info=True) return redirect("admin:registrar_user_change", object_id=user_id) - @classmethod - def update_joins_and_log(cls, model_class, field_name, selected_user, current_user, change_logs): + def transfer_related_fields_and_log(self, selected_user, current_user, change_logs): """ - Helper function to update the user join fields for a given model and log the changes. + Dynamically find all related fields to the User model and transfer them from selected_user to current_user. + Handles ForeignKey, OneToOneField, ManyToManyField, and ManyToOneRel relationships. """ + user_model = User - filter_kwargs = {field_name: selected_user} - updated_objects = model_class.objects.filter(**filter_kwargs) + for related_field in user_model._meta.get_fields(): + if related_field.is_relation: + # Field objects represent forward relationships + if isinstance(related_field, OneToOneField): + self._handle_one_to_one(related_field, selected_user, current_user, change_logs) + elif isinstance(related_field, ManyToManyField): + self._handle_many_to_many(related_field, selected_user, current_user, change_logs) + elif isinstance(related_field, ForeignKey): + self._handle_foreign_key(related_field, selected_user, current_user, change_logs) + # Relationship objects represent reverse relationships + elif isinstance(related_field, ManyToOneRel): + # ManyToOneRel is a reverse ForeignKey + self._handle_foreign_key_reverse(related_field, selected_user, current_user, change_logs) + elif isinstance(related_field, OneToOneRel): + self._handle_one_to_one_reverse(related_field, selected_user, current_user, change_logs) + elif isinstance(related_field, ManyToManyRel): + self._handle_many_to_many_reverse(related_field, selected_user, current_user, change_logs) + else: + logger.error(f"Unknown relationship type for field {related_field}") + raise ValueError(f"Unknown relationship type for field {related_field}") - for obj in updated_objects: - # Check for duplicate UserDomainRole before updating - if model_class == UserDomainRole: - if model_class.objects.filter(user=current_user, domain=obj.domain).exists(): - continue # Skip the update to avoid a duplicate + def _handle_foreign_key_reverse(self, related_field: ManyToOneRel, selected_user, current_user, change_logs): + # Handle reverse ForeignKey relationships + related_manager = getattr(selected_user, related_field.get_accessor_name(), None) + if related_manager and related_manager.exists(): + for related_object in related_manager.all(): + with ignore_unique_violation(): + setattr(related_object, related_field.field.name, current_user) + related_object.save() + self.log_change(related_object, selected_user, current_user, related_field.field.name, change_logs) - if model_class == UserPortfolioPermission: - if model_class.objects.filter(user=current_user, portfolio=obj.portfolio).exists(): - continue # Skip the update to avoid a duplicate + def _handle_foreign_key(self, related_field: ForeignKey, selected_user, current_user, change_logs): + # Handle ForeignKey relationships + related_object = getattr(selected_user, related_field.name, None) + if related_object: + setattr(current_user, related_field.name, related_object) + current_user.save() + self.log_change(related_object, selected_user, current_user, related_field.name, change_logs) - # Update the field on the object and save it - setattr(obj, field_name, current_user) - obj.save() + def _handle_one_to_one(self, related_field: OneToOneField, selected_user, current_user, change_logs): + # Handle OneToOne relationship + related_object = getattr(selected_user, related_field.name, None) + if related_object: + with ignore_unique_violation(): + setattr(current_user, related_field.name, related_object) + current_user.save() + self.log_change(related_object, selected_user, current_user, related_field.name, change_logs) - # Log the change - cls.log_change(obj, field_name, selected_user, current_user, change_logs) + def _handle_many_to_many(self, related_field: ManyToManyField, selected_user, current_user, change_logs): + # Handle ManyToMany relationship + related_name = related_field.remote_field.name + related_manager = getattr(selected_user, related_name, None) + if related_manager and related_manager.exists(): + for instance in related_manager.all(): + with ignore_unique_violation(): + getattr(instance, related_name).remove(selected_user) + getattr(instance, related_name).add(current_user) + self.log_change(instance, selected_user, current_user, related_name, change_logs) + + def _handle_many_to_many_reverse(self, related_field: ManyToManyRel, selected_user, current_user, change_logs): + # Handle reverse relationship + related_name = related_field.field.name + related_manager = getattr(selected_user, related_name, None) + if related_manager and related_manager.exists(): + for instance in related_manager.all(): + with ignore_unique_violation(): + getattr(instance, related_name).remove(selected_user) + getattr(instance, related_name).add(current_user) + self.log_change(instance, selected_user, current_user, related_name, change_logs) + + def _handle_one_to_one_reverse(self, related_field: OneToOneRel, selected_user, current_user, change_logs): + # Handle reverse relationship + field_name = related_field.get_accessor_name() + related_instance = getattr(selected_user, field_name, None) + if related_instance: + setattr(related_instance, field_name, current_user) + related_instance.save() + self.log_change(related_instance, selected_user, current_user, field_name, change_logs) @classmethod - def transfer_user_fields_and_log(cls, selected_user, current_user, change_logs): - """ - Transfers portfolio fields from the selected_user to the current_user. - Logs the changes for each transferred field. - """ - for field in cls.USER_FIELDS: - field_value = getattr(selected_user, field, None) - - if field_value: - setattr(current_user, field, field_value) - cls.log_change(current_user, field, field_value, field_value, change_logs) - - current_user.save() - - @classmethod - def log_change(cls, obj, field_name, field_value, new_value, change_logs): - """Logs the change for a specific field on an object""" - log_entry = f'Changed {field_name} from "{field_value}" to "{new_value}" on {obj}' - + def log_change(cls, obj, selected_user, current_user, field_name, change_logs): + log_entry = f"Changed {field_name} from {selected_user} to {current_user} on {obj}" logger.info(log_entry) - - # Collect the related object for the success message change_logs.append(log_entry) @classmethod