Merge branch 'main' into za/3307-duplicate-key-bug

This commit is contained in:
zandercymatics 2025-01-21 14:36:08 -07:00
commit c0b7de45b4
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
11 changed files with 631 additions and 104 deletions

View file

@ -1367,6 +1367,8 @@ class UserDomainRoleAdmin(ListHeaderAdmin, ImportExportModelAdmin):
autocomplete_fields = ["user", "domain"]
change_form_template = "django/admin/user_domain_role_change_form.html"
# Fixes a bug where non-superusers are redirected to the main page
def delete_view(self, request, object_id, extra_context=None):
"""Custom delete_view implementation that specifies redirect behaviour"""
@ -1500,7 +1502,7 @@ class DomainInvitationAdmin(BaseInvitationAdmin):
autocomplete_fields = ["domain"]
change_form_template = "django/admin/email_clipboard_change_form.html"
change_form_template = "django/admin/domain_invitation_change_form.html"
# Select domain invitations to change -> Domain invitations
def changelist_view(self, request, extra_context=None):

View file

@ -0,0 +1,14 @@
{% extends 'django/admin/email_clipboard_change_form.html' %}
{% load custom_filters %}
{% load i18n static %}
{% block content_subtitle %}
<div class="usa-alert usa-alert--info usa-alert--slim">
<div class="usa-alert__body margin-left-1 maxw-none">
<p class="usa-alert__text maxw-none">
If you add someone to a domain here, it will trigger emails to the invitee and all managers of the domain when you click "save." If you don't want to trigger those emails, use the <a class="usa-link" href="{% url 'admin:registrar_userdomainrole_changelist' %}">User domain roles permissions table</a> instead.
</p>
</div>
</div>
{{ block.super }}
{% endblock %}

View file

@ -0,0 +1,14 @@
{% extends 'django/admin/email_clipboard_change_form.html' %}
{% load custom_filters %}
{% load i18n static %}
{% block content_subtitle %}
<div class="usa-alert usa-alert--info usa-alert--slim">
<div class="usa-alert__body margin-left-1 maxw-none">
<p class="usa-alert__text maxw-none">
If you add someone to a domain here, it will not trigger any emails. To trigger emails, use the <a class="usa-link" href="{% url 'admin:registrar_domaininvitation_changelist' %}">User Domain Role invitations table</a> instead.
</p>
</div>
</div>
{{ block.super }}
{% endblock %}

View file

