Add faster copy script

This commit is contained in:
zandercymatics 2024-05-28 10:39:01 -06:00
parent e70fb3ae6b
commit ddd3ed450c
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
3 changed files with 72 additions and 14 deletions

View file

@ -0,0 +1,50 @@
import logging
from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import PopulateScriptTemplate, TerminalColors
from registrar.models import User, Contact
from registrar.models.utility.domain_helper import DomainHelper
logger = logging.getLogger(__name__)
class Command(BaseCommand, PopulateScriptTemplate):
help = "Loops through each valid User object and updates its verification_type value"
# Get the fields that exist on both User and Contact. Excludes id.
common_fields = DomainHelper.get_common_fields(User, Contact)
def handle(self, **kwargs):
"""Loops through each valid User object and updates its verification_type value"""
# Don't change the email field.
if "email" in self.common_fields:
self.common_fields.remove("email")
filter_condition = {"contact__isnull": False}
self.mass_populate_field(User, filter_condition, self.common_fields)
skipped_users = User.objects.filter(contact__isnull=True)
if skipped_users and len(skipped_users) > 0:
logger.warning(
f"""{TerminalColors.YELLOW}
===== SKIPPED USERS =====
{list(skipped_users)}
{TerminalColors.ENDC}
""",
)
def populate_field(self, object_to_update):
"""Defines how we update the user field on mass_populate_field()"""
new_value = None
for field in self.common_fields:
# Grab the value that contact has stored for this field
new_value = getattr(object_to_update.contact, field)
# Set it on the user field
setattr(object_to_update, field, new_value)
logger.info(
f"{TerminalColors.OKCYAN}Updating {object_to_update}{TerminalColors.ENDC}"
)

View file

@ -14,10 +14,10 @@ class Command(BaseCommand, PopulateScriptTemplate):
filter_condition = {"verification_type__isnull": True}
self.mass_populate_field(User, filter_condition, ["verification_type"])
def populate_field(self, field_to_update):
def populate_field(self, object_to_update):
"""Defines how we update the verification_type field"""
field_to_update.set_user_verification_type()
object_to_update.set_user_verification_type()
logger.info(
f"{TerminalColors.OKCYAN}Updating {field_to_update} => "
f"{field_to_update.verification_type}{TerminalColors.OKCYAN}"
f"{TerminalColors.OKCYAN}Updating {object_to_update} => "
f"{object_to_update.verification_type}{TerminalColors.OKCYAN}"
)

View file

@ -64,14 +64,22 @@ class PopulateScriptTemplate(ABC):
Contains an ABC for generic populate scripts
"""
def mass_populate_field(self, sender, filter_conditions, fields_to_update):
def get_objects_to_update(self, sender, filter_conditions):
"""Given a model of type 'object', perform a filter operation on
filter_conditions and return the result.
For example: User.objects.filter(contact__isnull: False)
"""
return sender.objects.filter(**filter_conditions)
def mass_populate_field(self, sender, filter_conditions, objects_to_update):
"""Loops through each valid "sender" object - specified by filter_conditions - and
updates fields defined by fields_to_update using populate_function.
You must define populate_field before you can use this function.
"""
objects = sender.objects.filter(**filter_conditions)
objects = self.get_objects_to_update(sender, filter_conditions)
# Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution(
@ -79,7 +87,7 @@ class PopulateScriptTemplate(ABC):
info_to_inspect=f"""
==Proposed Changes==
Number of {sender} objects to change: {len(objects)}
These fields will be updated on each record: {fields_to_update}
These fields will be updated on each record: {objects_to_update}
""",
prompt_title="Do you wish to patch this data?",
)
@ -87,23 +95,23 @@ class PopulateScriptTemplate(ABC):
to_update: List[sender] = []
failed_to_update: List[sender] = []
for updated_object in objects:
for item in objects:
try:
self.populate_field(updated_object)
to_update.append(updated_object)
self.populate_field(item)
to_update.append(item)
except Exception as err:
failed_to_update.append(updated_object)
failed_to_update.append(item)
logger.error(err)
logger.error(f"{TerminalColors.FAIL}" f"Failed to update {updated_object}" f"{TerminalColors.ENDC}")
logger.error(f"{TerminalColors.FAIL}" f"Failed to update {item}" f"{TerminalColors.ENDC}")
# Do a bulk update on the first_ready field
ScriptDataHelper.bulk_update_fields(sender, to_update, fields_to_update)
ScriptDataHelper.bulk_update_fields(sender, to_update, objects_to_update)
# Log what happened
TerminalHelper.log_script_run_summary(to_update, failed_to_update, skipped=[], debug=True)
@abstractmethod
def populate_field(self, field_to_update):
def populate_field(self, object_to_update):
"""Defines how we update each field. Must be defined before using mass_populate_field."""
raise NotImplementedError