Unit tests

This commit is contained in:
zandercymatics 2023-12-29 13:19:45 -07:00
parent 21edb7b0ad
commit 5716af3239
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
2 changed files with 85 additions and 22 deletions

View file

@ -1,4 +1,4 @@
""""""
"""Loops through each valid DomainInformation object and updates its agency value"""
import argparse
import logging
@ -6,7 +6,6 @@ from typing import List
from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.models.domain import Domain
from registrar.models.domain_information import DomainInformation
from django.db.models import Q
@ -16,7 +15,7 @@ logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Runs the cat command on files from /tmp into the getgov directory."
help = "Loops through each valid DomainInformation object and updates its agency value"
def __init__(self):
super().__init__()
@ -30,18 +29,26 @@ class Command(BaseCommand):
"""Adds command line arguments"""
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
def handle(self, **options):
debug = options.get("debug")
def handle(self, **kwargs):
"""Loops through each valid DomainInformation object and updates its agency value"""
debug = kwargs.get("debug")
# Update the "federal_agency" field
self.patch_agency_info(debug)
def patch_agency_info(self, debug):
domain_info_to_fix = DomainInformation.objects.filter(Q(federal_agency=None) | Q(federal_agency=""))
"""
Updates the `federal_agency` field of each valid `DomainInformation` object based on the corresponding
`TransitionDomain` object. Skips the update if the `TransitionDomain` object does not exist or its
`federal_agency` field is `None`. Logs the update, skip, and failure actions if debug mode is on.
After all updates, logs a summary of the results.
"""
empty_agency_query = Q(federal_agency=None) | Q(federal_agency="")
domain_info_to_fix = DomainInformation.objects.filter(empty_agency_query)
domain_names = domain_info_to_fix.values_list("domain__name", flat=True)
transition_domains = TransitionDomain.objects.filter(domain_name__in=domain_names).exclude(empty_agency_query)
domain_names = domain_info_to_fix.values_list('domain__name', flat=True)
transition_domains = TransitionDomain.objects.filter(domain_name__in=domain_names)
# Get the domain names from TransitionDomain
td_agencies = transition_domains.values_list("domain_name", "federal_agency").distinct()
@ -63,18 +70,19 @@ class Command(BaseCommand):
for di in domain_info_to_fix:
domain_name = di.domain.name
federal_agency = td_dict.get(domain_name)
log_message = None
# If agency exists on a TransitionDomain, update the related DomainInformation object
if domain_name in td_dict and federal_agency is not None:
di.federal_agency = federal_agency
self.di_to_update.append(di)
log_message = f"{TerminalColors.OKCYAN}Updated {di}{TerminalColors.ENDC}"
else:
elif domain_name not in td_dict:
self.di_skipped.append(di)
log_message = f"{TerminalColors.YELLOW}Skipping update for {di}{TerminalColors.ENDC}"
# Log the action if debug mode is on
if debug:
if debug and log_message is not None:
logger.info(log_message)
# Bulk update the federal agency field in DomainInformation objects
@ -87,11 +95,9 @@ class Command(BaseCommand):
# If the fields we expect to update are still None, then something is wrong.
for di in corrected_domains:
if domain_name in td_dict and td_dict.get(domain_name) is None:
logger.info(
f"{TerminalColors.FAIL}Failed to update {di}{TerminalColors.ENDC}"
)
logger.info(f"{TerminalColors.FAIL}Failed to update {di}{TerminalColors.ENDC}")
self.di_failed_to_update.append(di)
# === Log results and return data === #
self.log_script_run_summary(debug)
@ -106,9 +112,7 @@ class Command(BaseCommand):
debug_messages = {
"success": (f"{TerminalColors.OKCYAN}Updated: {self.di_to_update}{TerminalColors.ENDC}\n"),
"skipped": (f"{TerminalColors.YELLOW}Skipped: {self.di_skipped}{TerminalColors.ENDC}\n"),
"failed": (
f"{TerminalColors.FAIL}Failed: {self.di_failed_to_update}{TerminalColors.ENDC}\n"
),
"failed": (f"{TerminalColors.FAIL}Failed: {self.di_failed_to_update}{TerminalColors.ENDC}\n"),
}
# Print out a list of everything that was changed, if we have any changes to log.
@ -129,7 +133,7 @@ class Command(BaseCommand):
"""
)
elif update_failed_count == 0:
logger.info(
logger.warning(
f"""{TerminalColors.YELLOW}
============= FINISHED ===============
Updated {update_success_count} DomainInformation entries
@ -140,7 +144,7 @@ class Command(BaseCommand):
"""
)
else:
logger.info(
logger.error(
f"""{TerminalColors.FAIL}
============= FINISHED ===============
Updated {update_success_count} DomainInformation entries
@ -150,4 +154,4 @@ class Command(BaseCommand):
Skipped updating {update_skipped_count} DomainInformation entries
{TerminalColors.ENDC}
"""
)
)

View file

@ -21,6 +21,65 @@ from registrar.models.contact import Contact
from .common import MockEppLib, less_console_noise
class TestPatchAgencyInfo(TestCase):
def setUp(self):
self.user, _ = User.objects.get_or_create(username="testuser")
self.domain, _ = Domain.objects.get_or_create(name="testdomain.gov")
self.domain_info, _ = DomainInformation.objects.get_or_create(domain=self.domain, creator=self.user)
self.transition_domain, _ = TransitionDomain.objects.get_or_create(
domain_name="testdomain.gov", federal_agency="test agency"
)
def tearDown(self):
Domain.objects.all().delete()
DomainInformation.objects.all().delete()
User.objects.all().delete()
TransitionDomain.objects.all().delete()
@patch("registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", return_value=True)
def call_patch_federal_agency_info(self, mock_prompt):
"""Calls the patch_federal_agency_info command and mimics a keypress"""
call_command("patch_federal_agency_info", debug=True)
def test_patch_agency_info(self):
"""
Tests that the `patch_federal_agency_info` command successfully
updates the `federal_agency` field
of a `DomainInformation` object when the corresponding
`TransitionDomain` object has a valid `federal_agency`.
"""
self.call_patch_federal_agency_info()
# Reload the domain_info object from the database
self.domain_info.refresh_from_db()
# Check that the federal_agency field was updated
self.assertEqual(self.domain_info.federal_agency, "test agency")
def test_patch_agency_info_skip(self):
"""
Tests that the `patch_federal_agency_info` command logs a warning and
does not update the `federal_agency` field
of a `DomainInformation` object when the corresponding
`TransitionDomain` object does not exist.
"""
# Set federal_agency to None to simulate a skip
self.transition_domain.federal_agency = None
self.transition_domain.save()
with self.assertLogs("registrar.management.commands.patch_federal_agency_info", level="WARNING") as context:
self.call_patch_federal_agency_info()
# Check that the correct log message was output
self.assertIn("SOME AGENCY DATA WAS NONE", context.output[0])
# Reload the domain_info object from the database
self.domain_info.refresh_from_db()
# Check that the federal_agency field was not updated
self.assertIsNone(self.domain_info.federal_agency)
class TestExtendExpirationDates(MockEppLib):
def setUp(self):
"""Defines the file name of migration_json and the folder its contained in"""