diff --git a/src/registrar/management/commands/populate_organization_type.py b/src/registrar/management/commands/populate_organization_type.py new file mode 100644 index 000000000..9e2b1bf6a --- /dev/null +++ b/src/registrar/management/commands/populate_organization_type.py @@ -0,0 +1,180 @@ +import argparse +import logging +import os +from typing import List +from django.core.management import BaseCommand +from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper, ScriptDataHelper +from registrar.models import DomainInformation, DomainRequest +from registrar.models.utility.generic_helper import CreateOrUpdateOrganizationTypeHelper + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Loops through each valid DomainInformation and DomainRequest object and updates its organization_type value" + + def __init__(self): + super().__init__() + # Get lists for DomainRequest + self.request_to_update: List[DomainRequest] = [] + self.request_failed_to_update: List[DomainRequest] = [] + self.request_skipped: List[DomainRequest] = [] + + # Get lists for DomainInformation + self.di_to_update: List[DomainInformation] = [] + self.di_failed_to_update: List[DomainInformation] = [] + self.di_skipped: List[DomainInformation] = [] + + # Define a global variable for all domains with election offices + self.domains_with_election_offices_set = set() + + def add_arguments(self, parser): + """Adds command line arguments""" + parser.add_argument("--debug", action=argparse.BooleanOptionalAction) + parser.add_argument( + "domain_election_office_filename", + help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"), + ) + + def handle(self, domain_election_office_filename, **kwargs): + """Loops through each valid Domain object and updates its first_created value""" + debug = kwargs.get("debug") + + # Check if the provided file path is valid + if not os.path.isfile(domain_election_office_filename): + raise argparse.ArgumentTypeError(f"Invalid file path '{domain_election_office_filename}'") + + with open(domain_election_office_filename, "r") as file: + for line in file: + # Remove any leading/trailing whitespace + domain = line.strip() + if domain not in self.domains_with_election_offices_set: + self.domains_with_election_offices_set.add(domain) + + domain_requests = DomainRequest.objects.filter( + organization_type__isnull=True, requested_domain__name__isnull=False + ) + + # Code execution will stop here if the user prompts "N" + TerminalHelper.prompt_for_execution( + system_exit_on_terminate=True, + info_to_inspect=f""" + ==Proposed Changes== + Number of DomainRequest objects to change: {len(domain_requests)} + + Organization_type data will be added for all of these fields. + """, + prompt_title="Do you wish to process DomainRequest?", + ) + logger.info("Updating DomainRequest(s)...") + + self.update_domain_requests(domain_requests, debug) + + # We should actually be targeting all fields with no value for organization type, + # but do have a value for generic_org_type. This is because there is data that we can infer. + domain_infos = DomainInformation.objects.filter(organization_type__isnull=True) + # Code execution will stop here if the user prompts "N" + TerminalHelper.prompt_for_execution( + system_exit_on_terminate=True, + info_to_inspect=f""" + ==Proposed Changes== + Number of DomainInformation objects to change: {len(domain_infos)} + + Organization_type data will be added for all of these fields. + """, + prompt_title="Do you wish to process DomainInformation?", + ) + logger.info("Updating DomainInformation(s)...") + + self.update_domain_informations(domain_infos, debug) + + def update_domain_requests(self, domain_requests, debug): + for request in domain_requests: + try: + if request.generic_org_type is not None: + domain_name = request.requested_domain.name + request.is_election_board = domain_name in self.domains_with_election_offices_set + request = self.sync_organization_type(DomainRequest, request) + self.request_to_update.append(request) + + if debug: + logger.info(f"Updating {request} => {request.organization_type}") + else: + self.request_skipped.append(request) + if debug: + logger.warning(f"Skipped updating {request}. No generic_org_type was found.") + except Exception as err: + self.request_failed_to_update.append(request) + logger.error(err) + logger.error(f"{TerminalColors.FAIL}" f"Failed to update {request}" f"{TerminalColors.ENDC}") + + # Do a bulk update on the organization_type field + ScriptDataHelper.bulk_update_fields( + DomainRequest, self.request_to_update, ["organization_type", "is_election_board", "generic_org_type"] + ) + + # Log what happened + log_header = "============= FINISHED UPDATE FOR DOMAINREQUEST ===============" + TerminalHelper.log_script_run_summary( + self.request_to_update, self.request_failed_to_update, self.request_skipped, debug, log_header + ) + + def update_domain_informations(self, domain_informations, debug): + for info in domain_informations: + try: + if info.generic_org_type is not None: + domain_name = info.domain.name + info.is_election_board = domain_name in self.domains_with_election_offices_set + info = self.sync_organization_type(DomainInformation, info) + self.di_to_update.append(info) + if debug: + logger.info(f"Updating {info} => {info.organization_type}") + else: + self.di_skipped.append(info) + if debug: + logger.warning(f"Skipped updating {info}. No generic_org_type was found.") + except Exception as err: + self.di_failed_to_update.append(info) + logger.error(err) + logger.error(f"{TerminalColors.FAIL}" f"Failed to update {info}" f"{TerminalColors.ENDC}") + + # Do a bulk update on the organization_type field + ScriptDataHelper.bulk_update_fields( + DomainInformation, self.di_to_update, ["organization_type", "is_election_board", "generic_org_type"] + ) + + # Log what happened + log_header = "============= FINISHED UPDATE FOR DOMAININFORMATION ===============" + TerminalHelper.log_script_run_summary( + self.di_to_update, self.di_failed_to_update, self.di_skipped, debug, log_header + ) + + def sync_organization_type(self, sender, instance): + """ + Updates the organization_type (without saving) to match + the is_election_board and generic_organization_type fields. + """ + + # Define mappings between generic org and election org. + # These have to be defined here, as you'd get a cyclical import error + # otherwise. + + # For any given organization type, return the "_election" variant. + # For example: STATE_OR_TERRITORY => STATE_OR_TERRITORY_ELECTION + generic_org_map = DomainRequest.OrgChoicesElectionOffice.get_org_generic_to_org_election() + + # For any given "_election" variant, return the base org type. + # For example: STATE_OR_TERRITORY_ELECTION => STATE_OR_TERRITORY + election_org_map = DomainRequest.OrgChoicesElectionOffice.get_org_election_to_org_generic() + + # Manages the "organization_type" variable and keeps in sync with + # "is_election_office" and "generic_organization_type" + org_type_helper = CreateOrUpdateOrganizationTypeHelper( + sender=sender, + instance=instance, + generic_org_to_org_map=generic_org_map, + election_org_to_generic_org_map=election_org_map, + ) + + instance = org_type_helper.create_or_update_organization_type() + return instance diff --git a/src/registrar/management/commands/utility/terminal_helper.py b/src/registrar/management/commands/utility/terminal_helper.py index 49ab89b9a..b54209750 100644 --- a/src/registrar/management/commands/utility/terminal_helper.py +++ b/src/registrar/management/commands/utility/terminal_helper.py @@ -49,6 +49,7 @@ class ScriptDataHelper: Usage: bulk_update_fields(Domain, page.object_list, ["first_ready"]) """ + logger.info(f"{TerminalColors.YELLOW} Bulk updating fields... {TerminalColors.ENDC}") # Create a Paginator object. Bulk_update on the full dataset # is too memory intensive for our current app config, so we can chunk this data instead. paginator = Paginator(update_list, batch_size) @@ -59,13 +60,16 @@ class ScriptDataHelper: class TerminalHelper: @staticmethod - def log_script_run_summary(to_update, failed_to_update, skipped, debug: bool): + def log_script_run_summary(to_update, failed_to_update, skipped, debug: bool, log_header=None): """Prints success, failed, and skipped counts, as well as all affected objects.""" update_success_count = len(to_update) update_failed_count = len(failed_to_update) update_skipped_count = len(skipped) + if log_header is None: + log_header = "============= FINISHED ===============" + # Prepare debug messages debug_messages = { "success": (f"{TerminalColors.OKCYAN}Updated: {to_update}{TerminalColors.ENDC}\n"), @@ -85,7 +89,7 @@ class TerminalHelper: if update_failed_count == 0 and update_skipped_count == 0: logger.info( f"""{TerminalColors.OKGREEN} - ============= FINISHED =============== + {log_header} Updated {update_success_count} entries {TerminalColors.ENDC} """ @@ -93,7 +97,7 @@ class TerminalHelper: elif update_failed_count == 0: logger.warning( f"""{TerminalColors.YELLOW} - ============= FINISHED =============== + {log_header} Updated {update_success_count} entries ----- SOME DATA WAS INVALID (NEEDS MANUAL PATCHING) ----- Skipped updating {update_skipped_count} entries @@ -103,7 +107,7 @@ class TerminalHelper: else: logger.error( f"""{TerminalColors.FAIL} - ============= FINISHED =============== + {log_header} Updated {update_success_count} entries ----- UPDATE FAILED ----- Failed to update {update_failed_count} entries, diff --git a/src/registrar/models/domain_information.py b/src/registrar/models/domain_information.py index 2ed27504c..6bdc6c00d 100644 --- a/src/registrar/models/domain_information.py +++ b/src/registrar/models/domain_information.py @@ -236,8 +236,11 @@ class DomainInformation(TimeStampedModel): except Exception: return "" - def save(self, *args, **kwargs): - """Save override for custom properties""" + def sync_organization_type(self): + """ + Updates the organization_type (without saving) to match + the is_election_board and generic_organization_type fields. + """ # Define mappings between generic org and election org. # These have to be defined here, as you'd get a cyclical import error @@ -262,6 +265,12 @@ class DomainInformation(TimeStampedModel): # Actually updates the organization_type field org_type_helper.create_or_update_organization_type() + + return self + + def save(self, *args, **kwargs): + """Save override for custom properties""" + self.sync_organization_type() super().save(*args, **kwargs) @classmethod diff --git a/src/registrar/models/domain_request.py b/src/registrar/models/domain_request.py index bd529f7e6..1b8a519a0 100644 --- a/src/registrar/models/domain_request.py +++ b/src/registrar/models/domain_request.py @@ -666,9 +666,11 @@ class DomainRequest(TimeStampedModel): help_text="Notes about this request", ) - def save(self, *args, **kwargs): - """Save override for custom properties""" - + def sync_organization_type(self): + """ + Updates the organization_type (without saving) to match + the is_election_board and generic_organization_type fields. + """ # Define mappings between generic org and election org. # These have to be defined here, as you'd get a cyclical import error # otherwise. @@ -692,6 +694,10 @@ class DomainRequest(TimeStampedModel): # Actually updates the organization_type field org_type_helper.create_or_update_organization_type() + + def save(self, *args, **kwargs): + """Save override for custom properties""" + self.sync_organization_type() super().save(*args, **kwargs) def __str__(self): diff --git a/src/registrar/tests/data/fake_election_domains.csv b/src/registrar/tests/data/fake_election_domains.csv new file mode 100644 index 000000000..4ec005bb1 --- /dev/null +++ b/src/registrar/tests/data/fake_election_domains.csv @@ -0,0 +1 @@ +manualtransmission.gov \ No newline at end of file diff --git a/src/registrar/tests/test_management_scripts.py b/src/registrar/tests/test_management_scripts.py index 34178e262..26ec6fd1d 100644 --- a/src/registrar/tests/test_management_scripts.py +++ b/src/registrar/tests/test_management_scripts.py @@ -7,6 +7,9 @@ from django.test import TestCase from registrar.models import ( User, Domain, + DomainRequest, + Contact, + Website, DomainInvitation, TransitionDomain, DomainInformation, @@ -18,7 +21,246 @@ from django.core.management import call_command from unittest.mock import patch, call from epplibwrapper import commands, common -from .common import MockEppLib, less_console_noise +from .common import MockEppLib, less_console_noise, completed_domain_request +from api.tests.common import less_console_noise_decorator + + +class TestPopulateOrganizationType(MockEppLib): + """Tests for the populate_organization_type script""" + + def setUp(self): + """Creates a fake domain object""" + super().setUp() + + # Get the domain requests + self.domain_request_1 = completed_domain_request( + name="lasers.gov", + generic_org_type=DomainRequest.OrganizationChoices.FEDERAL, + is_election_board=True, + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + ) + self.domain_request_2 = completed_domain_request( + name="readysetgo.gov", + generic_org_type=DomainRequest.OrganizationChoices.CITY, + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + ) + self.domain_request_3 = completed_domain_request( + name="manualtransmission.gov", + generic_org_type=DomainRequest.OrganizationChoices.TRIBAL, + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + ) + self.domain_request_4 = completed_domain_request( + name="saladandfries.gov", + generic_org_type=DomainRequest.OrganizationChoices.TRIBAL, + is_election_board=True, + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + ) + + # Approve all three requests + self.domain_request_1.approve() + self.domain_request_2.approve() + self.domain_request_3.approve() + self.domain_request_4.approve() + + # Get the domains + self.domain_1 = Domain.objects.get(name="lasers.gov") + self.domain_2 = Domain.objects.get(name="readysetgo.gov") + self.domain_3 = Domain.objects.get(name="manualtransmission.gov") + self.domain_4 = Domain.objects.get(name="saladandfries.gov") + + # Get the domain infos + self.domain_info_1 = DomainInformation.objects.get(domain=self.domain_1) + self.domain_info_2 = DomainInformation.objects.get(domain=self.domain_2) + self.domain_info_3 = DomainInformation.objects.get(domain=self.domain_3) + self.domain_info_4 = DomainInformation.objects.get(domain=self.domain_4) + + def tearDown(self): + """Deletes all DB objects related to migrations""" + super().tearDown() + + # Delete domains and related information + Domain.objects.all().delete() + DomainInformation.objects.all().delete() + DomainRequest.objects.all().delete() + User.objects.all().delete() + Contact.objects.all().delete() + Website.objects.all().delete() + + @less_console_noise_decorator + def run_populate_organization_type(self): + """ + This method executes the populate_organization_type command. + + The 'call_command' function from Django's management framework is then used to + execute the populate_organization_type command with the specified arguments. + """ + with patch( + "registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa + return_value=True, + ): + call_command("populate_organization_type", "registrar/tests/data/fake_election_domains.csv", debug=True) + + def assert_expected_org_values_on_request_and_info( + self, + domain_request: DomainRequest, + domain_info: DomainInformation, + expected_values: dict, + ): + """ + This is a a helper function that ensures that: + 1. DomainRequest and DomainInformation (on given objects) are equivalent + 2. That generic_org_type, is_election_board, and organization_type are equal to passed in values + """ + + # Test domain request + with self.subTest(field="DomainRequest"): + self.assertEqual(domain_request.generic_org_type, expected_values["generic_org_type"]) + self.assertEqual(domain_request.is_election_board, expected_values["is_election_board"]) + self.assertEqual(domain_request.organization_type, expected_values["organization_type"]) + + # Test domain info + with self.subTest(field="DomainInformation"): + self.assertEqual(domain_info.generic_org_type, expected_values["generic_org_type"]) + self.assertEqual(domain_info.is_election_board, expected_values["is_election_board"]) + self.assertEqual(domain_info.organization_type, expected_values["organization_type"]) + + def test_request_and_info_city_not_in_csv(self): + """Tests what happens to a city domain that is not defined in the CSV""" + city_request = self.domain_request_2 + city_info = self.domain_request_2 + + # Make sure that all data is correct before proceeding. + # Since the presave fixture is in effect, we should expect that + # is_election_board is equal to none, even though we tried to define it as "True" + expected_values = { + "is_election_board": False, + "generic_org_type": DomainRequest.OrganizationChoices.CITY, + "organization_type": DomainRequest.OrgChoicesElectionOffice.CITY, + } + self.assert_expected_org_values_on_request_and_info(city_request, city_info, expected_values) + + # Run the populate script + try: + self.run_populate_organization_type() + except Exception as e: + self.fail(f"Could not run populate_organization_type script. Failed with exception: {e}") + + # All values should be the same + self.assert_expected_org_values_on_request_and_info(city_request, city_info, expected_values) + + def test_request_and_info_federal(self): + """Tests what happens to a federal domain after the script is run (should be unchanged)""" + federal_request = self.domain_request_1 + federal_info = self.domain_info_1 + + # Make sure that all data is correct before proceeding. + # Since the presave fixture is in effect, we should expect that + # is_election_board is equal to none, even though we tried to define it as "True" + expected_values = { + "is_election_board": None, + "generic_org_type": DomainRequest.OrganizationChoices.FEDERAL, + "organization_type": DomainRequest.OrgChoicesElectionOffice.FEDERAL, + } + self.assert_expected_org_values_on_request_and_info(federal_request, federal_info, expected_values) + + # Run the populate script + try: + self.run_populate_organization_type() + except Exception as e: + self.fail(f"Could not run populate_organization_type script. Failed with exception: {e}") + + # All values should be the same + self.assert_expected_org_values_on_request_and_info(federal_request, federal_info, expected_values) + + def do_nothing(self): + """Does nothing for mocking purposes""" + pass + + def test_request_and_info_tribal_add_election_office(self): + """ + Tests if a tribal domain in the election csv changes organization_type to TRIBAL - ELECTION + for the domain request and the domain info + """ + + # Set org type fields to none to mimic an environment without this data + tribal_request = self.domain_request_3 + tribal_request.organization_type = None + tribal_info = self.domain_info_3 + tribal_info.organization_type = None + with patch.object(DomainRequest, "sync_organization_type", self.do_nothing): + with patch.object(DomainInformation, "sync_organization_type", self.do_nothing): + tribal_request.save() + tribal_info.save() + + # Make sure that all data is correct before proceeding. + expected_values = { + "is_election_board": False, + "generic_org_type": DomainRequest.OrganizationChoices.TRIBAL, + "organization_type": None, + } + self.assert_expected_org_values_on_request_and_info(tribal_request, tribal_info, expected_values) + + # Run the populate script + try: + self.run_populate_organization_type() + except Exception as e: + self.fail(f"Could not run populate_organization_type script. Failed with exception: {e}") + + tribal_request.refresh_from_db() + tribal_info.refresh_from_db() + + # Because we define this in the "csv", we expect that is election board will switch to True, + # and organization_type will now be tribal_election + expected_values["is_election_board"] = True + expected_values["organization_type"] = DomainRequest.OrgChoicesElectionOffice.TRIBAL_ELECTION + + self.assert_expected_org_values_on_request_and_info(tribal_request, tribal_info, expected_values) + + def test_request_and_info_tribal_remove_election_office(self): + """ + Tests if a tribal domain in the election csv changes organization_type to TRIBAL + when it used to be TRIBAL - ELECTION + for the domain request and the domain info + """ + + # Set org type fields to none to mimic an environment without this data + tribal_election_request = self.domain_request_4 + tribal_election_info = self.domain_info_4 + tribal_election_request.organization_type = None + tribal_election_info.organization_type = None + with patch.object(DomainRequest, "sync_organization_type", self.do_nothing): + with patch.object(DomainInformation, "sync_organization_type", self.do_nothing): + tribal_election_request.save() + tribal_election_info.save() + + # Make sure that all data is correct before proceeding. + # Because the presave fixture is in place when creating this, we should expect that the + # organization_type variable is already pre-populated. We will test what happens when + # it is not in another test. + expected_values = { + "is_election_board": True, + "generic_org_type": DomainRequest.OrganizationChoices.TRIBAL, + "organization_type": None, + } + self.assert_expected_org_values_on_request_and_info( + tribal_election_request, tribal_election_info, expected_values + ) + + # Run the populate script + try: + self.run_populate_organization_type() + except Exception as e: + self.fail(f"Could not run populate_organization_type script. Failed with exception: {e}") + + # Because we don't define this in the "csv", we expect that is election board will switch to False, + # and organization_type will now be tribal + expected_values["is_election_board"] = False + expected_values["organization_type"] = DomainRequest.OrgChoicesElectionOffice.TRIBAL + tribal_election_request.refresh_from_db() + tribal_election_info.refresh_from_db() + self.assert_expected_org_values_on_request_and_info( + tribal_election_request, tribal_election_info, expected_values + ) class TestPopulateFirstReady(TestCase):