rewrite script and extend terminal helper for more context

This commit is contained in:
matthewswspence 2024-08-22 14:30:14 -05:00
parent 1a4268dedd
commit 48901a716d
No known key found for this signature in database
GPG key ID: FB458202A7852BA4
2 changed files with 75 additions and 23 deletions

View file

@ -6,23 +6,22 @@ from registrar.models import Domain, TransitionDomain
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Command(BaseCommand, PopulateScriptTemplate): class Command(BaseCommand, PopulateScriptTemplate):
help = "Loops through eachdomain object and populates the last_status_update and first_submitted_date" help = "Loops through each domain object and populates the last_status_update and first_submitted_date"
def handle(self, **kwargs): def handle(self, **kwargs):
"""Loops through each valid Domain object and updates it's first_ready value if they are out of sync""" """Loops through each valid Domain object and updates it's first_ready value if it is out of sync"""
filter_conditions="state__in=[Domain.State.READY, Domain.State.ON_HOLD, Domain.State.DELETED]" filter_conditions={"state__in":[Domain.State.READY, Domain.State.ON_HOLD, Domain.State.DELETED]}
self.mass_update_records(Domain, filter_conditions, ["first_ready"]) self.mass_update_records(Domain, filter_conditions, ["first_ready"], verbose=True, custom_filter=self.should_update)
def update_record(self, record: Domain): def update_record(self, record: Domain):
"""Defines how we update the first_read field""" """Defines how we update the first_ready field"""
# if these are out of sync, update the # update the first_ready value based on the creation date.
if self.is_transition_domain(record) and record.first_ready != record.created_at:
record.first_ready = record.created_at record.first_ready = record.created_at
logger.info( logger.info(
f"{TerminalColors.OKCYAN}Updating {record} => first_ready: " f"{record.first_ready}{TerminalColors.OKCYAN}" f"{TerminalColors.OKCYAN}Updating {record} => first_ready: " f"{record.first_ready}{TerminalColors.OKCYAN}"
) )
# check if a transition domain object for this domain name exists # check if a transition domain object for this domain name exists, and if so whether
def is_transition_domain(record: Domain): def should_update(self, record: Domain) -> bool:
return TransitionDomain.objects.filter(domain_name=record.name).exists() return TransitionDomain.objects.filter(domain_name=record.name).exists() and record.first_ready != record.created_at

View file

