Merge branch 'za/1911-fill-new-org-type-column' into za/1816-domain-metadata-includes-organization-type

This commit is contained in:
zandercymatics 2024-04-10 08:40:26 -06:00
commit 272f13490d
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
6 changed files with 452 additions and 10 deletions

View file

@ -0,0 +1,180 @@
import argparse
import logging
import os
from typing import List
from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper, ScriptDataHelper
from registrar.models import DomainInformation, DomainRequest
from registrar.models.utility.generic_helper import CreateOrUpdateOrganizationTypeHelper
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Loops through each valid DomainInformation and DomainRequest object and updates its organization_type value"
def __init__(self):
super().__init__()
# Get lists for DomainRequest
self.request_to_update: List[DomainRequest] = []
self.request_failed_to_update: List[DomainRequest] = []
self.request_skipped: List[DomainRequest] = []
# Get lists for DomainInformation
self.di_to_update: List[DomainInformation] = []
self.di_failed_to_update: List[DomainInformation] = []
self.di_skipped: List[DomainInformation] = []
# Define a global variable for all domains with election offices
self.domains_with_election_offices_set = set()
def add_arguments(self, parser):
"""Adds command line arguments"""
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument(
"domain_election_office_filename",
help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"),
)
def handle(self, domain_election_office_filename, **kwargs):
"""Loops through each valid Domain object and updates its first_created value"""
debug = kwargs.get("debug")
# Check if the provided file path is valid
if not os.path.isfile(domain_election_office_filename):
raise argparse.ArgumentTypeError(f"Invalid file path '{domain_election_office_filename}'")
with open(domain_election_office_filename, "r") as file:
for line in file:
# Remove any leading/trailing whitespace
domain = line.strip()
if domain not in self.domains_with_election_offices_set:
self.domains_with_election_offices_set.add(domain)
domain_requests = DomainRequest.objects.filter(
organization_type__isnull=True, requested_domain__name__isnull=False
)
# Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==Proposed Changes==
Number of DomainRequest objects to change: {len(domain_requests)}
Organization_type data will be added for all of these fields.
""",
prompt_title="Do you wish to process DomainRequest?",
)
logger.info("Updating DomainRequest(s)...")
self.update_domain_requests(domain_requests, debug)
# We should actually be targeting all fields with no value for organization type,
# but do have a value for generic_org_type. This is because there is data that we can infer.
domain_infos = DomainInformation.objects.filter(organization_type__isnull=True)
# Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==Proposed Changes==
Number of DomainInformation objects to change: {len(domain_infos)}
Organization_type data will be added for all of these fields.
""",
prompt_title="Do you wish to process DomainInformation?",
)
logger.info("Updating DomainInformation(s)...")
self.update_domain_informations(domain_infos, debug)
def update_domain_requests(self, domain_requests, debug):
for request in domain_requests:
try:
if request.generic_org_type is not None:
domain_name = request.requested_domain.name
request.is_election_board = domain_name in self.domains_with_election_offices_set
request = self.sync_organization_type(DomainRequest, request)
self.request_to_update.append(request)
if debug:
logger.info(f"Updating {request} => {request.organization_type}")
else:
self.request_skipped.append(request)
if debug:
logger.warning(f"Skipped updating {request}. No generic_org_type was found.")
except Exception as err:
self.request_failed_to_update.append(request)
logger.error(err)
logger.error(f"{TerminalColors.FAIL}" f"Failed to update {request}" f"{TerminalColors.ENDC}")
# Do a bulk update on the organization_type field
ScriptDataHelper.bulk_update_fields(
DomainRequest, self.request_to_update, ["organization_type", "is_election_board", "generic_org_type"]
)
# Log what happened
log_header = "============= FINISHED UPDATE FOR DOMAINREQUEST ==============="
TerminalHelper.log_script_run_summary(
self.request_to_update, self.request_failed_to_update, self.request_skipped, debug, log_header
)
def update_domain_informations(self, domain_informations, debug):
for info in domain_informations:
try:
if info.generic_org_type is not None:
domain_name = info.domain.name
info.is_election_board = domain_name in self.domains_with_election_offices_set
info = self.sync_organization_type(DomainInformation, info)
self.di_to_update.append(info)
if debug:
logger.info(f"Updating {info} => {info.organization_type}")
else:
self.di_skipped.append(info)
if debug:
logger.warning(f"Skipped updating {info}. No generic_org_type was found.")
except Exception as err:
self.di_failed_to_update.append(info)
logger.error(err)
logger.error(f"{TerminalColors.FAIL}" f"Failed to update {info}" f"{TerminalColors.ENDC}")
# Do a bulk update on the organization_type field
ScriptDataHelper.bulk_update_fields(
DomainInformation, self.di_to_update, ["organization_type", "is_election_board", "generic_org_type"]
)
# Log what happened
log_header = "============= FINISHED UPDATE FOR DOMAININFORMATION ==============="
TerminalHelper.log_script_run_summary(
self.di_to_update, self.di_failed_to_update, self.di_skipped, debug, log_header
)
def sync_organization_type(self, sender, instance):
"""
Updates the organization_type (without saving) to match
the is_election_board and generic_organization_type fields.
"""
# Define mappings between generic org and election org.
# These have to be defined here, as you'd get a cyclical import error
# otherwise.
# For any given organization type, return the "_election" variant.
# For example: STATE_OR_TERRITORY => STATE_OR_TERRITORY_ELECTION
generic_org_map = DomainRequest.OrgChoicesElectionOffice.get_org_generic_to_org_election()
# For any given "_election" variant, return the base org type.
# For example: STATE_OR_TERRITORY_ELECTION => STATE_OR_TERRITORY
election_org_map = DomainRequest.OrgChoicesElectionOffice.get_org_election_to_org_generic()
# Manages the "organization_type" variable and keeps in sync with
# "is_election_office" and "generic_organization_type"
org_type_helper = CreateOrUpdateOrganizationTypeHelper(
sender=sender,
instance=instance,
generic_org_to_org_map=generic_org_map,
election_org_to_generic_org_map=election_org_map,
)
instance = org_type_helper.create_or_update_organization_type()
return instance

