refactor transfer user function

This commit is contained in:
matthewswspence 2025-01-14 14:58:30 -06:00
parent 5fce7c7b4e
commit bc9bc8fbae
No known key found for this signature in database
GPG key ID: FB458202A7852BA4
2 changed files with 65 additions and 83 deletions

View file

@ -2759,7 +2759,7 @@ class TestTransferUser(WebTest):
messages.error.assert_not_called()
# @less_console_noise_decorator
@less_console_noise_decorator
def test_transfer_user_transfers_domain_request_creator_and_investigator(self):
"""Assert that domain request fields get transferred"""
domain_request = completed_domain_request(user=self.user2, name="wasteland.gov", investigator=self.user2)
@ -2776,7 +2776,6 @@ class TestTransferUser(WebTest):
self.assertEquals(domain_request.creator, self.user1)
self.assertEquals(domain_request.investigator, self.user1)
@less_console_noise_decorator
def test_transfer_user_transfers_domain_information_creator(self):
"""Assert that domain fields get transferred"""
@ -2867,7 +2866,7 @@ class TestTransferUser(WebTest):
with self.assertRaises(User.DoesNotExist):
self.user2.refresh_from_db()
# @less_console_noise_decorator
@less_console_noise_decorator
def test_transfer_user_throws_transfer_and_delete_success_messages(self):
"""Test that success messages for data transfer and user deletion are displayed."""
# Ensure the setup for VerifiedByStaff
@ -2892,8 +2891,8 @@ class TestTransferUser(WebTest):
mock_success_message.assert_any_call(
ANY,
(
"Data transferred successfully for the following objects: ['Changed requestor "
+ 'from "Furiosa Jabassa " to "Max Rokatanski " on immortan.joe@citadel.com\']'
"Data transferred successfully for the following objects: ['Transferred requestor "
+ "from Furiosa Jabassa to Max Rokatanski ']"
),
)

View file

