Merge branch 'main' into litterbox/3418-refactor-create-federal-portfolio

This commit is contained in:
zandercymatics 2025-03-27 08:25:27 -06:00
commit befdbc045e
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
9 changed files with 406 additions and 12 deletions

View file

@ -151,7 +151,7 @@ class EPPLibWrapper:
raise RegistryError(message) from err
else:
if response.code >= 2000:
raise RegistryError(response.msg, code=response.code)
raise RegistryError(response.msg, code=response.code, response=response)
else:
return response
@ -174,6 +174,8 @@ class EPPLibWrapper:
try:
return self._send(command)
except RegistryError as err:
if err.response:
logger.info(f"cltrid is {err.response.cl_tr_id} svtrid is {err.response.sv_tr_id}")
if (
err.is_transport_error()
or err.is_connection_error()

View file

@ -1,4 +1,4 @@
from enum import IntEnum
from enum import IntEnum, Enum
class ErrorCode(IntEnum):
@ -52,6 +52,10 @@ class ErrorCode(IntEnum):
SESSION_LIMIT_EXCEEDED_SERVER_CLOSING_CONNECTION = 2502
class RegistryErrorMessage(Enum):
REGISTRAR_NOT_LOGGED_IN = "Registrar is not logged in."
class RegistryError(Exception):
"""
Overview of registry response codes from RFC 5730. See RFC 5730 for full text.
@ -62,14 +66,21 @@ class RegistryError(Exception):
- 2501 - 2502 Something malicious or abusive may have occurred
"""
def __init__(self, *args, code=None, note="", **kwargs):
def __init__(self, *args, code=None, note="", response=None, **kwargs):
super().__init__(*args, **kwargs)
self.code = code
self.response = response
# note is a string that can be used to provide additional context
self.note = note
def should_retry(self):
return self.code == ErrorCode.COMMAND_FAILED
# COMMAND_USE_ERROR is returning with message, Registrar is not logged in,
# which can be recovered from with a retry
return self.code == ErrorCode.COMMAND_FAILED or (
self.code == ErrorCode.COMMAND_USE_ERROR
and self.response
and getattr(self.response, "msg", None) == RegistryErrorMessage.REGISTRAR_NOT_LOGGED_IN.value
)
def is_transport_error(self):
return self.code == ErrorCode.TRANSPORT_ERROR

View file

@ -264,6 +264,58 @@ class TestClient(TestCase):
# send() is called 5 times: send(login), send(command) fail, send(logout), send(login), send(command)
self.assertEquals(mock_send.call_count, 5)
@less_console_noise_decorator
@patch("epplibwrapper.client.Client")
@patch("epplibwrapper.client.logger")
def test_send_command_2002_failure_prompts_successful_retry(self, mock_logger, mock_client):
"""Test when the send("InfoDomainCommand) call fails with a 2002, prompting a retry
and the subsequent send("InfoDomainCommand) call succeeds
Flow:
Initialization succeeds
Send command fails (with 2002 code) prompting retry
Client closes and re-initializes, and command succeeds"""
# Mock the Client instance and its methods
# connect() and close() should succeed throughout
mock_connect = MagicMock()
mock_close = MagicMock()
# create success and failure result messages
send_command_success_result = self.fake_result(1000, "Command completed successfully")
send_command_failure_result = self.fake_result(2002, "Registrar is not logged in.")
# side_effect for send call, initial send(login) succeeds during initialization, next send(command)
# fails, subsequent sends (logout, login, command) all succeed
send_call_count = 0
# Create a mock command
mock_command = MagicMock()
mock_command.__class__.__name__ = "InfoDomainCommand"
def side_effect(*args, **kwargs):
nonlocal send_call_count
send_call_count += 1
if send_call_count == 2:
return send_command_failure_result
else:
return send_command_success_result
mock_send = MagicMock(side_effect=side_effect)
mock_client.return_value.connect = mock_connect
mock_client.return_value.close = mock_close
mock_client.return_value.send = mock_send
# Create EPPLibWrapper instance and initialize client
wrapper = EPPLibWrapper()
wrapper.send(mock_command, cleaned=True)
# connect() is called twice, once during initialization of app, once during retry
self.assertEquals(mock_connect.call_count, 2)
# close() is called once, during retry
mock_close.assert_called_once()
# send() is called 5 times: send(login), send(command) fail, send(logout), send(login), send(command)
self.assertEquals(mock_send.call_count, 5)
# Assertion proper logging; note that the
mock_logger.info.assert_any_call(
"InfoDomainCommand failed and will be retried Error: Registrar is not logged in."
)
mock_logger.info.assert_any_call("cltrid is cl_tr_id svtrid is sv_tr_id")
@less_console_noise_decorator
def fake_failure_send_concurrent_threads(self, command=None, cleaned=None):
"""

View file

@ -1261,6 +1261,13 @@ class HostIpAdmin(AuditedAdmin, ImportExportRegistrarModelAdmin):
resource_classes = [HostIpResource]
model = models.HostIP
search_fields = ["host__name", "address"]
search_help_text = "Search by host name or address."
list_display = (
"host",
"address",
)
class ContactResource(resources.ModelResource):
"""defines how each field in the referenced model should be mapped to the corresponding fields in the
@ -4535,6 +4542,10 @@ class PublicContactAdmin(ListHeaderAdmin, ImportExportRegistrarModelAdmin):
change_form_template = "django/admin/email_clipboard_change_form.html"
autocomplete_fields = ["domain"]
list_display = ("registry_id", "contact_type", "domain", "name")
search_fields = ["registry_id", "domain__name", "name"]
search_help_text = "Search by registry id, domain, or name."
list_filter = ("contact_type",)
def changeform_view(self, request, object_id=None, form_url="", extra_context=None):
if extra_context is None:

View file

@ -0,0 +1,105 @@
import logging
import argparse
from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import PopulateScriptTemplate, TerminalColors
from registrar.models import PublicContact
from registrar.models.utility.generic_helper import normalize_string
from registrar.utility.enums import DefaultEmail
logger = logging.getLogger(__name__)
class Command(BaseCommand, PopulateScriptTemplate):
help = "Loops through each default PublicContact and updates some values on each"
def add_arguments(self, parser):
"""Adds command line arguments"""
parser.add_argument(
"--overwrite_updated_contacts",
action=argparse.BooleanOptionalAction,
help=(
"Loops over PublicContacts with an email of 'help@get.gov' when enabled."
"Use this setting if the record was updated in the DB but not correctly in EPP."
),
)
parser.add_argument(
"--target_domain",
help=(
"Updates the public contact on a given domain name (case insensitive). "
"Use this option to avoid doing a mass-update of every public contact record."
),
)
def handle(self, **kwargs):
"""Loops through each valid User object and updates its verification_type value"""
overwrite_updated_contacts = kwargs.get("overwrite_updated_contacts")
target_domain = kwargs.get("target_domain")
default_emails = {email for email in DefaultEmail}
# Don't update records we've already updated
if not overwrite_updated_contacts:
default_emails.remove(DefaultEmail.PUBLIC_CONTACT_DEFAULT)
# We should only update DEFAULT records. This means that if all values are not default,
# we should skip as this could lead to data corruption.
# Since we check for all fields, we don't account for casing differences.
self.old_and_new_default_contact_values = {
"name": {
"csd/cb attn: .gov tld",
"csd/cb attn: cameron dixon",
"program manager",
"registry customer service",
},
"street1": {"1110 n. glebe rd", "cisa ngr stop 0645", "4200 wilson blvd."},
"pc": {"22201", "20598-0645"},
"email": default_emails,
}
if not target_domain:
filter_condition = {"email__in": default_emails}
else:
filter_condition = {"email__in": default_emails, "domain__name__iexact": target_domain}
# This variable is decorative since we are skipping bulk update
fields_to_update = ["name", "street1", "pc", "email"]
self.mass_update_records(PublicContact, filter_condition, fields_to_update, show_record_count=True)
def bulk_update_fields(self, *args, **kwargs):
"""Skip bulk update since we need to manually save each field.
Our EPP logic is tied to an override of .save(), and this also associates
with our caching logic for this area of the code.
Since bulk update does not trigger .save() for each field, we have to
call it manually.
"""
return None
def update_record(self, record: PublicContact):
"""Defines how we update the verification_type field"""
record.name = "CSD/CB Attn: .gov TLD"
record.street1 = "1110 N. Glebe Rd"
record.pc = "22201"
record.email = DefaultEmail.PUBLIC_CONTACT_DEFAULT
record.save()
logger.info(f"{TerminalColors.OKCYAN}Updated '{record}' in EPP.{TerminalColors.ENDC}")
def should_skip_record(self, record) -> bool: # noqa
"""Skips updating a public contact if it contains different default info."""
if record.registry_id and len(record.registry_id) < 16:
message = (
f"Skipping legacy verisign contact '{record}'. "
f"The registry_id field has a length less than 16 characters."
)
logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
return True
for key, expected_values in self.old_and_new_default_contact_values.items():
record_field = normalize_string(getattr(record, key))
if record_field not in expected_values:
message = (
f"Skipping '{record}' to avoid potential data corruption. "
f"The field '{key}' does not match the default.\n"
f"Details: DB value - {record_field}, expected value(s) - {expected_values}"
)
logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
return True
return False

View file

@ -124,7 +124,9 @@ class PopulateScriptTemplate(ABC):
"""
raise NotImplementedError
def mass_update_records(self, object_class, filter_conditions, fields_to_update, debug=True, verbose=False):
def mass_update_records(
self, object_class, filter_conditions, fields_to_update, debug=True, verbose=False, show_record_count=False
):
"""Loops through each valid "object_class" object - specified by filter_conditions - and
updates fields defined by fields_to_update using update_record.
@ -144,6 +146,9 @@ class PopulateScriptTemplate(ABC):
verbose: Whether to print a detailed run summary *before* run confirmation.
Default: False.
show_record_count: Whether to show a 'Record 1/10' dialog when running update.
Default: False.
Raises:
NotImplementedError: If you do not define update_record before using this function.
TypeError: If custom_filter is not Callable.
@ -153,13 +158,14 @@ class PopulateScriptTemplate(ABC):
# apply custom filter
records = self.custom_filter(records)
records_length = len(records)
readable_class_name = self.get_class_name(object_class)
# for use in the execution prompt.
proposed_changes = (
"==Proposed Changes==\n"
f"Number of {readable_class_name} objects to change: {len(records)}\n"
f"Number of {readable_class_name} objects to change: {records_length}\n"
f"These fields will be updated on each record: {fields_to_update}"
)
@ -177,7 +183,9 @@ class PopulateScriptTemplate(ABC):
to_update: List[object_class] = []
to_skip: List[object_class] = []
failed_to_update: List[object_class] = []
for record in records:
for i, record in enumerate(records, start=1):
if show_record_count:
logger.info(f"{TerminalColors.BOLD}Record {i}/{records_length}{TerminalColors.ENDC}")
try:
if not self.should_skip_record(record):
self.update_record(record)
@ -191,7 +199,7 @@ class PopulateScriptTemplate(ABC):
logger.error(fail_message)
# Do a bulk update on the desired field
ScriptDataHelper.bulk_update_fields(object_class, to_update, fields_to_update)
self.bulk_update_fields(object_class, to_update, fields_to_update)
# Log what happened
TerminalHelper.log_script_run_summary(
@ -204,6 +212,10 @@ class PopulateScriptTemplate(ABC):
display_as_str=True,
)
def bulk_update_fields(self, object_class, to_update, fields_to_update):
"""Bulk updates the given fields"""
ScriptDataHelper.bulk_update_fields(object_class, to_update, fields_to_update)
def get_class_name(self, sender) -> str:
"""Returns the class name that we want to display for the terminal prompt.
Example: DomainRequest => "Domain Request"
@ -516,4 +528,4 @@ class TerminalHelper:
terminal_color = color
colored_message = f"{terminal_color}{message}{TerminalColors.ENDC}"
log_method(colored_message, exc_info=exc_info)
return log_method(colored_message, exc_info=exc_info)

View file

@ -164,4 +164,4 @@ class PublicContact(TimeStampedModel):
return cls._meta.get_field("registry_id").max_length
def __str__(self):
return f"{self.name} <{self.email}>" f"id: {self.registry_id} " f"type: {self.contact_type}"
return self.registry_id

View file

@ -1931,7 +1931,14 @@ class MockEppLib(TestCase):
return MagicMock(res_data=[mocked_result])
def mockCreateContactCommands(self, _request, cleaned):
if getattr(_request, "id", None) == "fail" and self.mockedSendFunction.call_count == 3:
ids_to_throw_already_exists = [
"failAdmin1234567",
"failTech12345678",
"failSec123456789",
"failReg123456789",
"fail",
]
if getattr(_request, "id", None) in ids_to_throw_already_exists and self.mockedSendFunction.call_count == 3:
# use this for when a contact is being updated
# sets the second send() to fail
raise RegistryError(code=ErrorCode.OBJECT_EXISTS)
@ -1946,7 +1953,14 @@ class MockEppLib(TestCase):
return MagicMock(res_data=[self.mockDataInfoHosts])
def mockDeleteContactCommands(self, _request, cleaned):
if getattr(_request, "id", None) == "fail":
ids_to_throw_already_exists = [
"failAdmin1234567",
"failTech12345678",
"failSec123456789",
"failReg123456789",
"fail",
]
if getattr(_request, "id", None) in ids_to_throw_already_exists:
raise RegistryError(code=ErrorCode.OBJECT_EXISTS)
else:
return MagicMock(

View file

@ -32,6 +32,7 @@ from registrar.models import (
Portfolio,
Suborganization,
)
from registrar.utility.enums import DefaultEmail
import tablib
from unittest.mock import patch, call, MagicMock, mock_open
from epplibwrapper import commands, common
@ -2508,3 +2509,189 @@ class TestRemovePortfolios(TestCase):
# Check that the portfolio was deleted
self.assertFalse(Portfolio.objects.filter(organization_name="Test with suborg").exists())
class TestUpdateDefaultPublicContacts(MockEppLib):
"""Tests for the update_default_public_contacts management command."""
@less_console_noise_decorator
def setUp(self):
"""Setup test data with PublicContact records."""
super().setUp()
self.domain_request = completed_domain_request(
name="testdomain.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
)
self.domain_request.approve()
self.domain = self.domain_request.approved_domain
# 1. PublicContact with all old default values
self.old_default_contact = self.domain.get_default_administrative_contact()
self.old_default_contact.registry_id = "failAdmin1234567"
self.old_default_contact.name = "CSD/CB ATTN: Cameron Dixon"
self.old_default_contact.street1 = "CISA NGR STOP 0645"
self.old_default_contact.pc = "20598-0645"
self.old_default_contact.email = DefaultEmail.OLD_PUBLIC_CONTACT_DEFAULT
self.old_default_contact.save()
# 2. PublicContact with current default email but old values for other fields
self.mixed_default_contact = self.domain.get_default_technical_contact()
self.mixed_default_contact.registry_id = "failTech12345678"
self.mixed_default_contact.save(skip_epp_save=True)
self.mixed_default_contact.name = "registry customer service"
self.mixed_default_contact.street1 = "4200 Wilson Blvd."
self.mixed_default_contact.pc = "22201"
self.mixed_default_contact.email = DefaultEmail.PUBLIC_CONTACT_DEFAULT
self.mixed_default_contact.save()
# 3. PublicContact with non-default values
self.non_default_contact = self.domain.get_default_security_contact()
self.non_default_contact.registry_id = "failSec123456789"
self.non_default_contact.domain = self.domain
self.non_default_contact.save(skip_epp_save=True)
self.non_default_contact.name = "Hotdogs"
self.non_default_contact.street1 = "123 hotdog town"
self.non_default_contact.pc = "22111"
self.non_default_contact.email = "thehotdogman@igorville.gov"
self.non_default_contact.save()
# 4. Create a default contact but with an old email
self.default_registrant_old_email = self.domain.get_default_registrant_contact()
self.default_registrant_old_email.registry_id = "failReg123456789"
self.default_registrant_old_email.email = DefaultEmail.LEGACY_DEFAULT
self.default_registrant_old_email.save()
DF = common.DiscloseField
excluded_disclose_fields = {DF.NOTIFY_EMAIL, DF.VAT, DF.IDENT}
self.all_disclose_fields = {field for field in DF} - excluded_disclose_fields
def tearDown(self):
"""Clean up test data."""
super().tearDown()
PublicContact.objects.all().delete()
Domain.objects.all().delete()
DomainRequest.objects.all().delete()
DomainInformation.objects.all().delete()
User.objects.all().delete()
@patch("registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", return_value=True)
@less_console_noise_decorator
def run_update_default_public_contacts(self, mock_prompt, **kwargs):
"""Execute the update_default_public_contacts command with options."""
call_command("update_default_public_contacts", **kwargs)
# @less_console_noise_decorator
def test_updates_old_default_contact(self):
"""
Test that contacts with old default values are updated to new default values.
Also tests for string normalization.
"""
self.run_update_default_public_contacts()
self.old_default_contact.refresh_from_db()
# Verify updates occurred
self.assertEqual(self.old_default_contact.name, "CSD/CB Attn: .gov TLD")
self.assertEqual(self.old_default_contact.street1, "1110 N. Glebe Rd")
self.assertEqual(self.old_default_contact.pc, "22201")
self.assertEqual(self.old_default_contact.email, DefaultEmail.PUBLIC_CONTACT_DEFAULT)
# Verify EPP create/update calls were made
expected_update = self._convertPublicContactToEpp(
self.old_default_contact,
disclose=False,
disclose_fields=self.all_disclose_fields - {"name", "email", "voice", "addr"},
)
self.mockedSendFunction.assert_any_call(expected_update, cleaned=True)
@less_console_noise_decorator
def test_updates_with_default_contact_values(self):
"""
Test that contacts created from the default helper function with old email are updated.
"""
self.run_update_default_public_contacts()
self.default_registrant_old_email.refresh_from_db()
# Verify updates occurred
self.assertEqual(self.default_registrant_old_email.name, "CSD/CB Attn: .gov TLD")
self.assertEqual(self.default_registrant_old_email.street1, "1110 N. Glebe Rd")
self.assertEqual(self.default_registrant_old_email.pc, "22201")
self.assertEqual(self.default_registrant_old_email.email, DefaultEmail.PUBLIC_CONTACT_DEFAULT)
# Verify values match the default
default_reg = PublicContact.get_default_registrant()
self.assertEqual(self.default_registrant_old_email.name, default_reg.name)
self.assertEqual(self.default_registrant_old_email.street1, default_reg.street1)
self.assertEqual(self.default_registrant_old_email.pc, default_reg.pc)
self.assertEqual(self.default_registrant_old_email.email, default_reg.email)
# Verify EPP create/update calls were made
expected_update = self._convertPublicContactToEpp(
self.default_registrant_old_email, disclose=False, disclose_fields=self.all_disclose_fields
)
self.mockedSendFunction.assert_any_call(expected_update, cleaned=True)
@less_console_noise_decorator
def test_skips_non_default_contacts(self):
"""
Test that contacts with non-default values are skipped.
"""
original_name = self.non_default_contact.name
original_street1 = self.non_default_contact.street1
original_pc = self.non_default_contact.pc
original_email = self.non_default_contact.email
self.run_update_default_public_contacts()
self.non_default_contact.refresh_from_db()
# Verify no updates occurred
self.assertEqual(self.non_default_contact.name, original_name)
self.assertEqual(self.non_default_contact.street1, original_street1)
self.assertEqual(self.non_default_contact.pc, original_pc)
self.assertEqual(self.non_default_contact.email, original_email)
# Ensure that the update is still skipped even with the override flag
self.run_update_default_public_contacts(overwrite_updated_contacts=True)
self.non_default_contact.refresh_from_db()
# Verify no updates occurred
self.assertEqual(self.non_default_contact.name, original_name)
self.assertEqual(self.non_default_contact.street1, original_street1)
self.assertEqual(self.non_default_contact.pc, original_pc)
self.assertEqual(self.non_default_contact.email, original_email)
@less_console_noise_decorator
def test_skips_contacts_with_current_default_email_by_default(self):
"""
Test that contacts with the current default email are skipped when not using the override flag.
"""
# Get original values
original_name = self.mixed_default_contact.name
original_street1 = self.mixed_default_contact.street1
self.run_update_default_public_contacts()
self.mixed_default_contact.refresh_from_db()
# Verify no updates occurred
self.assertEqual(self.mixed_default_contact.name, original_name)
self.assertEqual(self.mixed_default_contact.street1, original_street1)
self.assertEqual(self.mixed_default_contact.email, DefaultEmail.PUBLIC_CONTACT_DEFAULT)
@less_console_noise_decorator
def test_updates_with_overwrite_flag(self):
"""
Test that contacts with the current default email are updated when using the override flag.
"""
# Run the command with the override flag
self.run_update_default_public_contacts(overwrite_updated_contacts=True)
self.mixed_default_contact.refresh_from_db()
# Verify updates occurred
self.assertEqual(self.mixed_default_contact.name, "CSD/CB Attn: .gov TLD")
self.assertEqual(self.mixed_default_contact.street1, "1110 N. Glebe Rd")
self.assertEqual(self.mixed_default_contact.pc, "22201")
self.assertEqual(self.mixed_default_contact.email, DefaultEmail.PUBLIC_CONTACT_DEFAULT)
# Verify EPP create/update calls were made
expected_update = self._convertPublicContactToEpp(
self.mixed_default_contact, disclose=False, disclose_fields=self.all_disclose_fields
)
self.mockedSendFunction.assert_any_call(expected_update, cleaned=True)