View file

@ -49,6 +49,7 @@ class ScriptDataHelper:
Usage:
bulk_update_fields(Domain, page.object_list, ["first_ready"])
"""
logger.info(f"{TerminalColors.YELLOW} Bulk updating fields... {TerminalColors.ENDC}")
# Create a Paginator object. Bulk_update on the full dataset
# is too memory intensive for our current app config, so we can chunk this data instead.
paginator = Paginator(update_list, batch_size)
@ -59,13 +60,16 @@ class ScriptDataHelper:
class TerminalHelper:
@staticmethod
def log_script_run_summary(to_update, failed_to_update, skipped, debug: bool):
def log_script_run_summary(to_update, failed_to_update, skipped, debug: bool, log_header=None):
"""Prints success, failed, and skipped counts, as well as
all affected objects."""
update_success_count = len(to_update)
update_failed_count = len(failed_to_update)
update_skipped_count = len(skipped)
if log_header is None:
log_header = "============= FINISHED ==============="
# Prepare debug messages
debug_messages = {
"success": (f"{TerminalColors.OKCYAN}Updated: {to_update}{TerminalColors.ENDC}\n"),
@ -85,7 +89,7 @@ class TerminalHelper:
if update_failed_count == 0 and update_skipped_count == 0:
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
{log_header}
Updated {update_success_count} entries
{TerminalColors.ENDC}
"""
@ -93,7 +97,7 @@ class TerminalHelper:
elif update_failed_count == 0:
logger.warning(
f"""{TerminalColors.YELLOW}
============= FINISHED ===============
{log_header}
Updated {update_success_count} entries
----- SOME DATA WAS INVALID (NEEDS MANUAL PATCHING) -----
Skipped updating {update_skipped_count} entries
@ -103,7 +107,7 @@ class TerminalHelper:
else:
logger.error(
f"""{TerminalColors.FAIL}
============= FINISHED ===============
{log_header}
Updated {update_success_count} entries
----- UPDATE FAILED -----
Failed to update {update_failed_count} entries,

View file

@ -236,8 +236,11 @@ class DomainInformation(TimeStampedModel):
except Exception:
return ""
def save(self, *args, **kwargs):
"""Save override for custom properties"""
def sync_organization_type(self):
"""
Updates the organization_type (without saving) to match
the is_election_board and generic_organization_type fields.
"""
# Define mappings between generic org and election org.
# These have to be defined here, as you'd get a cyclical import error
@ -262,6 +265,12 @@ class DomainInformation(TimeStampedModel):
# Actually updates the organization_type field
org_type_helper.create_or_update_organization_type()
return self
def save(self, *args, **kwargs):
"""Save override for custom properties"""
self.sync_organization_type()
super().save(*args, **kwargs)
@classmethod

View file

@ -666,9 +666,11 @@ class DomainRequest(TimeStampedModel):
help_text="Notes about this request",
)
def save(self, *args, **kwargs):
"""Save override for custom properties"""
def sync_organization_type(self):
"""
Updates the organization_type (without saving) to match
the is_election_board and generic_organization_type fields.
"""
# Define mappings between generic org and election org.
# These have to be defined here, as you'd get a cyclical import error
# otherwise.
@ -692,6 +694,10 @@ class DomainRequest(TimeStampedModel):
# Actually updates the organization_type field
org_type_helper.create_or_update_organization_type()
def save(self, *args, **kwargs):
"""Save override for custom properties"""
self.sync_organization_type()
super().save(*args, **kwargs)
def __str__(self):

