initial fix

This commit is contained in:
matthewswspence 2025-01-14 14:33:30 -06:00
parent a0a53ea855
commit 5fce7c7b4e
No known key found for this signature in database
GPG key ID: FB458202A7852BA4
2 changed files with 215 additions and 73 deletions

View file

@ -2734,7 +2734,7 @@ class TestTransferUser(WebTest):
@less_console_noise_decorator @less_console_noise_decorator
def test_transfer_user_transfers_user_portfolio_roles_no_error_when_duplicates(self): def test_transfer_user_transfers_user_portfolio_roles_no_error_when_duplicates(self):
"""Assert that duplicate portfolio user roles do not throw errorsd""" """Assert that duplicate portfolio user roles do not throw errors"""
portfolio1 = Portfolio.objects.create(organization_name="Hotel California", creator=self.user2) portfolio1 = Portfolio.objects.create(organization_name="Hotel California", creator=self.user2)
UserPortfolioPermission.objects.create( UserPortfolioPermission.objects.create(
user=self.user1, portfolio=portfolio1, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] user=self.user1, portfolio=portfolio1, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
@ -2759,7 +2759,7 @@ class TestTransferUser(WebTest):
messages.error.assert_not_called() messages.error.assert_not_called()
@less_console_noise_decorator # @less_console_noise_decorator
def test_transfer_user_transfers_domain_request_creator_and_investigator(self): def test_transfer_user_transfers_domain_request_creator_and_investigator(self):
"""Assert that domain request fields get transferred""" """Assert that domain request fields get transferred"""
domain_request = completed_domain_request(user=self.user2, name="wasteland.gov", investigator=self.user2) domain_request = completed_domain_request(user=self.user2, name="wasteland.gov", investigator=self.user2)
@ -2776,6 +2776,7 @@ class TestTransferUser(WebTest):
self.assertEquals(domain_request.creator, self.user1) self.assertEquals(domain_request.creator, self.user1)
self.assertEquals(domain_request.investigator, self.user1) self.assertEquals(domain_request.investigator, self.user1)
@less_console_noise_decorator @less_console_noise_decorator
def test_transfer_user_transfers_domain_information_creator(self): def test_transfer_user_transfers_domain_information_creator(self):
"""Assert that domain fields get transferred""" """Assert that domain fields get transferred"""
@ -2866,7 +2867,7 @@ class TestTransferUser(WebTest):
with self.assertRaises(User.DoesNotExist): with self.assertRaises(User.DoesNotExist):
self.user2.refresh_from_db() self.user2.refresh_from_db()
@less_console_noise_decorator # @less_console_noise_decorator
def test_transfer_user_throws_transfer_and_delete_success_messages(self): def test_transfer_user_throws_transfer_and_delete_success_messages(self):
"""Test that success messages for data transfer and user deletion are displayed.""" """Test that success messages for data transfer and user deletion are displayed."""
# Ensure the setup for VerifiedByStaff # Ensure the setup for VerifiedByStaff
@ -2882,6 +2883,10 @@ class TestTransferUser(WebTest):
submit_form["selected_user"] = self.user2.pk submit_form["selected_user"] = self.user2.pk
after_submit = submit_form.submit().follow() after_submit = submit_form.submit().follow()
print("mock_success_message.call_args_list:")
for call in mock_success_message.call_args_list:
print(call)
self.assertContains(after_submit, "<h1>Change user</h1>") self.assertContains(after_submit, "<h1>Change user</h1>")
mock_success_message.assert_any_call( mock_success_message.assert_any_call(
@ -2898,7 +2903,7 @@ class TestTransferUser(WebTest):
def test_transfer_user_throws_error_message(self): def test_transfer_user_throws_error_message(self):
"""Test that an error message is thrown if the transfer fails.""" """Test that an error message is thrown if the transfer fails."""
with patch( with patch(
"registrar.views.TransferUserView.transfer_user_fields_and_log", side_effect=Exception("Simulated Error") "registrar.views.TransferUserView.transfer_related_fields_and_log", side_effect=Exception("Simulated Error")
): ):
with patch("django.contrib.messages.error") as mock_error: with patch("django.contrib.messages.error") as mock_error:
# Access the transfer user page # Access the transfer user page

View file

@ -1,7 +1,10 @@
import logging import logging
from django.db import transaction
from django.db.models import Manager,ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel
from django.shortcuts import render, get_object_or_404, redirect from django.shortcuts import render, get_object_or_404, redirect
from django.views import View from django.views import View
from registrar import models
from registrar.models.domain import Domain from registrar.models.domain import Domain
from registrar.models.domain_information import DomainInformation from registrar.models.domain_information import DomainInformation
from registrar.models.domain_request import DomainRequest from registrar.models.domain_request import DomainRequest
@ -21,22 +24,8 @@ logger = logging.getLogger(__name__)
class TransferUserView(View): class TransferUserView(View):
"""Transfer user methods that set up the transfer_user template and handle the forms on it.""" """Transfer user methods that set up the transfer_user template and handle the forms on it."""
JOINS = [
(DomainRequest, "creator"),
(DomainInformation, "creator"),
(Portfolio, "creator"),
(DomainRequest, "investigator"),
(UserDomainRole, "user"),
(VerifiedByStaff, "requestor"),
(UserPortfolioPermission, "user"),
]
# Future-proofing in case joined fields get added on the user model side
# This was tested in the first portfolio model iteration and works
USER_FIELDS: List[Any] = []
def get(self, request, user_id): def get(self, request, user_id):
"""current_user referes to the 'source' user where the button that redirects to this view was clicked. """current_user refers to the 'source' user where the button that redirects to this view was clicked.
other_users exclude current_user and populate a dropdown, selected_user is the selection in the dropdown. other_users exclude current_user and populate a dropdown, selected_user is the selection in the dropdown.
This also querries the relevant domains and domain requests, and the admin context needed for the sidenav.""" This also querries the relevant domains and domain requests, and the admin context needed for the sidenav."""
@ -71,86 +60,234 @@ class TransferUserView(View):
def post(self, request, user_id): def post(self, request, user_id):
"""This handles the transfer from selected_user to current_user then deletes selected_user. """This handles the transfer from selected_user to current_user then deletes selected_user.
"""
NOTE: We have a ticket to refactor this into a more solid lookup for related fields in #2645"""
current_user = get_object_or_404(User, pk=user_id) current_user = get_object_or_404(User, pk=user_id)
selected_user_id = request.POST.get("selected_user") selected_user_id = request.POST.get("selected_user")
selected_user = get_object_or_404(User, pk=selected_user_id) selected_user = get_object_or_404(User, pk=selected_user_id)
try: try:
change_logs = [] # Make this atomic so that we don't get any partial transfers
with transaction.atomic():
change_logs = []
# Transfer specific fields self._delete_duplicate_user_domain_roles_and_log(selected_user, current_user, change_logs)
self.transfer_user_fields_and_log(selected_user, current_user, change_logs)
# Perform the updates and log the changes self._delete_duplicate_user_portfolio_permissions_and_log(selected_user, current_user, change_logs)
for model_class, field_name in self.JOINS: # Dynamically handle related fields
self.update_joins_and_log(model_class, field_name, selected_user, current_user, change_logs) self.transfer_related_fields_and_log(selected_user, current_user, change_logs)
# Success message if any related objects were updated # Success message if any related objects were updated
if change_logs: logger.debug(f"change_logs: {change_logs}")
success_message = f"Data transferred successfully for the following objects: {change_logs}" if change_logs:
messages.success(request, success_message) logger.debug(f"change_logs: {change_logs}")
success_message = f"Data transferred successfully for the following objects: {change_logs}"
messages.success(request, success_message)
selected_user.delete() logger.debug("Deleting old user")
messages.success(request, f"Deleted {selected_user} {selected_user.username}") selected_user.delete()
messages.success(request, f"Deleted {selected_user} {selected_user.username}")
except Exception as e: except Exception as e:
messages.error(request, f"An error occurred during the transfer: {e}") messages.error(request, f"An error occurred during the transfer: {e}")
logger.error(f"An error occurred during the transfer: {e}", exc_info=True)
return redirect("admin:registrar_user_change", object_id=user_id) return redirect("admin:registrar_user_change", object_id=user_id)
@classmethod def _delete_duplicate_user_portfolio_permissions_and_log(self, selected_user, current_user, change_logs):
def update_joins_and_log(cls, model_class, field_name, selected_user, current_user, change_logs):
""" """
Helper function to update the user join fields for a given model and log the changes. Check and remove duplicate UserPortfolioPermission objects from the selected_user based on portfolios associated with the current_user.
"""
try:
# Fetch portfolios associated with the current_user
current_user_portfolios = UserPortfolioPermission.objects.filter(user=current_user).values_list('portfolio_id', flat=True)
# Identify duplicates in selected_user for these portfolios
duplicates = (
UserPortfolioPermission.objects
.filter(user=selected_user, portfolio_id__in=current_user_portfolios)
)
duplicate_count = duplicates.count()
if duplicate_count > 0:
# Log the specific duplicates before deletion for better traceability
duplicate_permissions = list(duplicates)
logger.debug(f"Duplicate permissions to be removed: {duplicate_permissions}")
duplicates.delete()
logger.info(f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}")
change_logs.append(f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}")
except Exception as e:
logger.error(f"Failed to check and remove duplicate UserPortfolioPermissions: {e}", exc_info=True)
raise
def _delete_duplicate_user_domain_roles_and_log(self, selected_user, current_user, change_logs):
"""
Check and remove duplicate UserDomainRole objects from the selected_user based on domains associated with the current_user.
Retain one instance per domain to maintain data integrity.
""" """
filter_kwargs = {field_name: selected_user} try:
updated_objects = model_class.objects.filter(**filter_kwargs) # Fetch domains associated with the current_user
current_user_domains = UserDomainRole.objects.filter(user=current_user).values_list('domain_id', flat=True)
for obj in updated_objects: # Identify duplicates in selected_user for these domains
# Check for duplicate UserDomainRole before updating duplicates = (
if model_class == UserDomainRole: UserDomainRole.objects
if model_class.objects.filter(user=current_user, domain=obj.domain).exists(): .filter(user=selected_user, domain_id__in=current_user_domains)
continue # Skip the update to avoid a duplicate )
if model_class == UserPortfolioPermission: duplicate_count = duplicates.count()
if model_class.objects.filter(user=current_user, portfolio=obj.portfolio).exists():
continue # Skip the update to avoid a duplicate
# Update the field on the object and save it if duplicate_count > 0:
duplicates.delete()
logger.info(
f"Removed {duplicate_count} duplicate UserDomainRole(s) from user_id {selected_user.id} "
f"for domains already associated with user_id {current_user.id}"
)
change_logs.append(
f"Removed {duplicate_count} duplicate UserDomainRole(s) from user_id {selected_user.id} "
f"for domains already associated with user_id {current_user.id}"
)
except Exception as e:
logger.error(f"Failed to check and remove duplicate UserDomainRoles: {e}", exc_info=True)
raise
def transfer_related_fields_and_log(self, selected_user, current_user, change_logs):
"""
Dynamically find all related fields to the User model and transfer them from selected_user to current_user.
Handles ForeignKey, OneToOneField, ManyToManyField, and ManyToOneRel relationships.
"""
user_model = User
# Handle forward relationships
for related_field in user_model._meta.get_fields():
if related_field.is_relation and related_field.related_model:
if isinstance(related_field, ForeignKey):
self._handle_foreign_key(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, OneToOneField):
self._handle_one_to_one(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, ManyToManyField):
self._handle_many_to_many(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, ManyToOneRel):
self._handle_many_to_one_rel(related_field, selected_user, current_user, change_logs)
# # Handle reverse relationships
for related_object in user_model._meta.related_objects:
if isinstance(related_object, ManyToOneRel):
self._handle_many_to_one_rel(related_object, selected_user, current_user, change_logs)
elif isinstance(related_object.field, OneToOneField):
self._handle_one_to_one_reverse(related_object, selected_user, current_user, change_logs)
elif isinstance(related_object.field, ForeignKey):
self._handle_foreign_key_reverse(related_object, selected_user, current_user, change_logs)
elif isinstance(related_object.field, ManyToManyField):
self._handle_many_to_many_reverse(related_object, selected_user, current_user, change_logs)
def _handle_foreign_key(self, related_field: ForeignKey, selected_user, current_user, change_logs):
related_name = related_field.get_accessor_name()
# for foreign key relationships, getattr returns a manager
related_manager = getattr(selected_user, related_name, None)
if related_manager:
# get all the related objects
related_queryset = related_manager.all()
for obj in related_queryset:
# set the foreign key to the current user
setattr(obj, related_field.field.name, current_user)
obj.save()
log_entry = f'Transferred {related_field.field.name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
def _handle_one_to_one(self, related_field: OneToOneField, selected_user, current_user, change_logs):
related_name = related_field.get_accessor_name()
# for one to one relationships, getattr returns the related object
related_object = getattr(selected_user, related_name, None)
if related_object:
# set the one to one field to the current user
setattr(related_object, related_field.field.name, current_user)
related_object.save()
log_entry = f'Transferred {related_field.field.name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
def _handle_many_to_many(self, related_field: ManyToManyField, selected_user, current_user, change_logs):
# for many to many relationships, getattr returns a manager
related_manager = getattr(selected_user, related_field.name, None)
if related_manager:
# get all the related objects
related_queryset = related_manager.all()
# add the related objects to the current user
getattr(current_user, related_field.name).add(*related_queryset)
log_entry = f'Transferred {related_field.name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
def _handle_many_to_one_rel(self, related_object: ManyToOneRel, selected_user: User, current_user: User, change_logs: List[str]):
"""
Handles ManyToOneRel relationships, where multiple objects relate to the User via a ForeignKey.
"""
related_model = related_object.related_model
related_name = related_object.field.name
# for many to one relationships, we need a queryset
related_queryset = related_model.objects.filter(**{related_name: selected_user})
for obj in related_queryset:
setattr(obj, related_name, current_user)
obj.save()
log_entry = f'Transferred {related_name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
def _handle_one_to_one_reverse(self, related_object: OneToOneField, selected_user: User, current_user: User, change_logs: List[str]):
"""
Handles reverse OneToOneField relationships.
"""
related_model = related_object.related_model
field_name = related_object.field.name
try:
related_instance = related_model.objects.filter(**{field_name: selected_user}).first()
setattr(related_instance, field_name, current_user)
related_instance.save()
log_entry = f'Transferred {field_name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
except related_model.DoesNotExist:
logger.warning(f"No related instance found for reverse OneToOneField {field_name} for {selected_user}")
def _handle_foreign_key_reverse(self, related_object: ForeignKey, selected_user: User, current_user: User, change_logs: List[str]):
"""
Handles reverse ForeignKey relationships.
"""
related_model = related_object.related_model
field_name = related_object.field.name
related_queryset = related_model.objects.filter(**{field_name: selected_user})
for obj in related_queryset:
setattr(obj, field_name, current_user) setattr(obj, field_name, current_user)
obj.save() obj.save()
log_entry = f'Transferred {field_name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
# Log the change def _handle_many_to_many_reverse(self, related_object: ManyToManyField, selected_user: User, current_user: User, change_logs: List[str]):
cls.log_change(obj, field_name, selected_user, current_user, change_logs)
@classmethod
def transfer_user_fields_and_log(cls, selected_user, current_user, change_logs):
""" """
Transfers portfolio fields from the selected_user to the current_user. Handles reverse ManyToManyField relationships.
Logs the changes for each transferred field.
""" """
for field in cls.USER_FIELDS: related_model = related_object.related_model
field_value = getattr(selected_user, field, None) field_name = related_object.field.name
if field_value: related_manager = related_model.objects.filter(**{field_name: selected_user})
setattr(current_user, field, field_value) if related_manager:
cls.log_change(current_user, field, field_value, field_value, change_logs) related_qs = related_manager.all()
getattr(current_user, field_name).add(*related_qs)
current_user.save() log_entry = f'Transferred {field_name} from {selected_user} to {current_user}'
logger.info(log_entry)
@classmethod change_logs.append(log_entry)
def log_change(cls, obj, field_name, field_value, new_value, change_logs):
"""Logs the change for a specific field on an object"""
log_entry = f'Changed {field_name} from "{field_value}" to "{new_value}" on {obj}'
logger.info(log_entry)
# Collect the related object for the success message
change_logs.append(log_entry)
@classmethod @classmethod
def get_domains(cls, user): def get_domains(cls, user):