@ -0,0 +1,43 @@
{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
Hi,{% if domain_manager and domain_manager.first_name %} {{ domain_manager.first_name }}.{% endif %}
A domain manager was invited to {{ domain.name }}.
DOMAIN: {{ domain.name }}
INVITED BY: {{ requestor_email }}
INVITED ON: {{date}}
MANAGER INVITED: {{ invited_email_address }}
----------------------------------------------------------------
NEXT STEPS
The person who received the invitation will become a domain manager once they log in to the
.gov registrar. They'll need to access the registrar using a Login.gov account that's
associated with the invited email address.
If you need to cancel this invitation or remove the domain manager (because they've already
logged in), you can do that by going to this domain in the .gov registrar <https://manage.get.gov/>.
WHY DID YOU RECEIVE THIS EMAIL?
Youre listed as a domain manager for {{ domain.name }}, so youll receive a notification whenever
someone is invited to manage that domain.
If you have questions or concerns, reach out to the person who sent the invitation or reply to this email.
THANK YOU
.Gov helps the public identify official, trusted information. Thank you for using a .gov domain.
----------------------------------------------------------------
The .gov team
Contact us: <https://get.gov/contact/>
Learn about .gov <https://get.gov>
The .gov registry is a part of the Cybersecurity and Infrastructure Security Agency
(CISA) <https://cisa.gov/>
{% endautoescape %}

View file

@ -0,0 +1 @@
A domain manager was invited to {{ domain.name }}

View file

@ -166,6 +166,29 @@ class TestDomainInvitationAdmin(TestCase):
)
self.assertContains(response, "Show more")
@less_console_noise_decorator
def test_has_change_form_description(self):
"""Tests if this model has a model description on the change form view"""
self.client.force_login(self.superuser)
domain, _ = Domain.objects.get_or_create(name="systemofadown.com")
domain_invitation, _ = DomainInvitation.objects.get_or_create(email="toxicity@systemofadown.com", domain=domain)
response = self.client.get(
"/admin/registrar/domaininvitation/{}/change/".format(domain_invitation.pk),
follow=True,
)
# Make sure that the page is loaded correctly
self.assertEqual(response.status_code, 200)
# Test for a description snippet
self.assertContains(
response,
"If you add someone to a domain here, it will trigger emails to the invitee and all managers of the domain",
)
@less_console_noise_decorator
def test_get_filters(self):
"""Ensures that our filters are displaying correctly"""
@ -1957,6 +1980,31 @@ class TestUserDomainRoleAdmin(TestCase):
)
self.assertContains(response, "Show more")
@less_console_noise_decorator
def test_has_change_form_description(self):
"""Tests if this model has a model description on the change form view"""
self.client.force_login(self.superuser)
domain, _ = Domain.objects.get_or_create(name="systemofadown.com")
user_domain_role, _ = UserDomainRole.objects.get_or_create(
user=self.superuser, domain=domain, role=[UserDomainRole.Roles.MANAGER]
)
response = self.client.get(
"/admin/registrar/userdomainrole/{}/change/".format(user_domain_role.pk),
follow=True,
)
# Make sure that the page is loaded correctly
self.assertEqual(response.status_code, 200)
# Test for a description snippet
self.assertContains(
response,
"If you add someone to a domain here, it will not trigger any emails.",
)
def test_domain_sortable(self):
"""Tests if the UserDomainrole sorts by domain correctly"""
with less_console_noise():
@ -3442,7 +3490,7 @@ class TestTransferUser(WebTest):
@less_console_noise_decorator
def test_transfer_user_transfers_user_portfolio_roles_no_error_when_duplicates(self):
"""Assert that duplicate portfolio user roles do not throw errorsd"""
"""Assert that duplicate portfolio user roles do not throw errors"""
portfolio1 = Portfolio.objects.create(organization_name="Hotel California", creator=self.user2)
UserPortfolioPermission.objects.create(
user=self.user1, portfolio=portfolio1, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
@ -3574,7 +3622,7 @@ class TestTransferUser(WebTest):
with self.assertRaises(User.DoesNotExist):
self.user2.refresh_from_db()
@less_console_noise_decorator
# @less_console_noise_decorator
def test_transfer_user_throws_transfer_and_delete_success_messages(self):
"""Test that success messages for data transfer and user deletion are displayed."""
# Ensure the setup for VerifiedByStaff
@ -3592,11 +3640,13 @@ class TestTransferUser(WebTest):
self.assertContains(after_submit, "<h1>Change user</h1>")
print(mock_success_message.call_args_list)
mock_success_message.assert_any_call(
ANY,
(
"Data transferred successfully for the following objects: ['Changed requestor "
+ 'from "Furiosa Jabassa " to "Max Rokatanski " on immortan.joe@citadel.com\']'
+ "from Furiosa Jabassa to Max Rokatanski on immortan.joe@citadel.com']"
),
)
@ -3606,7 +3656,7 @@ class TestTransferUser(WebTest):
def test_transfer_user_throws_error_message(self):
"""Test that an error message is thrown if the transfer fails."""
with patch(
"registrar.views.TransferUserView.transfer_user_fields_and_log", side_effect=Exception("Simulated Error")
"registrar.views.TransferUserView.transfer_related_fields_and_log", side_effect=Exception("Simulated Error")
):
with patch("django.contrib.messages.error") as mock_error:
# Access the transfer user page

View file

@ -0,0 +1,311 @@
import unittest
from unittest.mock import patch, MagicMock
from datetime import date
from registrar.utility.email import EmailSendingError
from registrar.utility.email_invitations import send_domain_invitation_email
from api.tests.common import less_console_noise_decorator
class DomainInvitationEmail(unittest.TestCase):
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.send_templated_email")
@patch("registrar.utility.email_invitations.UserDomainRole.objects.filter")
@patch("registrar.utility.email_invitations.validate_invitation")
@patch("registrar.utility.email_invitations.get_requestor_email")
@patch("registrar.utility.email_invitations.send_invitation_email")
@patch("registrar.utility.email_invitations.normalize_domains")
def test_send_domain_invitation_email(
self,
mock_normalize_domains,
mock_send_invitation_email,
mock_get_requestor_email,
mock_validate_invitation,
mock_user_domain_role_filter,
mock_send_templated_email,
):
"""Test sending domain invitation email for one domain.
Should also send emails to manager of that domain.
"""
# Setup
mock_domain = MagicMock(name="domain1")
mock_domain.name = "example.com"
mock_normalize_domains.return_value = [mock_domain]
mock_requestor = MagicMock()
mock_requestor_email = "requestor@example.com"
mock_get_requestor_email.return_value = mock_requestor_email
mock_user1 = MagicMock()
mock_user1.email = "manager1@example.com"
mock_user_domain_role_filter.return_value = [MagicMock(user=mock_user1)]
email = "invitee@example.com"
is_member_of_different_org = False
# Call the function
send_domain_invitation_email(
email=email,
requestor=mock_requestor,
domains=mock_domain,
is_member_of_different_org=is_member_of_different_org,
)
# Assertions
mock_normalize_domains.assert_called_once_with(mock_domain)
mock_get_requestor_email.assert_called_once_with(mock_requestor, [mock_domain])
mock_validate_invitation.assert_called_once_with(
email, [mock_domain], mock_requestor, is_member_of_different_org
)
mock_send_invitation_email.assert_called_once_with(email, mock_requestor_email, [mock_domain], None)
mock_user_domain_role_filter.assert_called_once_with(domain=mock_domain)
mock_send_templated_email.assert_called_once_with(
"emails/domain_manager_notification.txt",
"emails/domain_manager_notification_subject.txt",
to_address=mock_user1.email,
context={
"domain": mock_domain,
"requestor_email": mock_requestor_email,
"invited_email_address": email,
"domain_manager": mock_user1,
"date": date.today(),
},
)
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.send_templated_email")
@patch("registrar.utility.email_invitations.UserDomainRole.objects.filter")
@patch("registrar.utility.email_invitations.validate_invitation")
@patch("registrar.utility.email_invitations.get_requestor_email")
@patch("registrar.utility.email_invitations.send_invitation_email")
@patch("registrar.utility.email_invitations.normalize_domains")
def test_send_domain_invitation_email_multiple_domains(
self,
mock_normalize_domains,
mock_send_invitation_email,
mock_get_requestor_email,
mock_validate_invitation,
mock_user_domain_role_filter,
mock_send_templated_email,
):
"""Test sending domain invitation email for multiple domains.
Should also send emails to managers of each domain.
"""
# Setup
# Create multiple mock domains
mock_domain1 = MagicMock(name="domain1")
mock_domain1.name = "example.com"
mock_domain2 = MagicMock(name="domain2")
mock_domain2.name = "example.org"
mock_normalize_domains.return_value = [mock_domain1, mock_domain2]
mock_requestor = MagicMock()
mock_requestor_email = "requestor@example.com"
mock_get_requestor_email.return_value = mock_requestor_email
mock_user1 = MagicMock()
mock_user1.email = "manager1@example.com"
mock_user2 = MagicMock()
mock_user2.email = "manager2@example.com"
# Configure domain roles for each domain
def filter_side_effect(domain):
if domain == mock_domain1:
return [MagicMock(user=mock_user1)]
elif domain == mock_domain2:
return [MagicMock(user=mock_user2)]
return []
mock_user_domain_role_filter.side_effect = filter_side_effect
email = "invitee@example.com"
is_member_of_different_org = False
# Call the function
send_domain_invitation_email(
email=email,
requestor=mock_requestor,
domains=[mock_domain1, mock_domain2],
is_member_of_different_org=is_member_of_different_org,
)
# Assertions
mock_normalize_domains.assert_called_once_with([mock_domain1, mock_domain2])
mock_get_requestor_email.assert_called_once_with(mock_requestor, [mock_domain1, mock_domain2])
mock_validate_invitation.assert_called_once_with(
email, [mock_domain1, mock_domain2], mock_requestor, is_member_of_different_org
)
mock_send_invitation_email.assert_called_once_with(
email, mock_requestor_email, [mock_domain1, mock_domain2], None
)
# Check that domain manager emails were sent for both domains
mock_user_domain_role_filter.assert_any_call(domain=mock_domain1)
mock_user_domain_role_filter.assert_any_call(domain=mock_domain2)
mock_send_templated_email.assert_any_call(
"emails/domain_manager_notification.txt",
"emails/domain_manager_notification_subject.txt",
to_address=mock_user1.email,
context={
"domain": mock_domain1,
"requestor_email": mock_requestor_email,
"invited_email_address": email,
"domain_manager": mock_user1,
"date": date.today(),
},
)
mock_send_templated_email.assert_any_call(
"emails/domain_manager_notification.txt",
"emails/domain_manager_notification_subject.txt",
to_address=mock_user2.email,
context={
"domain": mock_domain2,
"requestor_email": mock_requestor_email,
"invited_email_address": email,
"domain_manager": mock_user2,
"date": date.today(),
},
)
# Verify the total number of calls to send_templated_email
self.assertEqual(mock_send_templated_email.call_count, 2)
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.validate_invitation")
def test_send_domain_invitation_email_raises_invite_validation_exception(self, mock_validate_invitation):
"""Test sending domain invitation email for one domain and assert exception
when invite validation fails.
"""
# Setup
mock_validate_invitation.side_effect = ValueError("Validation failed")
email = "invitee@example.com"
requestor = MagicMock()
domain = MagicMock()
# Call and assert exception
with self.assertRaises(ValueError) as context:
send_domain_invitation_email(email, requestor, domain, is_member_of_different_org=False)
self.assertEqual(str(context.exception), "Validation failed")
mock_validate_invitation.assert_called_once()
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.get_requestor_email")
def test_send_domain_invitation_email_raises_get_requestor_email_exception(self, mock_get_requestor_email):
"""Test sending domain invitation email for one domain and assert exception
when get_requestor_email fails.
"""
# Setup
mock_get_requestor_email.side_effect = ValueError("Validation failed")
email = "invitee@example.com"
requestor = MagicMock()
domain = MagicMock()
# Call and assert exception
with self.assertRaises(ValueError) as context:
send_domain_invitation_email(email, requestor, domain, is_member_of_different_org=False)
self.assertEqual(str(context.exception), "Validation failed")
mock_get_requestor_email.assert_called_once()
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.validate_invitation")
@patch("registrar.utility.email_invitations.get_requestor_email")
@patch("registrar.utility.email_invitations.send_invitation_email")
@patch("registrar.utility.email_invitations.normalize_domains")
def test_send_domain_invitation_email_raises_sending_email_exception(
self,
mock_normalize_domains,
mock_send_invitation_email,
mock_get_requestor_email,
mock_validate_invitation,
):
"""Test sending domain invitation email for one domain and assert exception
when send_invitation_email fails.
"""
# Setup
mock_domain = MagicMock(name="domain1")
mock_domain.name = "example.com"
mock_normalize_domains.return_value = [mock_domain]
mock_requestor = MagicMock()
mock_requestor_email = "requestor@example.com"
mock_get_requestor_email.return_value = mock_requestor_email
mock_user1 = MagicMock()
mock_user1.email = "manager1@example.com"
email = "invitee@example.com"
is_member_of_different_org = False
mock_send_invitation_email.side_effect = EmailSendingError("Error sending email")
# Call and assert exception
with self.assertRaises(EmailSendingError) as context:
send_domain_invitation_email(
email=email,
requestor=mock_requestor,
domains=mock_domain,
is_member_of_different_org=is_member_of_different_org,
)
# Assertions
mock_normalize_domains.assert_called_once_with(mock_domain)
mock_get_requestor_email.assert_called_once_with(mock_requestor, [mock_domain])
mock_validate_invitation.assert_called_once_with(
email, [mock_domain], mock_requestor, is_member_of_different_org
)
self.assertEqual(str(context.exception), "Error sending email")
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.send_emails_to_domain_managers")
@patch("registrar.utility.email_invitations.validate_invitation")
@patch("registrar.utility.email_invitations.get_requestor_email")
@patch("registrar.utility.email_invitations.send_invitation_email")
@patch("registrar.utility.email_invitations.normalize_domains")
def test_send_domain_invitation_email_manager_emails_send_mail_exception(
self,
mock_normalize_domains,
mock_send_invitation_email,
mock_get_requestor_email,
mock_validate_invitation,
mock_send_domain_manager_emails,
):
"""Test sending domain invitation email for one domain and assert exception
when send_emails_to_domain_managers fails.
"""
# Setup
mock_domain = MagicMock(name="domain1")
mock_domain.name = "example.com"
mock_normalize_domains.return_value = [mock_domain]
mock_requestor = MagicMock()
mock_requestor_email = "requestor@example.com"
mock_get_requestor_email.return_value = mock_requestor_email
email = "invitee@example.com"
is_member_of_different_org = False
mock_send_domain_manager_emails.side_effect = EmailSendingError("Error sending email")
# Call and assert exception
with self.assertRaises(EmailSendingError) as context:
send_domain_invitation_email(
email=email,
requestor=mock_requestor,
domains=mock_domain,
is_member_of_different_org=is_member_of_different_org,
)
# Assertions
mock_normalize_domains.assert_called_once_with(mock_domain)
mock_get_requestor_email.assert_called_once_with(mock_requestor, [mock_domain])
mock_validate_invitation.assert_called_once_with(
email, [mock_domain], mock_requestor, is_member_of_different_org
)
mock_send_invitation_email.assert_called_once_with(email, mock_requestor_email, [mock_domain], None)
self.assertEqual(str(context.exception), "Error sending email")

View file

@ -9,7 +9,7 @@ from registrar.utility.email import EmailSendingError
from waffle.testutils import override_flag
from api.tests.common import less_console_noise_decorator
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from .common import MockEppLib, MockSESClient, create_user # type: ignore
from .common import MockEppLib, create_user # type: ignore
from django_webtest import WebTest # type: ignore
import boto3_mocking # type: ignore
@ -750,11 +750,12 @@ class TestDomainManagers(TestDomainOverview):
response = self.client.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
self.assertContains(response, "Add a domain manager")
@boto3_mocking.patching
@less_console_noise_decorator
def test_domain_user_add_form(self):
@patch("registrar.views.domain.send_domain_invitation_email")
def test_domain_user_add_form(self, mock_send_domain_email):
"""Adding an existing user works."""
get_user_model().objects.get_or_create(email="mayor@igorville.gov")
user = User.objects.filter(email="mayor@igorville.gov").first()
add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
@ -762,10 +763,15 @@ class TestDomainManagers(TestDomainOverview):
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
mock_client = MockSESClient()
with boto3_mocking.clients.handler_for("sesv2", mock_client):
with less_console_noise():
success_result = add_page.form.submit()
success_result = add_page.form.submit()
mock_send_domain_email.assert_called_once_with(
email="mayor@igorville.gov",
requestor=self.user,
domains=self.domain,
is_member_of_different_org=None,
requested_user=user,
)
self.assertEqual(success_result.status_code, 302)
self.assertEqual(
@ -974,13 +980,13 @@ class TestDomainManagers(TestDomainOverview):
success_page = success_result.follow()
self.assertContains(success_page, "Failed to send email.")
@boto3_mocking.patching
@less_console_noise_decorator
def test_domain_invitation_created(self):
@patch("registrar.views.domain.send_domain_invitation_email")
def test_domain_invitation_created(self, mock_send_domain_email):
"""Add user on a nonexistent email creates an invitation.
Adding a non-existent user sends an email as a side-effect, so mock
out the boto3 SES email sending here.
out send_domain_invitation_email here.
"""
# make sure there is no user with this email
email_address = "mayor@igorville.gov"
@ -993,10 +999,11 @@ class TestDomainManagers(TestDomainOverview):
add_page.form["email"] = email_address
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
mock_client = MockSESClient()
with boto3_mocking.clients.handler_for("sesv2", mock_client):
with less_console_noise():
success_result = add_page.form.submit()
success_result = add_page.form.submit()
mock_send_domain_email.assert_called_once_with(
email="mayor@igorville.gov", requestor=self.user, domains=self.domain, is_member_of_different_org=None
)
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = success_result.follow()
@ -1005,13 +1012,13 @@ class TestDomainManagers(TestDomainOverview):
self.assertContains(success_page, "Cancel") # link to cancel invitation
self.assertTrue(DomainInvitation.objects.filter(email=email_address).exists())
@boto3_mocking.patching
@less_console_noise_decorator
def test_domain_invitation_created_for_caps_email(self):
@patch("registrar.views.domain.send_domain_invitation_email")
def test_domain_invitation_created_for_caps_email(self, mock_send_domain_email):
"""Add user on a nonexistent email with CAPS creates an invitation to lowercase email.
Adding a non-existent user sends an email as a side-effect, so mock
out the boto3 SES email sending here.
out send_domain_invitation_email here.
"""
# make sure there is no user with this email
email_address = "mayor@igorville.gov"
@ -1025,9 +1032,11 @@ class TestDomainManagers(TestDomainOverview):
add_page.form["email"] = caps_email_address
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
mock_client = MockSESClient()
with boto3_mocking.clients.handler_for("sesv2", mock_client):
success_result = add_page.form.submit()
success_result = add_page.form.submit()
mock_send_domain_email.assert_called_once_with(
email="mayor@igorville.gov", requestor=self.user, domains=self.domain, is_member_of_different_org=None
)
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = success_result.follow()

View file

@ -0,0 +1,20 @@
from contextlib import contextmanager
from django.db import transaction, IntegrityError
from psycopg2 import errorcodes
@contextmanager
def ignore_unique_violation():
"""
Execute within an atomic transaction so that if a unique constraint violation occurs,
the individual transaction is rolled back without invalidating any larger transaction.
"""
with transaction.atomic():
try:
yield
except IntegrityError as e:
if e.__cause__.pgcode == errorcodes.UNIQUE_VIOLATION:
# roll back to the savepoint, effectively ignoring this transaction
pass
else:
raise e

View file

@ -1,6 +1,6 @@
from datetime import date
from django.conf import settings
from registrar.models import DomainInvitation
from registrar.models.domain import Domain
from registrar.models import Domain, DomainInvitation, UserDomainRole
from registrar.utility.errors import (
AlreadyDomainInvitedError,
AlreadyDomainManagerError,
@ -41,8 +41,47 @@ def send_domain_invitation_email(
send_invitation_email(email, requestor_email, domains, requested_user)
# send emails to domain managers
for domain in domains:
send_emails_to_domain_managers(
email=email,
requestor_email=requestor_email,
domain=domain,
requested_user=requested_user,
)
def normalize_domains(domains):
def send_emails_to_domain_managers(email: str, requestor_email, domain: Domain, requested_user=None):
"""
Notifies all domain managers of the provided domain of a change
Raises:
EmailSendingError
"""
# Get each domain manager from list
user_domain_roles = UserDomainRole.objects.filter(domain=domain)
for user_domain_role in user_domain_roles:
# Send email to each domain manager
user = user_domain_role.user
try:
send_templated_email(
"emails/domain_manager_notification.txt",
"emails/domain_manager_notification_subject.txt",
to_address=user.email,
context={
"domain": domain,
"requestor_email": requestor_email,
"invited_email_address": email,
"domain_manager": user,
"date": date.today(),
},
)
except EmailSendingError as err:
raise EmailSendingError(
f"Could not send email manager notification to {user.email} for domain: {domain.name}"
) from err
def normalize_domains(domains: Domain | list[Domain]) -> list[Domain]:
"""Ensures domains is always a list."""
return [domains] if isinstance(domains, Domain) else domains
@ -69,6 +108,8 @@ def validate_invitation(email, domains, requestor, is_member_of_different_org):
for domain in domains:
validate_existing_invitation(email, domain)
# NOTE: should we also be validating against existing user_domain_roles
def check_outside_org_membership(email, requestor, is_member_of_different_org):
"""Raise an error if the email belongs to a different organization."""

View file

@ -1,19 +1,19 @@
import logging
from django.db import transaction
from django.db.models import ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel, ManyToManyRel, OneToOneRel
from django.shortcuts import render, get_object_or_404, redirect
from django.views import View
from registrar.models.domain import Domain
from registrar.models.domain_information import DomainInformation
from registrar.models.domain_request import DomainRequest
from registrar.models.portfolio import Portfolio
from registrar.models.user import User
from django.contrib.admin import site
from django.contrib import messages
from registrar.models.user_domain_role import UserDomainRole
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.verified_by_staff import VerifiedByStaff
from typing import Any, List
from registrar.utility.db_helpers import ignore_unique_violation
logger = logging.getLogger(__name__)
@ -21,22 +21,8 @@ logger = logging.getLogger(__name__)
class TransferUserView(View):
"""Transfer user methods that set up the transfer_user template and handle the forms on it."""
JOINS = [
(DomainRequest, "creator"),
(DomainInformation, "creator"),
(Portfolio, "creator"),
(DomainRequest, "investigator"),
(UserDomainRole, "user"),
(VerifiedByStaff, "requestor"),
(UserPortfolioPermission, "user"),
]
# Future-proofing in case joined fields get added on the user model side
# This was tested in the first portfolio model iteration and works
USER_FIELDS: List[Any] = []
def get(self, request, user_id):
"""current_user referes to the 'source' user where the button that redirects to this view was clicked.
"""current_user refers to the 'source' user where the button that redirects to this view was clicked.
other_users exclude current_user and populate a dropdown, selected_user is the selection in the dropdown.
This also querries the relevant domains and domain requests, and the admin context needed for the sidenav."""
@ -70,86 +56,122 @@ class TransferUserView(View):
return render(request, "admin/transfer_user.html", context)
def post(self, request, user_id):
"""This handles the transfer from selected_user to current_user then deletes selected_user.
NOTE: We have a ticket to refactor this into a more solid lookup for related fields in #2645"""
"""This handles the transfer from selected_user to current_user then deletes selected_user."""
current_user = get_object_or_404(User, pk=user_id)
selected_user_id = request.POST.get("selected_user")
selected_user = get_object_or_404(User, pk=selected_user_id)
try:
change_logs = []
# Make this atomic so that we don't get any partial transfers
with transaction.atomic():
change_logs = []
# Transfer specific fields
self.transfer_user_fields_and_log(selected_user, current_user, change_logs)
# Dynamically handle related fields
self.transfer_related_fields_and_log(selected_user, current_user, change_logs)
# Perform the updates and log the changes
for model_class, field_name in self.JOINS:
self.update_joins_and_log(model_class, field_name, selected_user, current_user, change_logs)
# Success message if any related objects were updated
if change_logs:
success_message = f"Data transferred successfully for the following objects: {change_logs}"
messages.success(request, success_message)
selected_user.delete()
messages.success(request, f"Deleted {selected_user} {selected_user.username}")
# Success message if any related objects were updated
if change_logs:
success_message = f"Data transferred successfully for the following objects: {change_logs}"
messages.success(request, success_message)
selected_user.delete()
messages.success(request, f"Deleted {selected_user} {selected_user.username}")
except Exception as e:
messages.error(request, f"An error occurred during the transfer: {e}")
logger.error(f"An error occurred during the transfer: {e}", exc_info=True)
return redirect("admin:registrar_user_change", object_id=user_id)
@classmethod
def update_joins_and_log(cls, model_class, field_name, selected_user, current_user, change_logs):
def transfer_related_fields_and_log(self, selected_user, current_user, change_logs):
"""
Helper function to update the user join fields for a given model and log the changes.
Dynamically find all related fields to the User model and transfer them from selected_user to current_user.
Handles ForeignKey, OneToOneField, ManyToManyField, and ManyToOneRel relationships.
"""
user_model = User
filter_kwargs = {field_name: selected_user}
updated_objects = model_class.objects.filter(**filter_kwargs)
for related_field in user_model._meta.get_fields():
if related_field.is_relation:
# Field objects represent forward relationships
if isinstance(related_field, OneToOneField):
self._handle_one_to_one(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, ManyToManyField):
self._handle_many_to_many(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, ForeignKey):
self._handle_foreign_key(related_field, selected_user, current_user, change_logs)
# Relationship objects represent reverse relationships
elif isinstance(related_field, ManyToOneRel):
# ManyToOneRel is a reverse ForeignKey
self._handle_foreign_key_reverse(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, OneToOneRel):
self._handle_one_to_one_reverse(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, ManyToManyRel):
self._handle_many_to_many_reverse(related_field, selected_user, current_user, change_logs)
else:
logger.error(f"Unknown relationship type for field {related_field}")
raise ValueError(f"Unknown relationship type for field {related_field}")
for obj in updated_objects:
# Check for duplicate UserDomainRole before updating
if model_class == UserDomainRole:
if model_class.objects.filter(user=current_user, domain=obj.domain).exists():
continue # Skip the update to avoid a duplicate
def _handle_foreign_key_reverse(self, related_field: ManyToOneRel, selected_user, current_user, change_logs):
# Handle reverse ForeignKey relationships
related_manager = getattr(selected_user, related_field.get_accessor_name(), None)
if related_manager and related_manager.exists():
for related_object in related_manager.all():
with ignore_unique_violation():
setattr(related_object, related_field.field.name, current_user)
related_object.save()
self.log_change(related_object, selected_user, current_user, related_field.field.name, change_logs)
if model_class == UserPortfolioPermission:
if model_class.objects.filter(user=current_user, portfolio=obj.portfolio).exists():
continue # Skip the update to avoid a duplicate
def _handle_foreign_key(self, related_field: ForeignKey, selected_user, current_user, change_logs):
# Handle ForeignKey relationships
related_object = getattr(selected_user, related_field.name, None)
if related_object:
setattr(current_user, related_field.name, related_object)
current_user.save()
self.log_change(related_object, selected_user, current_user, related_field.name, change_logs)
# Update the field on the object and save it
setattr(obj, field_name, current_user)
obj.save()
def _handle_one_to_one(self, related_field: OneToOneField, selected_user, current_user, change_logs):
# Handle OneToOne relationship
related_object = getattr(selected_user, related_field.name, None)
if related_object:
with ignore_unique_violation():
setattr(current_user, related_field.name, related_object)
current_user.save()
self.log_change(related_object, selected_user, current_user, related_field.name, change_logs)
# Log the change
cls.log_change(obj, field_name, selected_user, current_user, change_logs)
def _handle_many_to_many(self, related_field: ManyToManyField, selected_user, current_user, change_logs):
# Handle ManyToMany relationship
related_name = related_field.remote_field.name
related_manager = getattr(selected_user, related_name, None)
if related_manager and related_manager.exists():
for instance in related_manager.all():
with ignore_unique_violation():
getattr(instance, related_name).remove(selected_user)
getattr(instance, related_name).add(current_user)
self.log_change(instance, selected_user, current_user, related_name, change_logs)
def _handle_many_to_many_reverse(self, related_field: ManyToManyRel, selected_user, current_user, change_logs):
# Handle reverse relationship
related_name = related_field.field.name
related_manager = getattr(selected_user, related_name, None)
if related_manager and related_manager.exists():
for instance in related_manager.all():
with ignore_unique_violation():
getattr(instance, related_name).remove(selected_user)
getattr(instance, related_name).add(current_user)
self.log_change(instance, selected_user, current_user, related_name, change_logs)
def _handle_one_to_one_reverse(self, related_field: OneToOneRel, selected_user, current_user, change_logs):
# Handle reverse relationship
field_name = related_field.get_accessor_name()
related_instance = getattr(selected_user, field_name, None)
if related_instance:
setattr(related_instance, field_name, current_user)
related_instance.save()
self.log_change(related_instance, selected_user, current_user, field_name, change_logs)
@classmethod
def transfer_user_fields_and_log(cls, selected_user, current_user, change_logs):
"""
Transfers portfolio fields from the selected_user to the current_user.
Logs the changes for each transferred field.
"""
for field in cls.USER_FIELDS:
field_value = getattr(selected_user, field, None)
if field_value:
setattr(current_user, field, field_value)
cls.log_change(current_user, field, field_value, field_value, change_logs)
current_user.save()
@classmethod
def log_change(cls, obj, field_name, field_value, new_value, change_logs):
"""Logs the change for a specific field on an object"""
log_entry = f'Changed {field_name} from "{field_value}" to "{new_value}" on {obj}'
def log_change(cls, obj, selected_user, current_user, field_name, change_logs):
log_entry = f"Changed {field_name} from {selected_user} to {current_user} on {obj}"
logger.info(log_entry)
# Collect the related object for the success message
change_logs.append(log_entry)
@classmethod