mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-08-05 01:11:55 +02:00
refactor for readability
This commit is contained in:
parent
ec8e8b1f65
commit
0958eda7cc
1 changed files with 81 additions and 67 deletions
|
@ -281,80 +281,24 @@ class UserFixture:
|
|||
"""Loads the users into the database and assigns them to the specified group."""
|
||||
logger.info(f"Going to load {len(users)} users for group {group_name}")
|
||||
|
||||
# Step 1: Fetch the group
|
||||
group = UserGroup.objects.get(name=group_name)
|
||||
|
||||
# Prepare sets of existing usernames and IDs in one query
|
||||
user_identifiers = [(user.get("username"), user.get("id")) for user in users]
|
||||
existing_users = User.objects.filter(
|
||||
username__in=[user[0] for user in user_identifiers] + [user[1] for user in user_identifiers]
|
||||
).values_list("username", "id")
|
||||
# Step 2: Identify new and existing users
|
||||
existing_usernames, existing_user_ids = cls._get_existing_users(users)
|
||||
new_users = cls._prepare_new_users(users, existing_usernames, existing_user_ids, are_superusers)
|
||||
|
||||
existing_usernames = set(user[0] for user in existing_users)
|
||||
existing_user_ids = set(user[1] for user in existing_users)
|
||||
|
||||
# Filter out users with existing IDs or usernames
|
||||
new_users = [
|
||||
User(
|
||||
id=user_data.get("id"),
|
||||
first_name=user_data.get("first_name"),
|
||||
last_name=user_data.get("last_name"),
|
||||
username=user_data.get("username"),
|
||||
email=user_data.get("email", ""),
|
||||
title=user_data.get("title", "Peon"),
|
||||
phone=user_data.get("phone", "2022222222"),
|
||||
is_active=user_data.get("is_active", True),
|
||||
is_staff=True,
|
||||
is_superuser=are_superusers,
|
||||
)
|
||||
for user_data in users
|
||||
if user_data.get("username") not in existing_usernames and user_data.get("id") not in existing_user_ids
|
||||
]
|
||||
|
||||
# Perform bulk creation for new users
|
||||
if new_users:
|
||||
try:
|
||||
User.objects.bulk_create(new_users)
|
||||
logger.info(f"Created {len(new_users)} new users.")
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error during user bulk creation: {e}")
|
||||
else:
|
||||
logger.info("No new users to create.")
|
||||
# Step 3: Create new users
|
||||
cls._create_new_users(new_users)
|
||||
|
||||
# Step 4: Update existing users
|
||||
# Get all users to be updated (both new and existing)
|
||||
created_or_existing_users = User.objects.filter(username__in=[user.get("username") for user in users])
|
||||
users_to_update = cls._get_users_to_update(created_or_existing_users)
|
||||
cls._update_existing_users(users_to_update)
|
||||
|
||||
# Update `is_staff` for existing users if necessary
|
||||
users_to_update = []
|
||||
for user in created_or_existing_users:
|
||||
updated = False
|
||||
|
||||
if not user.title:
|
||||
user.title = "Peon"
|
||||
updated = True
|
||||
|
||||
if not user.phone:
|
||||
user.phone = "2022222222"
|
||||
updated = True
|
||||
|
||||
if not user.is_staff:
|
||||
user.is_staff = True
|
||||
updated = True
|
||||
|
||||
# Only append the user if any of the fields were updated
|
||||
if updated:
|
||||
users_to_update.append(user)
|
||||
|
||||
# Save any users that were updated
|
||||
if users_to_update:
|
||||
User.objects.bulk_update(users_to_update, ["is_staff"])
|
||||
logger.info(f"Updated {len(users_to_update)} existing users to have is_staff=True.")
|
||||
|
||||
# Filter out users who are already in the group
|
||||
users_not_in_group = created_or_existing_users.exclude(groups__id=group.id)
|
||||
|
||||
# Add only users who are not already in the group
|
||||
if users_not_in_group.exists():
|
||||
group.user_set.add(*users_not_in_group)
|
||||
# Step 5: Assign users to the group
|
||||
cls._assign_users_to_group(group, created_or_existing_users)
|
||||
|
||||
logger.info(f"Users loaded for group {group_name}.")
|
||||
|
||||
|
@ -386,6 +330,76 @@ class UserFixture:
|
|||
else:
|
||||
logger.info("No allowed emails to load")
|
||||
|
||||
@staticmethod
|
||||
def _get_existing_users(users):
|
||||
user_identifiers = [(user.get("username"), user.get("id")) for user in users]
|
||||
existing_users = User.objects.filter(
|
||||
username__in=[user[0] for user in user_identifiers] + [user[1] for user in user_identifiers]
|
||||
).values_list("username", "id")
|
||||
existing_usernames = set(user[0] for user in existing_users)
|
||||
existing_user_ids = set(user[1] for user in existing_users)
|
||||
return existing_usernames, existing_user_ids
|
||||
|
||||
@staticmethod
|
||||
def _prepare_new_users(users, existing_usernames, existing_user_ids, are_superusers):
|
||||
return [
|
||||
User(
|
||||
id=user_data.get("id"),
|
||||
first_name=user_data.get("first_name"),
|
||||
last_name=user_data.get("last_name"),
|
||||
username=user_data.get("username"),
|
||||
email=user_data.get("email", ""),
|
||||
title=user_data.get("title", "Peon"),
|
||||
phone=user_data.get("phone", "2022222222"),
|
||||
is_active=user_data.get("is_active", True),
|
||||
is_staff=True,
|
||||
is_superuser=are_superusers,
|
||||
)
|
||||
for user_data in users
|
||||
if user_data.get("username") not in existing_usernames and user_data.get("id") not in existing_user_ids
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def _create_new_users(new_users):
|
||||
if new_users:
|
||||
try:
|
||||
User.objects.bulk_create(new_users)
|
||||
logger.info(f"Created {len(new_users)} new users.")
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error during user bulk creation: {e}")
|
||||
else:
|
||||
logger.info("No new users to create.")
|
||||
|
||||
@staticmethod
|
||||
def _get_users_to_update(users):
|
||||
users_to_update = []
|
||||
for user in users:
|
||||
updated = False
|
||||
if not user.title:
|
||||
user.title = "Peon"
|
||||
updated = True
|
||||
if not user.phone:
|
||||
user.phone = "2022222222"
|
||||
updated = True
|
||||
if not user.is_staff:
|
||||
user.is_staff = True
|
||||
updated = True
|
||||
if updated:
|
||||
users_to_update.append(user)
|
||||
return users_to_update
|
||||
|
||||
@staticmethod
|
||||
def _update_existing_users(users_to_update):
|
||||
if users_to_update:
|
||||
User.objects.bulk_update(users_to_update, ["is_staff", "title", "phone"])
|
||||
logger.info(f"Updated {len(users_to_update)} existing users.")
|
||||
|
||||
@staticmethod
|
||||
def _assign_users_to_group(group, users):
|
||||
users_not_in_group = users.exclude(groups__id=group.id)
|
||||
if users_not_in_group.exists():
|
||||
group.user_set.add(*users_not_in_group)
|
||||
|
||||
@classmethod
|
||||
def load(cls):
|
||||
with transaction.atomic():
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue