Merge pull request #1470 from cisagov/za/1212-extend-expiration-dates

Ticket #1212: Extend Expiration Dates
This commit is contained in:
zandercymatics 2023-12-14 15:21:20 -07:00 committed by GitHub
commit 335b96b2a7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 455 additions and 2 deletions

View file

@ -493,3 +493,34 @@ The `load_organization_data` script has five optional parameters. These are as f
| 3 | **directory** | Specifies the directory containing the files that will be parsed. Defaults to "migrationdata" |
| 4 | **domain_additional_filename** | Specifies the filename of domain_additional. Used as an override for the JSON. Has no default. |
| 5 | **organization_adhoc_filename** | Specifies the filename of organization_adhoc. Used as an override for the JSON. Has no default. |
## Extend Domain Extension Dates
This section outlines how to extend the expiration date of all ready domains (or a select subset) by a defined period of time.
### Running on sandboxes
#### Step 1: Login to CloudFoundry
```cf login -a api.fr.cloud.gov --sso```
#### Step 2: SSH into your environment
```cf ssh getgov-{space}```
Example: `cf ssh getgov-za`
#### Step 3: Create a shell instance
```/tmp/lifecycle/shell```
#### Step 4: Extend domains
```./manage.py extend_expiration_dates```
### Running locally
```docker-compose exec app ./manage.py extend_expiration_dates```
##### Optional parameters
| | Parameter | Description |
|:-:|:-------------------------- |:----------------------------------------------------------------------------|
| 1 | **extensionAmount** | Determines the period of time to extend by, in years. Defaults to 1 year. |
| 2 | **debug** | Increases logging detail. Defaults to False. |
| 3 | **limitParse** | Determines how many domains to parse. Defaults to all. |
| 4 | **disableIdempotentCheck** | Boolean that determines if we should check for idempotence or not. Compares the proposed extension date to the value in TransitionDomains. Defaults to False. |

View file

@ -0,0 +1,212 @@
"""Data migration: Extends expiration dates for valid domains"""
import argparse
from datetime import date
import logging
from django.core.management import BaseCommand
from epplibwrapper.errors import RegistryError
from registrar.models import Domain
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.models.transition_domain import TransitionDomain
try:
from epplib.exceptions import TransportError
except ImportError:
pass
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Extends expiration dates for valid domains"
def __init__(self):
"""Sets global variables for code tidyness"""
super().__init__()
self.update_success = []
self.update_skipped = []
self.update_failed = []
self.expiration_minimum_cutoff = date(2023, 11, 15)
self.expiration_maximum_cutoff = date(2023, 12, 30)
def add_arguments(self, parser):
"""Add command line arguments."""
parser.add_argument(
"--extensionAmount",
type=int,
default=1,
help="Determines the period (in years) to extend expiration dates by",
)
parser.add_argument(
"--limitParse",
type=int,
default=0,
help="Sets a cap on the number of records to parse",
)
parser.add_argument(
"--disableIdempotentCheck", action=argparse.BooleanOptionalAction, help="Disable script idempotence"
)
parser.add_argument("--debug", action=argparse.BooleanOptionalAction, help="Increases log chattiness")
def handle(self, **options):
"""
Extends the expiration dates for valid domains.
If a parse limit is set and it's less than the total number of valid domains,
the number of domains to change is set to the parse limit.
Includes an idempotence check.
"""
# Retrieve command line options
extension_amount = options.get("extensionAmount")
limit_parse = options.get("limitParse")
disable_idempotence = options.get("disableIdempotentCheck")
debug = options.get("debug")
# Does a check to see if parse_limit is a positive int.
# Raise an error if not.
self.check_if_positive_int(limit_parse, "limitParse")
valid_domains = Domain.objects.filter(
expiration_date__gte=self.expiration_minimum_cutoff,
expiration_date__lte=self.expiration_maximum_cutoff,
state=Domain.State.READY,
).order_by("name")
domains_to_change_count = valid_domains.count()
if limit_parse != 0:
domains_to_change_count = limit_parse
valid_domains = valid_domains[:limit_parse]
# Determines if we should continue code execution or not.
# If the user prompts 'N', a sys.exit() will be called.
self.prompt_user_to_proceed(extension_amount, domains_to_change_count)
for domain in valid_domains:
try:
is_idempotent = self.idempotence_check(domain, extension_amount)
if not disable_idempotence and not is_idempotent:
self.update_skipped.append(domain.name)
logger.info(f"{TerminalColors.YELLOW}" f"Skipping update for {domain}" f"{TerminalColors.ENDC}")
else:
domain.renew_domain(extension_amount)
self.update_success.append(domain.name)
logger.info(
f"{TerminalColors.OKCYAN}"
f"Successfully updated expiration date for {domain}"
f"{TerminalColors.ENDC}"
)
# Catches registry errors. Failures indicate bad data, or a faulty connection.
except (RegistryError, KeyError, TransportError) as err:
self.update_failed.append(domain.name)
logger.error(
f"{TerminalColors.FAIL}" f"Failed to update expiration date for {domain}" f"{TerminalColors.ENDC}"
)
logger.error(err)
except Exception as err:
self.log_script_run_summary(debug)
raise err
self.log_script_run_summary(debug)
# == Helper functions == #
def idempotence_check(self, domain: Domain, extension_amount):
"""Determines if the proposed operation violates idempotency"""
# Because our migration data had a hard stop date, we can determine if our change
# is valid simply checking the date is within a valid range and it was updated
# in epp or not.
# CAVEAT: This is a workaround. A more robust solution would be a db flag
current_expiration_date = domain.registry_expiration_date
transition_domains = TransitionDomain.objects.filter(
domain_name=domain.name, epp_expiration_date=current_expiration_date
)
return transition_domains.count() > 0
def prompt_user_to_proceed(self, extension_amount, domains_to_change_count):
"""Asks if the user wants to proceed with this action"""
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==Extension Amount==
Period: {extension_amount} year(s)
==Proposed Changes==
Domains to change: {domains_to_change_count}
""",
prompt_title="Do you wish to proceed with these changes?",
)
logger.info(f"{TerminalColors.MAGENTA}" "Preparing to extend expiration dates..." f"{TerminalColors.ENDC}")
def check_if_positive_int(self, value: int, var_name: str):
"""
Determines if the given integer value is positive or not.
If not, it raises an ArgumentTypeError
"""
if value < 0:
raise argparse.ArgumentTypeError(
f"{value} is an invalid integer value for {var_name}. " "Must be positive."
)
return value
def log_script_run_summary(self, debug):
"""Prints success, failed, and skipped counts, as well as
all affected domains."""
update_success_count = len(self.update_success)
update_failed_count = len(self.update_failed)
update_skipped_count = len(self.update_skipped)
# Prepare debug messages
debug_messages = {
"success": (f"{TerminalColors.OKCYAN}Updated these Domains: {self.update_success}{TerminalColors.ENDC}\n"),
"skipped": (f"{TerminalColors.YELLOW}Skipped these Domains: {self.update_skipped}{TerminalColors.ENDC}\n"),
"failed": (
f"{TerminalColors.FAIL}Failed to update these Domains: {self.update_failed}{TerminalColors.ENDC}\n"
),
}
# Print out a list of everything that was changed, if we have any changes to log.
# Otherwise, don't print anything.
TerminalHelper.print_conditional(
debug,
f"{debug_messages.get('success') if update_success_count > 0 else ''}"
f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}"
f"{debug_messages.get('failed') if update_failed_count > 0 else ''}",
)
if update_failed_count == 0 and update_skipped_count == 0:
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Updated {update_success_count} Domain entries
{TerminalColors.ENDC}
"""
)
elif update_failed_count == 0:
logger.info(
f"""{TerminalColors.YELLOW}
============= FINISHED ===============
Updated {update_success_count} Domain entries
----- IDEMPOTENCY CHECK FAILED -----
Skipped updating {update_skipped_count} Domain entries
{TerminalColors.ENDC}
"""
)
else:
logger.info(
f"""{TerminalColors.FAIL}
============= FINISHED ===============
Updated {update_success_count} Domain entries
----- UPDATE FAILED -----
Failed to update {update_failed_count} Domain entries,
Skipped updating {update_skipped_count} Domain entries
{TerminalColors.ENDC}
"""
)

View file

@ -1195,7 +1195,7 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error(e)
logger.error(e.code)
raise e
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST and self.state != Domain.State.DELETED:
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST and self.state == Domain.State.UNKNOWN:
# avoid infinite loop
already_tried_to_create = True
self.dns_needed_from_unknown()

View file