@ -2,6 +2,7 @@ import logging
import sys import sys
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from django.core.paginator import Paginator from django.core.paginator import Paginator
from django.db.models import Model
from typing import List from typing import List
from registrar.utility.enums import LogCode from registrar.utility.enums import LogCode
@ -75,28 +76,74 @@ class PopulateScriptTemplate(ABC):
run_summary_header = None run_summary_header = None
@abstractmethod @abstractmethod
def update_record(self, record): def update_record(self, record: Model):
"""Defines how we update each field. Must be defined before using mass_update_records.""" """Defines how we update each field.
raises:
NotImplementedError: If not defined before calling mass_update_records.
"""
raise NotImplementedError raise NotImplementedError
def mass_update_records(self, object_class, filter_conditions, fields_to_update, debug=True): def mass_update_records(self, object_class: Model, filter_conditions, fields_to_update, debug=True, verbose=False, custom_filter=None):
"""Loops through each valid "object_class" object - specified by filter_conditions - and """Loops through each valid "object_class" object - specified by filter_conditions - and
updates fields defined by fields_to_update using update_record. updates fields defined by fields_to_update using update_record.
You must define update_record before you can use this function. Parameters:
object_class: The Django model class that you want to perform the bulk update on.
This should be the actual class, not a string of the class name.
filter_conditions: dictionary of valid Django Queryset filter conditions
(e.g. {'verification_type__isnull'=True}).
fields_to_update: List of strings specifying which fields to update.
(e.g. ["first_ready_date", "last_submitted_date"])
debug: Whether to log script run summary in debug mode.
Default: True.
verbose: Whether to print a detailed run summary *before* run confirmation.
Default: False.
custom_filter: function taking in a single record and returning a boolean representing whether
the record should be included of the final record set. Used to allow for filters that can't be
represented by django queryset field lookups. Applied *after* filter_conditions.
Raises:
NotImplementedError: If you do not define update_record before using this function.
TypeError: If custom_filter is not Callable.
""" """
records = object_class.objects.filter(**filter_conditions) records = object_class.objects.filter(**filter_conditions)
# apply custom filter
if custom_filter:
to_exclude_pks = []
for record in records:
try:
if not custom_filter(record):
to_exclude_pks.append(record.pk)
except TypeError as e:
logger.error(f"Error applying custom filter: {e}")
sys.exit()
records=records.exclude(pk__in=to_exclude_pks)
readable_class_name = self.get_class_name(object_class) readable_class_name = self.get_class_name(object_class)
# for use in the execution prompt.
proposed_changes=f"""==Proposed Changes==
Number of {readable_class_name} objects to change: {len(records)}
These fields will be updated on each record: {fields_to_update}
"""
if verbose:
proposed_changes=f"""{proposed_changes}
These records will be updated: {list(records.all())}
"""
# Code execution will stop here if the user prompts "N" # Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution( TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True, system_exit_on_terminate=True,
info_to_inspect=f""" prompt_message=proposed_changes,
==Proposed Changes==
Number of {readable_class_name} objects to change: {len(records)}
These fields will be updated on each record: {fields_to_update}
""",
prompt_title=self.prompt_title, prompt_title=self.prompt_title,
) )
logger.info("Updating...") logger.info("Updating...")
@ -220,6 +267,9 @@ class TerminalHelper:
an answer is required of the user). an answer is required of the user).
The "answer" return value is True for "yes" or False for "no". The "answer" return value is True for "yes" or False for "no".
Raises:
ValueError: When "default" is not "yes", "no", or None.
""" """
valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
if default is None: if default is None:
@ -244,6 +294,7 @@ class TerminalHelper:
@staticmethod @staticmethod
def query_yes_no_exit(question: str, default="yes"): def query_yes_no_exit(question: str, default="yes"):
"""Ask a yes/no question via raw_input() and return their answer. """Ask a yes/no question via raw_input() and return their answer.
Allows for answer "e" to exit.
"question" is a string that is presented to the user. "question" is a string that is presented to the user.
"default" is the presumed answer if the user just hits <Enter>. "default" is the presumed answer if the user just hits <Enter>.
@ -251,6 +302,9 @@ class TerminalHelper:
an answer is required of the user). an answer is required of the user).
The "answer" return value is True for "yes" or False for "no". The "answer" return value is True for "yes" or False for "no".
Raises:
ValueError: When "default" is not "yes", "no", or None.
""" """
valid = { valid = {
"yes": True, "yes": True,
@ -317,9 +371,8 @@ class TerminalHelper:
case _: case _:
logger.info(print_statement) logger.info(print_statement)
# TODO - "info_to_inspect" should be refactored to "prompt_message"
@staticmethod @staticmethod
def prompt_for_execution(system_exit_on_terminate: bool, info_to_inspect: str, prompt_title: str) -> bool: def prompt_for_execution(system_exit_on_terminate: bool, prompt_message: str, prompt_title: str) -> bool:
"""Create to reduce code complexity. """Create to reduce code complexity.
Prompts the user to inspect the given string Prompts the user to inspect the given string
and asks if they wish to proceed. and asks if they wish to proceed.
@ -340,7 +393,7 @@ class TerminalHelper:
===================================================== =====================================================
*** IMPORTANT: VERIFY THE FOLLOWING LOOKS CORRECT *** *** IMPORTANT: VERIFY THE FOLLOWING LOOKS CORRECT ***
{info_to_inspect} {prompt_message}
{TerminalColors.FAIL} {TerminalColors.FAIL}
Proceed? (Y = proceed, N = {action_description_for_selecting_no}) Proceed? (Y = proceed, N = {action_description_for_selecting_no})
{TerminalColors.ENDC}""" {TerminalColors.ENDC}"""