This commit is contained in:
zandercymatics 2024-06-17 14:43:06 -06:00
parent 51b89e4604
commit cdaf4afd2a
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
2 changed files with 92 additions and 29 deletions

View file

@ -0,0 +1,35 @@
import logging
from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import PopulateScriptTemplate, TerminalColors
from registrar.models import FederalAgency, DomainRequest
from registrar.models.utility.generic_helper import convert_queryset_to_dict
logger = logging.getLogger(__name__)
class Command(BaseCommand, PopulateScriptTemplate):
help = "Loops through each valid User object and updates its verification_type value"
prompt_title = "Do you wish to update all Federal Agencies?"
def handle(self, **kwargs):
"""Loops through each valid User object and updates its verification_type value"""
# Get all existing domain requests
self.all_domain_requests = DomainRequest.objects.select_related("federal_agency").distinct()
filter_condition = {
"agency__isnull": False,
}
updated_fields = ["federal_type"]
self.mass_update_records(FederalAgency, filter_condition, updated_fields)
def update_record(self, record: FederalAgency):
"""Defines how we update the federal_type field"""
request = self.all_domain_requests.filter(federal_agency__agency=record.agency).first()
record.federal_type = request.federal_type
def should_skip_record(self, record) -> bool: # noqa
"""Defines the conditions in which we should skip updating a record."""
request = self.all_domain_requests.filter(federal_agency__agency=record.agency).first()
return not request or not request.federal_agency

View file

@ -64,53 +64,77 @@ class PopulateScriptTemplate(ABC):
Contains an ABC for generic populate scripts Contains an ABC for generic populate scripts
""" """
def mass_populate_field(self, sender, filter_conditions, fields_to_update): prompt_title = "Do you wish to proceed?"
"""Loops through each valid "sender" object - specified by filter_conditions - and
updates fields defined by fields_to_update using populate_function.
You must define populate_field before you can use this function. def mass_update_records(self, sender, filter_conditions, fields_to_update, debug=True):
"""Loops through each valid "sender" object - specified by filter_conditions - and
updates fields defined by fields_to_update using update_record.
You must define update_record before you can use this function.
""" """
objects = sender.objects.filter(**filter_conditions) records = sender.objects.filter(**filter_conditions)
readable_class_name = self.get_class_name(sender)
# Code execution will stop here if the user prompts "N" # Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution( TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True, system_exit_on_terminate=True,
info_to_inspect=f""" info_to_inspect=f"""
==Proposed Changes== ==Proposed Changes==
Number of {sender} objects to change: {len(objects)} Number of {readable_class_name} objects to change: {len(records)}
These fields will be updated on each record: {fields_to_update} These fields will be updated on each record: {fields_to_update}
""", """,
prompt_title="Do you wish to patch this data?", prompt_title=self.prompt_title,
) )
logger.info("Updating...") logger.info("Updating...")
to_update: List[sender] = [] to_update: List[sender] = []
to_skip: List[sender] = []
failed_to_update: List[sender] = [] failed_to_update: List[sender] = []
for updated_object in objects: for record in records:
try: try:
self.populate_field(updated_object) if not self.should_skip_record(record):
to_update.append(updated_object) self.update_record(record)
to_update.append(record)
else:
to_skip.append(record)
except Exception as err: except Exception as err:
failed_to_update.append(updated_object) fail_message = self.get_failure_message(record)
failed_to_update.append(record)
logger.error(err) logger.error(err)
logger.error(f"{TerminalColors.FAIL}" f"Failed to update {updated_object}" f"{TerminalColors.ENDC}") logger.error(fail_message)
# Do a bulk update on the first_ready field # Do a bulk update on the desired field
ScriptDataHelper.bulk_update_fields(sender, to_update, fields_to_update) ScriptDataHelper.bulk_update_fields(sender, to_update, fields_to_update)
# Log what happened # Log what happened
TerminalHelper.log_script_run_summary(to_update, failed_to_update, skipped=[], debug=True) TerminalHelper.log_script_run_summary(to_update, failed_to_update, to_skip, debug, display_as_str=True)
def get_class_name(self, sender) -> str:
"""Returns the class name that we want to display for the terminal prompt.
Example: DomainRequest => "Domain Request"
"""
return sender._meta.verbose_name if getattr(sender, "_meta") else sender
def get_failure_message(self, record) -> str:
"""Returns the message that we will display if a record fails to update"""
return f"{TerminalColors.FAIL}" f"Failed to update {record}" f"{TerminalColors.ENDC}"
def should_skip_record(self, record) -> bool: # noqa
"""Defines the conditions in which we should skip updating a record."""
# By default - don't skip
return False
@abstractmethod @abstractmethod
def populate_field(self, field_to_update): def update_record(self, record):
"""Defines how we update each field. Must be defined before using mass_populate_field.""" """Defines how we update each field. Must be defined before using mass_populate_field."""
raise NotImplementedError raise NotImplementedError
class TerminalHelper: class TerminalHelper:
@staticmethod @staticmethod
def log_script_run_summary(to_update, failed_to_update, skipped, debug: bool, log_header=None): def log_script_run_summary(to_update, failed_to_update, skipped, debug: bool, log_header=None, display_as_str=False):
"""Prints success, failed, and skipped counts, as well as """Prints success, failed, and skipped counts, as well as
all affected objects.""" all affected objects."""
update_success_count = len(to_update) update_success_count = len(to_update)
@ -121,20 +145,24 @@ class TerminalHelper:
log_header = "============= FINISHED ===============" log_header = "============= FINISHED ==============="
# Prepare debug messages # Prepare debug messages
debug_messages = { if debug:
"success": (f"{TerminalColors.OKCYAN}Updated: {to_update}{TerminalColors.ENDC}\n"), updated_display = [str(u) for u in to_update] if display_as_str else to_update
"skipped": (f"{TerminalColors.YELLOW}Skipped: {skipped}{TerminalColors.ENDC}\n"), skipped_display = [str(s) for s in skipped] if display_as_str else skipped
"failed": (f"{TerminalColors.FAIL}Failed: {failed_to_update}{TerminalColors.ENDC}\n"), failed_display = [str(f) for f in failed_to_update] if display_as_str else failed_to_update
} debug_messages = {
"success": (f"{TerminalColors.OKCYAN}Updated: {updated_display}{TerminalColors.ENDC}\n"),
"skipped": (f"{TerminalColors.YELLOW}Skipped: {skipped_display}{TerminalColors.ENDC}\n"),
"failed": (f"{TerminalColors.FAIL}Failed: {failed_display}{TerminalColors.ENDC}\n"),
}
# Print out a list of everything that was changed, if we have any changes to log. # Print out a list of everything that was changed, if we have any changes to log.
# Otherwise, don't print anything. # Otherwise, don't print anything.
TerminalHelper.print_conditional( TerminalHelper.print_conditional(
debug, debug,
f"{debug_messages.get('success') if update_success_count > 0 else ''}" f"{debug_messages.get('success') if update_success_count > 0 else ''}"
f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}" f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}"
f"{debug_messages.get('failed') if update_failed_count > 0 else ''}", f"{debug_messages.get('failed') if update_failed_count > 0 else ''}",
) )
if update_failed_count == 0 and update_skipped_count == 0: if update_failed_count == 0 and update_skipped_count == 0:
logger.info( logger.info(