View file

@ -0,0 +1 @@
manualtransmission.gov
1 manualtransmission.gov

View file

@ -7,6 +7,9 @@ from django.test import TestCase
from registrar.models import (
User,
Domain,
DomainRequest,
Contact,
Website,
DomainInvitation,
TransitionDomain,
DomainInformation,
@ -18,7 +21,246 @@ from django.core.management import call_command
from unittest.mock import patch, call
from epplibwrapper import commands, common
from .common import MockEppLib, less_console_noise
from .common import MockEppLib, less_console_noise, completed_domain_request
from api.tests.common import less_console_noise_decorator
class TestPopulateOrganizationType(MockEppLib):
"""Tests for the populate_organization_type script"""
def setUp(self):
"""Creates a fake domain object"""
super().setUp()
# Get the domain requests
self.domain_request_1 = completed_domain_request(
name="lasers.gov",
generic_org_type=DomainRequest.OrganizationChoices.FEDERAL,
is_election_board=True,
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
)
self.domain_request_2 = completed_domain_request(
name="readysetgo.gov",
generic_org_type=DomainRequest.OrganizationChoices.CITY,
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
)
self.domain_request_3 = completed_domain_request(
name="manualtransmission.gov",
generic_org_type=DomainRequest.OrganizationChoices.TRIBAL,
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
)
self.domain_request_4 = completed_domain_request(
name="saladandfries.gov",
generic_org_type=DomainRequest.OrganizationChoices.TRIBAL,
is_election_board=True,
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
)
# Approve all three requests
self.domain_request_1.approve()
self.domain_request_2.approve()
self.domain_request_3.approve()
self.domain_request_4.approve()
# Get the domains
self.domain_1 = Domain.objects.get(name="lasers.gov")
self.domain_2 = Domain.objects.get(name="readysetgo.gov")
self.domain_3 = Domain.objects.get(name="manualtransmission.gov")
self.domain_4 = Domain.objects.get(name="saladandfries.gov")
# Get the domain infos
self.domain_info_1 = DomainInformation.objects.get(domain=self.domain_1)
self.domain_info_2 = DomainInformation.objects.get(domain=self.domain_2)
self.domain_info_3 = DomainInformation.objects.get(domain=self.domain_3)
self.domain_info_4 = DomainInformation.objects.get(domain=self.domain_4)
def tearDown(self):
"""Deletes all DB objects related to migrations"""
super().tearDown()
# Delete domains and related information
Domain.objects.all().delete()
DomainInformation.objects.all().delete()
DomainRequest.objects.all().delete()
User.objects.all().delete()
Contact.objects.all().delete()
Website.objects.all().delete()
@less_console_noise_decorator
def run_populate_organization_type(self):
"""
This method executes the populate_organization_type command.
The 'call_command' function from Django's management framework is then used to
execute the populate_organization_type command with the specified arguments.
"""
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
return_value=True,
):
call_command("populate_organization_type", "registrar/tests/data/fake_election_domains.csv", debug=True)
def assert_expected_org_values_on_request_and_info(
self,
domain_request: DomainRequest,
domain_info: DomainInformation,
expected_values: dict,
):
"""
This is a a helper function that ensures that:
1. DomainRequest and DomainInformation (on given objects) are equivalent
2. That generic_org_type, is_election_board, and organization_type are equal to passed in values
"""
# Test domain request
with self.subTest(field="DomainRequest"):
self.assertEqual(domain_request.generic_org_type, expected_values["generic_org_type"])
self.assertEqual(domain_request.is_election_board, expected_values["is_election_board"])
self.assertEqual(domain_request.organization_type, expected_values["organization_type"])
# Test domain info
with self.subTest(field="DomainInformation"):
self.assertEqual(domain_info.generic_org_type, expected_values["generic_org_type"])
self.assertEqual(domain_info.is_election_board, expected_values["is_election_board"])
self.assertEqual(domain_info.organization_type, expected_values["organization_type"])
def test_request_and_info_city_not_in_csv(self):
"""Tests what happens to a city domain that is not defined in the CSV"""
city_request = self.domain_request_2
city_info = self.domain_request_2
# Make sure that all data is correct before proceeding.
# Since the presave fixture is in effect, we should expect that
# is_election_board is equal to none, even though we tried to define it as "True"
expected_values = {
"is_election_board": False,
"generic_org_type": DomainRequest.OrganizationChoices.CITY,
"organization_type": DomainRequest.OrgChoicesElectionOffice.CITY,
}
self.assert_expected_org_values_on_request_and_info(city_request, city_info, expected_values)
# Run the populate script
try:
self.run_populate_organization_type()
except Exception as e:
self.fail(f"Could not run populate_organization_type script. Failed with exception: {e}")
# All values should be the same
self.assert_expected_org_values_on_request_and_info(city_request, city_info, expected_values)
def test_request_and_info_federal(self):
"""Tests what happens to a federal domain after the script is run (should be unchanged)"""
federal_request = self.domain_request_1
federal_info = self.domain_info_1
# Make sure that all data is correct before proceeding.
# Since the presave fixture is in effect, we should expect that
# is_election_board is equal to none, even though we tried to define it as "True"
expected_values = {
"is_election_board": None,
"generic_org_type": DomainRequest.OrganizationChoices.FEDERAL,
"organization_type": DomainRequest.OrgChoicesElectionOffice.FEDERAL,
}
self.assert_expected_org_values_on_request_and_info(federal_request, federal_info, expected_values)
# Run the populate script
try:
self.run_populate_organization_type()
except Exception as e:
self.fail(f"Could not run populate_organization_type script. Failed with exception: {e}")
# All values should be the same
self.assert_expected_org_values_on_request_and_info(federal_request, federal_info, expected_values)
def do_nothing(self):
"""Does nothing for mocking purposes"""
pass
def test_request_and_info_tribal_add_election_office(self):
"""
Tests if a tribal domain in the election csv changes organization_type to TRIBAL - ELECTION
for the domain request and the domain info
"""
# Set org type fields to none to mimic an environment without this data
tribal_request = self.domain_request_3
tribal_request.organization_type = None
tribal_info = self.domain_info_3
tribal_info.organization_type = None
with patch.object(DomainRequest, "sync_organization_type", self.do_nothing):
with patch.object(DomainInformation, "sync_organization_type", self.do_nothing):
tribal_request.save()
tribal_info.save()
# Make sure that all data is correct before proceeding.
expected_values = {
"is_election_board": False,
"generic_org_type": DomainRequest.OrganizationChoices.TRIBAL,
"organization_type": None,
}
self.assert_expected_org_values_on_request_and_info(tribal_request, tribal_info, expected_values)
# Run the populate script
try:
self.run_populate_organization_type()
except Exception as e:
self.fail(f"Could not run populate_organization_type script. Failed with exception: {e}")
tribal_request.refresh_from_db()
tribal_info.refresh_from_db()
# Because we define this in the "csv", we expect that is election board will switch to True,
# and organization_type will now be tribal_election
expected_values["is_election_board"] = True
expected_values["organization_type"] = DomainRequest.OrgChoicesElectionOffice.TRIBAL_ELECTION
self.assert_expected_org_values_on_request_and_info(tribal_request, tribal_info, expected_values)
def test_request_and_info_tribal_remove_election_office(self):
"""
Tests if a tribal domain in the election csv changes organization_type to TRIBAL
when it used to be TRIBAL - ELECTION
for the domain request and the domain info
"""
# Set org type fields to none to mimic an environment without this data
tribal_election_request = self.domain_request_4
tribal_election_info = self.domain_info_4
tribal_election_request.organization_type = None
tribal_election_info.organization_type = None
with patch.object(DomainRequest, "sync_organization_type", self.do_nothing):
with patch.object(DomainInformation, "sync_organization_type", self.do_nothing):
tribal_election_request.save()
tribal_election_info.save()
# Make sure that all data is correct before proceeding.
# Because the presave fixture is in place when creating this, we should expect that the
# organization_type variable is already pre-populated. We will test what happens when
# it is not in another test.
expected_values = {
"is_election_board": True,
"generic_org_type": DomainRequest.OrganizationChoices.TRIBAL,
"organization_type": None,
}
self.assert_expected_org_values_on_request_and_info(
tribal_election_request, tribal_election_info, expected_values
)
# Run the populate script
try:
self.run_populate_organization_type()
except Exception as e:
self.fail(f"Could not run populate_organization_type script. Failed with exception: {e}")
# Because we don't define this in the "csv", we expect that is election board will switch to False,
# and organization_type will now be tribal
expected_values["is_election_board"] = False
expected_values["organization_type"] = DomainRequest.OrgChoicesElectionOffice.TRIBAL
tribal_election_request.refresh_from_db()
tribal_election_info.refresh_from_db()
self.assert_expected_org_values_on_request_and_info(
tribal_election_request, tribal_election_info, expected_values
)
class TestPopulateFirstReady(TestCase):