diff --git a/src/registrar/management/commands/transfer_federal_agency.py b/src/registrar/management/commands/transfer_federal_agency.py new file mode 100644 index 000000000..3be7d452f --- /dev/null +++ b/src/registrar/management/commands/transfer_federal_agency.py @@ -0,0 +1,35 @@ +import logging +from django.core.management import BaseCommand +from registrar.management.commands.utility.terminal_helper import PopulateScriptTemplate, TerminalColors +from registrar.models import FederalAgency, DomainRequest +from registrar.models.utility.generic_helper import convert_queryset_to_dict + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand, PopulateScriptTemplate): + help = "Loops through each valid User object and updates its verification_type value" + prompt_title = "Do you wish to update all Federal Agencies?" + + def handle(self, **kwargs): + """Loops through each valid User object and updates its verification_type value""" + + # Get all existing domain requests + self.all_domain_requests = DomainRequest.objects.select_related("federal_agency").distinct() + + filter_condition = { + "agency__isnull": False, + } + updated_fields = ["federal_type"] + self.mass_update_records(FederalAgency, filter_condition, updated_fields) + + def update_record(self, record: FederalAgency): + """Defines how we update the federal_type field""" + request = self.all_domain_requests.filter(federal_agency__agency=record.agency).first() + record.federal_type = request.federal_type + + def should_skip_record(self, record) -> bool: # noqa + """Defines the conditions in which we should skip updating a record.""" + request = self.all_domain_requests.filter(federal_agency__agency=record.agency).first() + return not request or not request.federal_agency + diff --git a/src/registrar/management/commands/utility/terminal_helper.py b/src/registrar/management/commands/utility/terminal_helper.py index db3e4a9d3..bc5b0cc8c 100644 --- a/src/registrar/management/commands/utility/terminal_helper.py +++ b/src/registrar/management/commands/utility/terminal_helper.py @@ -64,53 +64,77 @@ class PopulateScriptTemplate(ABC): Contains an ABC for generic populate scripts """ - def mass_populate_field(self, sender, filter_conditions, fields_to_update): - """Loops through each valid "sender" object - specified by filter_conditions - and - updates fields defined by fields_to_update using populate_function. + prompt_title = "Do you wish to proceed?" - You must define populate_field before you can use this function. + def mass_update_records(self, sender, filter_conditions, fields_to_update, debug=True): + """Loops through each valid "sender" object - specified by filter_conditions - and + updates fields defined by fields_to_update using update_record. + + You must define update_record before you can use this function. """ - objects = sender.objects.filter(**filter_conditions) + records = sender.objects.filter(**filter_conditions) + readable_class_name = self.get_class_name(sender) # Code execution will stop here if the user prompts "N" TerminalHelper.prompt_for_execution( system_exit_on_terminate=True, info_to_inspect=f""" ==Proposed Changes== - Number of {sender} objects to change: {len(objects)} + Number of {readable_class_name} objects to change: {len(records)} These fields will be updated on each record: {fields_to_update} """, - prompt_title="Do you wish to patch this data?", + prompt_title=self.prompt_title, ) logger.info("Updating...") to_update: List[sender] = [] + to_skip: List[sender] = [] failed_to_update: List[sender] = [] - for updated_object in objects: + for record in records: try: - self.populate_field(updated_object) - to_update.append(updated_object) + if not self.should_skip_record(record): + self.update_record(record) + to_update.append(record) + else: + to_skip.append(record) except Exception as err: - failed_to_update.append(updated_object) + fail_message = self.get_failure_message(record) + failed_to_update.append(record) logger.error(err) - logger.error(f"{TerminalColors.FAIL}" f"Failed to update {updated_object}" f"{TerminalColors.ENDC}") + logger.error(fail_message) - # Do a bulk update on the first_ready field + # Do a bulk update on the desired field ScriptDataHelper.bulk_update_fields(sender, to_update, fields_to_update) # Log what happened - TerminalHelper.log_script_run_summary(to_update, failed_to_update, skipped=[], debug=True) + TerminalHelper.log_script_run_summary(to_update, failed_to_update, to_skip, debug, display_as_str=True) + + def get_class_name(self, sender) -> str: + """Returns the class name that we want to display for the terminal prompt. + Example: DomainRequest => "Domain Request" + """ + return sender._meta.verbose_name if getattr(sender, "_meta") else sender + + def get_failure_message(self, record) -> str: + """Returns the message that we will display if a record fails to update""" + return f"{TerminalColors.FAIL}" f"Failed to update {record}" f"{TerminalColors.ENDC}" + + def should_skip_record(self, record) -> bool: # noqa + """Defines the conditions in which we should skip updating a record.""" + # By default - don't skip + return False @abstractmethod - def populate_field(self, field_to_update): + def update_record(self, record): """Defines how we update each field. Must be defined before using mass_populate_field.""" raise NotImplementedError + class TerminalHelper: @staticmethod - def log_script_run_summary(to_update, failed_to_update, skipped, debug: bool, log_header=None): + def log_script_run_summary(to_update, failed_to_update, skipped, debug: bool, log_header=None, display_as_str=False): """Prints success, failed, and skipped counts, as well as all affected objects.""" update_success_count = len(to_update) @@ -121,20 +145,24 @@ class TerminalHelper: log_header = "============= FINISHED ===============" # Prepare debug messages - debug_messages = { - "success": (f"{TerminalColors.OKCYAN}Updated: {to_update}{TerminalColors.ENDC}\n"), - "skipped": (f"{TerminalColors.YELLOW}Skipped: {skipped}{TerminalColors.ENDC}\n"), - "failed": (f"{TerminalColors.FAIL}Failed: {failed_to_update}{TerminalColors.ENDC}\n"), - } + if debug: + updated_display = [str(u) for u in to_update] if display_as_str else to_update + skipped_display = [str(s) for s in skipped] if display_as_str else skipped + failed_display = [str(f) for f in failed_to_update] if display_as_str else failed_to_update + debug_messages = { + "success": (f"{TerminalColors.OKCYAN}Updated: {updated_display}{TerminalColors.ENDC}\n"), + "skipped": (f"{TerminalColors.YELLOW}Skipped: {skipped_display}{TerminalColors.ENDC}\n"), + "failed": (f"{TerminalColors.FAIL}Failed: {failed_display}{TerminalColors.ENDC}\n"), + } - # Print out a list of everything that was changed, if we have any changes to log. - # Otherwise, don't print anything. - TerminalHelper.print_conditional( - debug, - f"{debug_messages.get('success') if update_success_count > 0 else ''}" - f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}" - f"{debug_messages.get('failed') if update_failed_count > 0 else ''}", - ) + # Print out a list of everything that was changed, if we have any changes to log. + # Otherwise, don't print anything. + TerminalHelper.print_conditional( + debug, + f"{debug_messages.get('success') if update_success_count > 0 else ''}" + f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}" + f"{debug_messages.get('failed') if update_failed_count > 0 else ''}", + ) if update_failed_count == 0 and update_skipped_count == 0: logger.info(