Update script

This commit is contained in:
zandercymatics 2024-04-04 14:37:45 -06:00
parent 0961ecbb33
commit 3b0fa34ee2
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
2 changed files with 69 additions and 39 deletions

View file

@ -883,6 +883,8 @@ class DomainInformationAdmin(ListHeaderAdmin):
"Type of organization", "Type of organization",
{ {
"fields": [ "fields": [
"is_election_board",
"generic_org_type",
"organization_type", "organization_type",
] ]
}, },

View file

@ -1,10 +1,12 @@
import argparse import argparse
import logging import logging
import os
from registrar.signals import create_or_update_organization_type
from typing import List from typing import List
from django.core.management import BaseCommand from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper, ScriptDataHelper from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper, ScriptDataHelper
from registrar.models import DomainInformation, DomainRequest, Domain from registrar.models import DomainInformation, DomainRequest, Domain
from django.db import transaction
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -23,18 +25,37 @@ class Command(BaseCommand):
self.di_failed_to_update: List[DomainInformation] = [] self.di_failed_to_update: List[DomainInformation] = []
self.di_skipped: List[DomainInformation] = [] self.di_skipped: List[DomainInformation] = []
# Define a global variable for all domains with election offices
self.domains_with_election_offices_set = set()
def add_arguments(self, parser): def add_arguments(self, parser):
"""Adds command line arguments""" """Adds command line arguments"""
parser.add_argument("--debug", action=argparse.BooleanOptionalAction) parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument( parser.add_argument(
"election_office_filename", "domain_election_office_filename",
help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"), help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"),
) )
def handle(self, **kwargs): def handle(self, domain_election_office_filename, **kwargs):
"""Loops through each valid Domain object and updates its first_created value""" """Loops through each valid Domain object and updates its first_created value"""
debug = kwargs.get("debug") debug = kwargs.get("debug")
domain_requests = DomainRequest.objects.filter(organization_type__isnull=True)
# Check if the provided file path is valid
if not os.path.isfile(domain_election_office_filename):
raise argparse.ArgumentTypeError(f"Invalid file path '{domain_election_office_filename}'")
with open(domain_election_office_filename, "r") as file:
for line in file:
# Remove any leading/trailing whitespace
domain = line.strip()
if domain not in self.domains_with_election_offices_set:
self.domains_with_election_offices_set.add(domain)
domain_requests = DomainRequest.objects.filter(
organization_type__isnull=True,
requested_domain__name__isnull=False
)
# Code execution will stop here if the user prompts "N" # Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution( TerminalHelper.prompt_for_execution(
@ -51,7 +72,9 @@ class Command(BaseCommand):
self.update_domain_requests(domain_requests, debug) self.update_domain_requests(domain_requests, debug)
domain_infos = DomainInformation.objects.filter(domain_request__isnull=False, organization_type__isnull=True) # We should actually be targeting all fields with no value for organization type,
# but do have a value for generic_org_type. This is because there is data that we can infer.
domain_infos = DomainInformation.objects.filter(organization_type__isnull=True)
# Code execution will stop here if the user prompts "N" # Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution( TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True, system_exit_on_terminate=True,
@ -68,25 +91,28 @@ class Command(BaseCommand):
self.update_domain_informations(domain_infos, debug) self.update_domain_informations(domain_infos, debug)
def update_domain_requests(self, domain_requests, debug): def update_domain_requests(self, domain_requests, debug):
with transaction.atomic():
for request in domain_requests: for request in domain_requests:
try: try:
# TODO - parse data from hfile ere # TODO - parse data from hfile ere
if request.generic_org_type is not None: if request.generic_org_type is not None:
request.is_election_board = True domain_name = request.requested_domain.name
request.is_election_board = domain_name in self.domains_with_election_offices_set
request.save()
self.request_to_update.append(request) self.request_to_update.append(request)
if debug: if debug:
logger.info(f"Updating {request}") logger.info(f"Updated {request} => {request.organization_type}")
else: else:
self.request_skipped.append(request) self.request_skipped.append(request)
if debug: if debug:
logger.warning(f"Skipped updating {request}") logger.warning(f"Skipped updating {request}. No generic_org_type was found.")
except Exception as err: except Exception as err:
self.request_failed_to_update.append(request) self.request_failed_to_update.append(request)
logger.error(err) logger.error(err)
logger.error(f"{TerminalColors.FAIL}" f"Failed to update {request}" f"{TerminalColors.ENDC}") logger.error(f"{TerminalColors.FAIL}" f"Failed to update {request}" f"{TerminalColors.ENDC}")
# Do a bulk update on the organization_type field # Do a bulk update on the organization_type field
ScriptDataHelper.bulk_update_fields(DomainRequest, self.request_to_update, ["organization_type"]) # ScriptDataHelper.bulk_update_fields(DomainRequest, self.request_to_update, ["is_election_board"])
# Log what happened # Log what happened
log_header = "============= FINISHED UPDATE FOR DOMAINREQUEST ===============" log_header = "============= FINISHED UPDATE FOR DOMAINREQUEST ==============="
@ -95,25 +121,27 @@ class Command(BaseCommand):
) )
def update_domain_informations(self, domain_informations, debug): def update_domain_informations(self, domain_informations, debug):
with transaction.atomic():
for info in domain_informations: for info in domain_informations:
try: try:
# TODO - parse data from hfile ere
if info.generic_org_type is not None: if info.generic_org_type is not None:
info.is_election_board = True domain_name = info.domain.name
info.is_election_board = domain_name in self.domains_with_election_offices_set
info.save()
self.di_to_update.append(info) self.di_to_update.append(info)
if debug: if debug:
logger.info(f"Updating {info}") logger.info(f"Updated {info} => {info.organization_type}")
else: else:
self.di_skipped.append(info) self.di_skipped.append(info)
if debug: if debug:
logger.warning(f"Skipped updating {info}") logger.warning(f"Skipped updating {info}. No generic_org_type was found.")
except Exception as err: except Exception as err:
self.di_failed_to_update.append(info) self.di_failed_to_update.append(info)
logger.error(err) logger.error(err)
logger.error(f"{TerminalColors.FAIL}" f"Failed to update {info}" f"{TerminalColors.ENDC}") logger.error(f"{TerminalColors.FAIL}" f"Failed to update {info}" f"{TerminalColors.ENDC}")
# Do a bulk update on the organization_type field # Do a bulk update on the organization_type field
ScriptDataHelper.bulk_update_fields(DomainInformation, self.di_to_update, ["organization_type"]) # ScriptDataHelper.bulk_update_fields(DomainInformation, self.di_to_update, ["organization_type", "is_election_board", "generic_org_type"])
# Log what happened # Log what happened
log_header = "============= FINISHED UPDATE FOR DOMAININFORMATION ===============" log_header = "============= FINISHED UPDATE FOR DOMAININFORMATION ==============="