@ -1,6 +1,6 @@
import logging
from django.db import transaction
from django.db.models import Manager,ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel
from django.db.models import Manager, ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel
from django.shortcuts import render, get_object_or_404, redirect
from django.views import View
@ -59,8 +59,7 @@ class TransferUserView(View):
return render(request, "admin/transfer_user.html", context)
def post(self, request, user_id):
"""This handles the transfer from selected_user to current_user then deletes selected_user.
"""
"""This handles the transfer from selected_user to current_user then deletes selected_user."""
current_user = get_object_or_404(User, pk=user_id)
selected_user_id = request.POST.get("selected_user")
selected_user = get_object_or_404(User, pk=selected_user_id)
@ -73,7 +72,7 @@ class TransferUserView(View):
self._delete_duplicate_user_domain_roles_and_log(selected_user, current_user, change_logs)
self._delete_duplicate_user_portfolio_permissions_and_log(selected_user, current_user, change_logs)
# Dynamically handle related fields
# Dynamically handle related fields
self.transfer_related_fields_and_log(selected_user, current_user, change_logs)
# Success message if any related objects were updated
@ -92,19 +91,20 @@ class TransferUserView(View):
logger.error(f"An error occurred during the transfer: {e}", exc_info=True)
return redirect("admin:registrar_user_change", object_id=user_id)
def _delete_duplicate_user_portfolio_permissions_and_log(self, selected_user, current_user, change_logs):
"""
Check and remove duplicate UserPortfolioPermission objects from the selected_user based on portfolios associated with the current_user.
"""
try:
# Fetch portfolios associated with the current_user
current_user_portfolios = UserPortfolioPermission.objects.filter(user=current_user).values_list('portfolio_id', flat=True)
current_user_portfolios = UserPortfolioPermission.objects.filter(user=current_user).values_list(
"portfolio_id", flat=True
)
# Identify duplicates in selected_user for these portfolios
duplicates = (
UserPortfolioPermission.objects
.filter(user=selected_user, portfolio_id__in=current_user_portfolios)
duplicates = UserPortfolioPermission.objects.filter(
user=selected_user, portfolio_id__in=current_user_portfolios
)
duplicate_count = duplicates.count()
@ -115,9 +115,13 @@ class TransferUserView(View):
logger.debug(f"Duplicate permissions to be removed: {duplicate_permissions}")
duplicates.delete()
logger.info(f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}")
change_logs.append(f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}")
logger.info(
f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}"
)
change_logs.append(
f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}"
)
except Exception as e:
logger.error(f"Failed to check and remove duplicate UserPortfolioPermissions: {e}", exc_info=True)
raise
@ -130,13 +134,10 @@ class TransferUserView(View):
try:
# Fetch domains associated with the current_user
current_user_domains = UserDomainRole.objects.filter(user=current_user).values_list('domain_id', flat=True)
current_user_domains = UserDomainRole.objects.filter(user=current_user).values_list("domain_id", flat=True)
# Identify duplicates in selected_user for these domains
duplicates = (
UserDomainRole.objects
.filter(user=selected_user, domain_id__in=current_user_domains)
)
duplicates = UserDomainRole.objects.filter(user=selected_user, domain_id__in=current_user_domains)
duplicate_count = duplicates.count()
@ -150,7 +151,7 @@ class TransferUserView(View):
f"Removed {duplicate_count} duplicate UserDomainRole(s) from user_id {selected_user.id} "
f"for domains already associated with user_id {current_user.id}"
)
except Exception as e:
logger.error(f"Failed to check and remove duplicate UserDomainRoles: {e}", exc_info=True)
raise
@ -187,65 +188,48 @@ class TransferUserView(View):
def _handle_foreign_key(self, related_field: ForeignKey, selected_user, current_user, change_logs):
related_name = related_field.get_accessor_name()
# for foreign key relationships, getattr returns a manager
related_manager = getattr(selected_user, related_name, None)
if related_manager:
# get all the related objects
if related_manager.count() > 0:
related_queryset = related_manager.all()
for obj in related_queryset:
# set the foreign key to the current user
setattr(obj, related_field.field.name, current_user)
obj.save()
log_entry = f'Transferred {related_field.field.name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
self.log_change(selected_user, current_user, related_field.field.name, change_logs)
def _handle_one_to_one(self, related_field: OneToOneField, selected_user, current_user, change_logs):
related_name = related_field.get_accessor_name()
# for one to one relationships, getattr returns the related object
related_object = getattr(selected_user, related_name, None)
if related_object:
# set the one to one field to the current user
setattr(related_object, related_field.field.name, current_user)
related_object.save()
log_entry = f'Transferred {related_field.field.name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
self.log_change(selected_user, current_user, related_field.field.name, change_logs)
def _handle_many_to_many(self, related_field: ManyToManyField, selected_user, current_user, change_logs):
# for many to many relationships, getattr returns a manager
related_manager = getattr(selected_user, related_field.name, None)
if related_manager:
# get all the related objects
if related_manager.count() > 0:
related_queryset = related_manager.all()
# add the related objects to the current user
getattr(current_user, related_field.name).add(*related_queryset)
log_entry = f'Transferred {related_field.name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
self.log_change(selected_user, current_user, related_field.name, change_logs)
def _handle_many_to_one_rel(self, related_object: ManyToOneRel, selected_user: User, current_user: User, change_logs: List[str]):
"""
Handles ManyToOneRel relationships, where multiple objects relate to the User via a ForeignKey.
"""
def _handle_many_to_one_rel(
self, related_object: ManyToOneRel, selected_user: User, current_user: User, change_logs: List[str]
):
related_model = related_object.related_model
related_name = related_object.field.name
# for many to one relationships, we need a queryset
related_queryset = related_model.objects.filter(**{related_name: selected_user})
for obj in related_queryset:
setattr(obj, related_name, current_user)
obj.save()
log_entry = f'Transferred {related_name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
def _handle_one_to_one_reverse(self, related_object: OneToOneField, selected_user: User, current_user: User, change_logs: List[str]):
"""
Handles reverse OneToOneField relationships.
"""
if related_queryset.count() > 0:
for obj in related_queryset:
setattr(obj, related_name, current_user)
obj.save()
self.log_change(selected_user, current_user, related_name, change_logs)
def _handle_one_to_one_reverse(
self, related_object: OneToOneField, selected_user: User, current_user: User, change_logs: List[str]
):
related_model = related_object.related_model
field_name = related_object.field.name
@ -253,41 +237,40 @@ class TransferUserView(View):
related_instance = related_model.objects.filter(**{field_name: selected_user}).first()
setattr(related_instance, field_name, current_user)
related_instance.save()
log_entry = f'Transferred {field_name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
self.log_change(selected_user, current_user, field_name, change_logs)
except related_model.DoesNotExist:
logger.warning(f"No related instance found for reverse OneToOneField {field_name} for {selected_user}")
def _handle_foreign_key_reverse(self, related_object: ForeignKey, selected_user: User, current_user: User, change_logs: List[str]):
"""
Handles reverse ForeignKey relationships.
"""
def _handle_foreign_key_reverse(
self, related_object: ForeignKey, selected_user: User, current_user: User, change_logs: List[str]
):
related_model = related_object.related_model
field_name = related_object.field.name
related_queryset = related_model.objects.filter(**{field_name: selected_user})
for obj in related_queryset:
setattr(obj, field_name, current_user)
obj.save()
log_entry = f'Transferred {field_name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
def _handle_many_to_many_reverse(self, related_object: ManyToManyField, selected_user: User, current_user: User, change_logs: List[str]):
"""
Handles reverse ManyToManyField relationships.
"""
if related_queryset.count() > 0:
for obj in related_queryset:
setattr(obj, field_name, current_user)
obj.save()
self.log_change(selected_user, current_user, field_name, change_logs)
def _handle_many_to_many_reverse(
self, related_object: ManyToManyField, selected_user: User, current_user: User, change_logs: List[str]
):
related_model = related_object.related_model
field_name = related_object.field.name
related_manager = related_model.objects.filter(**{field_name: selected_user})
if related_manager:
related_qs = related_manager.all()
getattr(current_user, field_name).add(*related_qs)
log_entry = f'Transferred {field_name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
related_queryset = related_model.objects.filter(**{field_name: selected_user})
if related_queryset.count() > 0:
getattr(current_user, field_name).add(*related_queryset)
self.log_change(selected_user, current_user, field_name, change_logs)
@classmethod
def log_change(cls, selected_user, current_user, field_name, change_logs):
log_entry = f"Transferred {field_name} from {selected_user} to {current_user}"
logger.info(log_entry)
change_logs.append(log_entry)
@classmethod
def get_domains(cls, user):