Merge branch 'main' into nl/3275-slowness-admin-tables

This commit is contained in:
CuriousX 2025-01-21 11:39:05 -07:00 committed by GitHub
commit 90b08f2789
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 124 additions and 80 deletions

View file

@ -3442,7 +3442,7 @@ class TestTransferUser(WebTest):
@less_console_noise_decorator
def test_transfer_user_transfers_user_portfolio_roles_no_error_when_duplicates(self):
"""Assert that duplicate portfolio user roles do not throw errorsd"""
"""Assert that duplicate portfolio user roles do not throw errors"""
portfolio1 = Portfolio.objects.create(organization_name="Hotel California", creator=self.user2)
UserPortfolioPermission.objects.create(
user=self.user1, portfolio=portfolio1, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
@ -3574,7 +3574,7 @@ class TestTransferUser(WebTest):
with self.assertRaises(User.DoesNotExist):
self.user2.refresh_from_db()
@less_console_noise_decorator
# @less_console_noise_decorator
def test_transfer_user_throws_transfer_and_delete_success_messages(self):
"""Test that success messages for data transfer and user deletion are displayed."""
# Ensure the setup for VerifiedByStaff
@ -3592,11 +3592,13 @@ class TestTransferUser(WebTest):
self.assertContains(after_submit, "<h1>Change user</h1>")
print(mock_success_message.call_args_list)
mock_success_message.assert_any_call(
ANY,
(
"Data transferred successfully for the following objects: ['Changed requestor "
+ 'from "Furiosa Jabassa " to "Max Rokatanski " on immortan.joe@citadel.com\']'
+ "from Furiosa Jabassa to Max Rokatanski on immortan.joe@citadel.com']"
),
)
@ -3606,7 +3608,7 @@ class TestTransferUser(WebTest):
def test_transfer_user_throws_error_message(self):
"""Test that an error message is thrown if the transfer fails."""
with patch(
"registrar.views.TransferUserView.transfer_user_fields_and_log", side_effect=Exception("Simulated Error")
"registrar.views.TransferUserView.transfer_related_fields_and_log", side_effect=Exception("Simulated Error")
):
with patch("django.contrib.messages.error") as mock_error:
# Access the transfer user page

View file

@ -0,0 +1,20 @@
from contextlib import contextmanager
from django.db import transaction, IntegrityError
from psycopg2 import errorcodes
@contextmanager
def ignore_unique_violation():
"""
Execute within an atomic transaction so that if a unique constraint violation occurs,
the individual transaction is rolled back without invalidating any larger transaction.
"""
with transaction.atomic():
try:
yield
except IntegrityError as e:
if e.__cause__.pgcode == errorcodes.UNIQUE_VIOLATION:
# roll back to the savepoint, effectively ignoring this transaction
pass
else:
raise e

View file

@ -1,19 +1,19 @@
import logging
from django.db import transaction
from django.db.models import ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel, ManyToManyRel, OneToOneRel
from django.shortcuts import render, get_object_or_404, redirect
from django.views import View
from registrar.models.domain import Domain
from registrar.models.domain_information import DomainInformation
from registrar.models.domain_request import DomainRequest
from registrar.models.portfolio import Portfolio
from registrar.models.user import User
from django.contrib.admin import site
from django.contrib import messages
from registrar.models.user_domain_role import UserDomainRole
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.verified_by_staff import VerifiedByStaff
from typing import Any, List
from registrar.utility.db_helpers import ignore_unique_violation
logger = logging.getLogger(__name__)
@ -21,22 +21,8 @@ logger = logging.getLogger(__name__)
class TransferUserView(View):
"""Transfer user methods that set up the transfer_user template and handle the forms on it."""
JOINS = [
(DomainRequest, "creator"),
(DomainInformation, "creator"),
(Portfolio, "creator"),
(DomainRequest, "investigator"),
(UserDomainRole, "user"),
(VerifiedByStaff, "requestor"),
(UserPortfolioPermission, "user"),
]
# Future-proofing in case joined fields get added on the user model side
# This was tested in the first portfolio model iteration and works
USER_FIELDS: List[Any] = []
def get(self, request, user_id):
"""current_user referes to the 'source' user where the button that redirects to this view was clicked.
"""current_user refers to the 'source' user where the button that redirects to this view was clicked.
other_users exclude current_user and populate a dropdown, selected_user is the selection in the dropdown.
This also querries the relevant domains and domain requests, and the admin context needed for the sidenav."""
@ -70,86 +56,122 @@ class TransferUserView(View):
return render(request, "admin/transfer_user.html", context)
def post(self, request, user_id):
"""This handles the transfer from selected_user to current_user then deletes selected_user.
NOTE: We have a ticket to refactor this into a more solid lookup for related fields in #2645"""
"""This handles the transfer from selected_user to current_user then deletes selected_user."""
current_user = get_object_or_404(User, pk=user_id)
selected_user_id = request.POST.get("selected_user")
selected_user = get_object_or_404(User, pk=selected_user_id)
try:
change_logs = []
# Make this atomic so that we don't get any partial transfers
with transaction.atomic():
change_logs = []
# Transfer specific fields
self.transfer_user_fields_and_log(selected_user, current_user, change_logs)
# Dynamically handle related fields
self.transfer_related_fields_and_log(selected_user, current_user, change_logs)
# Perform the updates and log the changes
for model_class, field_name in self.JOINS:
self.update_joins_and_log(model_class, field_name, selected_user, current_user, change_logs)
# Success message if any related objects were updated
if change_logs:
success_message = f"Data transferred successfully for the following objects: {change_logs}"
messages.success(request, success_message)
selected_user.delete()
messages.success(request, f"Deleted {selected_user} {selected_user.username}")
# Success message if any related objects were updated
if change_logs:
success_message = f"Data transferred successfully for the following objects: {change_logs}"
messages.success(request, success_message)
selected_user.delete()
messages.success(request, f"Deleted {selected_user} {selected_user.username}")
except Exception as e:
messages.error(request, f"An error occurred during the transfer: {e}")
logger.error(f"An error occurred during the transfer: {e}", exc_info=True)
return redirect("admin:registrar_user_change", object_id=user_id)
@classmethod
def update_joins_and_log(cls, model_class, field_name, selected_user, current_user, change_logs):
def transfer_related_fields_and_log(self, selected_user, current_user, change_logs):
"""
Helper function to update the user join fields for a given model and log the changes.
Dynamically find all related fields to the User model and transfer them from selected_user to current_user.
Handles ForeignKey, OneToOneField, ManyToManyField, and ManyToOneRel relationships.
"""
user_model = User
filter_kwargs = {field_name: selected_user}
updated_objects = model_class.objects.filter(**filter_kwargs)
for related_field in user_model._meta.get_fields():
if related_field.is_relation:
# Field objects represent forward relationships
if isinstance(related_field, OneToOneField):
self._handle_one_to_one(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, ManyToManyField):
self._handle_many_to_many(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, ForeignKey):
self._handle_foreign_key(related_field, selected_user, current_user, change_logs)
# Relationship objects represent reverse relationships
elif isinstance(related_field, ManyToOneRel):
# ManyToOneRel is a reverse ForeignKey
self._handle_foreign_key_reverse(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, OneToOneRel):
self._handle_one_to_one_reverse(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, ManyToManyRel):
self._handle_many_to_many_reverse(related_field, selected_user, current_user, change_logs)
else:
logger.error(f"Unknown relationship type for field {related_field}")
raise ValueError(f"Unknown relationship type for field {related_field}")
for obj in updated_objects:
# Check for duplicate UserDomainRole before updating
if model_class == UserDomainRole:
if model_class.objects.filter(user=current_user, domain=obj.domain).exists():
continue # Skip the update to avoid a duplicate
def _handle_foreign_key_reverse(self, related_field: ManyToOneRel, selected_user, current_user, change_logs):
# Handle reverse ForeignKey relationships
related_manager = getattr(selected_user, related_field.get_accessor_name(), None)
if related_manager and related_manager.exists():
for related_object in related_manager.all():
with ignore_unique_violation():
setattr(related_object, related_field.field.name, current_user)
related_object.save()
self.log_change(related_object, selected_user, current_user, related_field.field.name, change_logs)
if model_class == UserPortfolioPermission:
if model_class.objects.filter(user=current_user, portfolio=obj.portfolio).exists():
continue # Skip the update to avoid a duplicate
def _handle_foreign_key(self, related_field: ForeignKey, selected_user, current_user, change_logs):
# Handle ForeignKey relationships
related_object = getattr(selected_user, related_field.name, None)
if related_object:
setattr(current_user, related_field.name, related_object)
current_user.save()
self.log_change(related_object, selected_user, current_user, related_field.name, change_logs)
# Update the field on the object and save it
setattr(obj, field_name, current_user)
obj.save()
def _handle_one_to_one(self, related_field: OneToOneField, selected_user, current_user, change_logs):
# Handle OneToOne relationship
related_object = getattr(selected_user, related_field.name, None)
if related_object:
with ignore_unique_violation():
setattr(current_user, related_field.name, related_object)
current_user.save()
self.log_change(related_object, selected_user, current_user, related_field.name, change_logs)
# Log the change
cls.log_change(obj, field_name, selected_user, current_user, change_logs)
def _handle_many_to_many(self, related_field: ManyToManyField, selected_user, current_user, change_logs):
# Handle ManyToMany relationship
related_name = related_field.remote_field.name
related_manager = getattr(selected_user, related_name, None)
if related_manager and related_manager.exists():
for instance in related_manager.all():
with ignore_unique_violation():
getattr(instance, related_name).remove(selected_user)
getattr(instance, related_name).add(current_user)
self.log_change(instance, selected_user, current_user, related_name, change_logs)
def _handle_many_to_many_reverse(self, related_field: ManyToManyRel, selected_user, current_user, change_logs):
# Handle reverse relationship
related_name = related_field.field.name
related_manager = getattr(selected_user, related_name, None)
if related_manager and related_manager.exists():
for instance in related_manager.all():
with ignore_unique_violation():
getattr(instance, related_name).remove(selected_user)
getattr(instance, related_name).add(current_user)
self.log_change(instance, selected_user, current_user, related_name, change_logs)
def _handle_one_to_one_reverse(self, related_field: OneToOneRel, selected_user, current_user, change_logs):
# Handle reverse relationship
field_name = related_field.get_accessor_name()
related_instance = getattr(selected_user, field_name, None)
if related_instance:
setattr(related_instance, field_name, current_user)
related_instance.save()
self.log_change(related_instance, selected_user, current_user, field_name, change_logs)
@classmethod
def transfer_user_fields_and_log(cls, selected_user, current_user, change_logs):
"""
Transfers portfolio fields from the selected_user to the current_user.
Logs the changes for each transferred field.
"""
for field in cls.USER_FIELDS:
field_value = getattr(selected_user, field, None)
if field_value:
setattr(current_user, field, field_value)
cls.log_change(current_user, field, field_value, field_value, change_logs)
current_user.save()
@classmethod
def log_change(cls, obj, field_name, field_value, new_value, change_logs):
"""Logs the change for a specific field on an object"""
log_entry = f'Changed {field_name} from "{field_value}" to "{new_value}" on {obj}'
def log_change(cls, obj, selected_user, current_user, field_name, change_logs):
log_entry = f"Changed {field_name} from {selected_user} to {current_user} on {obj}"
logger.info(log_entry)
# Collect the related object for the success message
change_logs.append(log_entry)
@classmethod