@ -620,6 +620,17 @@ class MockEppLib(TestCase):
],
ex_date=datetime.date(2023, 5, 25),
)
mockDataExtensionDomain = fakedEppObject(
"fakePw",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[common.DomainContact(contact="123", type=PublicContact.ContactTypeChoices.SECURITY)],
hosts=["fake.host.com"],
statuses=[
common.Status(state="serverTransferProhibited", description="", lang="en"),
common.Status(state="inactive", description="", lang="en"),
],
ex_date=datetime.date(2023, 11, 15),
)
mockDataInfoContact = mockDataInfoDomain.dummyInfoContactResultData(
"123", "123@mail.gov", datetime.datetime(2023, 5, 25, 19, 45, 35), "lastPw"
)
@ -820,6 +831,21 @@ class MockEppLib(TestCase):
ex_date=datetime.date(2023, 5, 25),
)
mockDnsNeededRenewedDomainExpDate = fakedEppObject(
"fakeneeded.gov",
ex_date=datetime.date(2023, 2, 15),
)
mockMaximumRenewedDomainExpDate = fakedEppObject(
"fakemaximum.gov",
ex_date=datetime.date(2024, 12, 31),
)
mockRecentRenewedDomainExpDate = fakedEppObject(
"waterbutpurple.gov",
ex_date=datetime.date(2024, 11, 15),
)
def _mockDomainName(self, _name, _avail=False):
return MagicMock(
res_data=[
@ -918,6 +944,21 @@ class MockEppLib(TestCase):
def mockRenewDomainCommand(self, _request, cleaned):
if getattr(_request, "name", None) == "fake-error.gov":
raise RegistryError(code=ErrorCode.PARAMETER_VALUE_RANGE_ERROR)
elif getattr(_request, "name", None) == "waterbutpurple.gov":
return MagicMock(
res_data=[self.mockRecentRenewedDomainExpDate],
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
)
elif getattr(_request, "name", None) == "fakeneeded.gov":
return MagicMock(
res_data=[self.mockDnsNeededRenewedDomainExpDate],
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
)
elif getattr(_request, "name", None) == "fakemaximum.gov":
return MagicMock(
res_data=[self.mockMaximumRenewedDomainExpDate],
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
)
else:
return MagicMock(
res_data=[self.mockRenewedDomainExpDate],
@ -943,6 +984,7 @@ class MockEppLib(TestCase):
self.infoDomainTwoHosts if self.mockedSendFunction.call_count == 5 else self.infoDomainNoHost,
None,
),
"waterbutpurple.gov": (self.mockDataExtensionDomain, None),
"nameserverwithip.gov": (self.infoDomainHasIP, None),
"namerserversubdomain.gov": (self.infoDomainCheckHostIPCombo, None),
"freeman.gov": (self.InfoDomainWithContacts, None),

View file

@ -18,7 +18,175 @@ from unittest.mock import patch
from registrar.models.contact import Contact
from .common import less_console_noise
from .common import MockEppLib, less_console_noise
class TestExtendExpirationDates(MockEppLib):
def setUp(self):
"""Defines the file name of migration_json and the folder its contained in"""
super().setUp()
# Create a valid domain that is updatable
Domain.objects.get_or_create(
name="waterbutpurple.gov", state=Domain.State.READY, expiration_date=datetime.date(2023, 11, 15)
)
TransitionDomain.objects.get_or_create(
username="testytester@mail.com",
domain_name="waterbutpurple.gov",
epp_expiration_date=datetime.date(2023, 11, 15),
)
# Create a domain with an invalid expiration date
Domain.objects.get_or_create(
name="fake.gov", state=Domain.State.READY, expiration_date=datetime.date(2022, 5, 25)
)
TransitionDomain.objects.get_or_create(
username="themoonisactuallycheese@mail.com",
domain_name="fake.gov",
epp_expiration_date=datetime.date(2022, 5, 25),
)
# Create a domain with an invalid state
Domain.objects.get_or_create(
name="fakeneeded.gov", state=Domain.State.DNS_NEEDED, expiration_date=datetime.date(2023, 11, 15)
)
TransitionDomain.objects.get_or_create(
username="fakeneeded@mail.com",
domain_name="fakeneeded.gov",
epp_expiration_date=datetime.date(2023, 11, 15),
)
# Create a domain with a date greater than the maximum
Domain.objects.get_or_create(
name="fakemaximum.gov", state=Domain.State.READY, expiration_date=datetime.date(2024, 12, 31)
)
TransitionDomain.objects.get_or_create(
username="fakemaximum@mail.com",
domain_name="fakemaximum.gov",
epp_expiration_date=datetime.date(2024, 12, 31),
)
def tearDown(self):
"""Deletes all DB objects related to migrations"""
super().tearDown()
# Delete domain information
Domain.objects.all().delete()
DomainInformation.objects.all().delete()
DomainInvitation.objects.all().delete()
TransitionDomain.objects.all().delete()
# Delete users
User.objects.all().delete()
UserDomainRole.objects.all().delete()
def run_extend_expiration_dates(self):
"""
This method executes the transfer_transition_domains_to_domains command.
The 'call_command' function from Django's management framework is then used to
execute the load_transition_domain command with the specified arguments.
"""
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
return_value=True,
):
call_command("extend_expiration_dates")
def test_extends_expiration_date_correctly(self):
"""
Tests that the extend_expiration_dates method extends dates as expected
"""
desired_domain = Domain.objects.filter(name="waterbutpurple.gov").get()
desired_domain.expiration_date = datetime.date(2024, 11, 15)
# Run the expiration date script
self.run_extend_expiration_dates()
current_domain = Domain.objects.filter(name="waterbutpurple.gov").get()
self.assertEqual(desired_domain, current_domain)
# Explicitly test the expiration date
self.assertEqual(current_domain.expiration_date, datetime.date(2024, 11, 15))
def test_extends_expiration_date_skips_non_current(self):
"""
Tests that the extend_expiration_dates method correctly skips domains
with an expiration date less than a certain threshold.
"""
desired_domain = Domain.objects.filter(name="fake.gov").get()
desired_domain.expiration_date = datetime.date(2022, 5, 25)
# Run the expiration date script
self.run_extend_expiration_dates()
current_domain = Domain.objects.filter(name="fake.gov").get()
self.assertEqual(desired_domain, current_domain)
# Explicitly test the expiration date. The extend_expiration_dates script
# will skip all dates less than date(2023, 11, 15), meaning that this domain
# should not be affected by the change.
self.assertEqual(current_domain.expiration_date, datetime.date(2022, 5, 25))
def test_extends_expiration_date_skips_maximum_date(self):
"""
Tests that the extend_expiration_dates method correctly skips domains
with an expiration date more than a certain threshold.
"""
desired_domain = Domain.objects.filter(name="fakemaximum.gov").get()
desired_domain.expiration_date = datetime.date(2024, 12, 31)
# Run the expiration date script
self.run_extend_expiration_dates()
current_domain = Domain.objects.filter(name="fakemaximum.gov").get()
self.assertEqual(desired_domain, current_domain)
# Explicitly test the expiration date. The extend_expiration_dates script
# will skip all dates less than date(2023, 11, 15), meaning that this domain
# should not be affected by the change.
self.assertEqual(current_domain.expiration_date, datetime.date(2024, 12, 31))
def test_extends_expiration_date_skips_non_ready(self):
"""
Tests that the extend_expiration_dates method correctly skips domains not in the state "ready"
"""
desired_domain = Domain.objects.filter(name="fakeneeded.gov").get()
desired_domain.expiration_date = datetime.date(2023, 11, 15)
# Run the expiration date script
self.run_extend_expiration_dates()
current_domain = Domain.objects.filter(name="fakeneeded.gov").get()
self.assertEqual(desired_domain, current_domain)
# Explicitly test the expiration date. The extend_expiration_dates script
# will skip all dates less than date(2023, 11, 15), meaning that this domain
# should not be affected by the change.
self.assertEqual(current_domain.expiration_date, datetime.date(2023, 11, 15))
def test_extends_expiration_date_idempotent(self):
"""
Tests the idempotency of the extend_expiration_dates command.
Verifies that running the method multiple times does not change the expiration date
of a domain beyond the initial extension.
"""
desired_domain = Domain.objects.filter(name="waterbutpurple.gov").get()
desired_domain.expiration_date = datetime.date(2024, 11, 15)
# Run the expiration date script
self.run_extend_expiration_dates()
current_domain = Domain.objects.filter(name="waterbutpurple.gov").get()
self.assertEqual(desired_domain, current_domain)
# Explicitly test the expiration date
self.assertEqual(desired_domain.expiration_date, datetime.date(2024, 11, 15))
# Run the expiration date script again
self.run_extend_expiration_dates()
# The old domain shouldn't have changed
self.assertEqual(desired_domain, current_domain)
# Explicitly test the expiration date - should be the same
self.assertEqual(desired_domain.expiration_date, datetime.date(2024, 11, 15))
class TestProcessedMigrations(TestCase):