Update extend_expiration_dates.py

This commit is contained in:
zandercymatics 2023-12-12 08:55:29 -07:00
parent 23db6dea9d
commit 89a04edfe6
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7

View file

@ -2,13 +2,14 @@
import argparse import argparse
from datetime import date from datetime import date
import datetime
import logging import logging
from django.core.management import BaseCommand from django.core.management import BaseCommand
from epplibwrapper.errors import RegistryError from epplibwrapper.errors import RegistryError
from registrar.models import Domain from registrar.models import Domain
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from datetime import datetime
try: try:
from epplib.exceptions import TransportError from epplib.exceptions import TransportError
except ImportError: except ImportError:
@ -27,6 +28,7 @@ class Command(BaseCommand):
self.update_success = [] self.update_success = []
self.update_skipped = [] self.update_skipped = []
self.update_failed = [] self.update_failed = []
self.expiration_cutoff = date(2023, 11, 15)
def add_arguments(self, parser): def add_arguments(self, parser):
"""Add command line arguments.""" """Add command line arguments."""
@ -68,7 +70,7 @@ class Command(BaseCommand):
self.check_if_positive_int(limit_parse, "limitParse") self.check_if_positive_int(limit_parse, "limitParse")
valid_domains = Domain.objects.filter( valid_domains = Domain.objects.filter(
expiration_date__gte=date(2023, 11, 15), state=Domain.State.READY expiration_date__gte=self.expiration_cutoff, state=Domain.State.READY
).order_by("name") ).order_by("name")
domains_to_change_count = valid_domains.count() domains_to_change_count = valid_domains.count()
@ -81,49 +83,44 @@ class Command(BaseCommand):
self.prompt_user_to_proceed(extension_amount, domains_to_change_count) self.prompt_user_to_proceed(extension_amount, domains_to_change_count)
for domain in valid_domains: for domain in valid_domains:
is_idempotent = self.idempotence_check(domain, extension_amount) try:
if not disable_idempotence and not is_idempotent: is_idempotent = self.idempotence_check(domain, extension_amount)
self.update_skipped.append(domain.name) if not disable_idempotence and not is_idempotent:
logger.info(f"{TerminalColors.YELLOW}" f"Skipping update for {domain}" f"{TerminalColors.ENDC}") self.update_skipped.append(domain.name)
logger.info(f"{TerminalColors.YELLOW}" f"Skipping update for {domain}" f"{TerminalColors.ENDC}")
else:
domain.renew_domain(extension_amount)
# Catches registry errors. Failures indicate bad data, or a faulty connection.
except (RegistryError, KeyError, TransportError) as err:
self.update_failed.append(domain.name)
logger.error(
f"{TerminalColors.FAIL}" f"Failed to update expiration date for {domain}" f"{TerminalColors.ENDC}"
)
logger.error(err)
else: else:
self.extend_expiration_date_on_domain(domain, extension_amount, debug) self.update_success.append(domain.name)
logger.info(
self.log_script_run_summary(debug) f"{TerminalColors.OKCYAN}" f"Successfully updated expiration date for {domain}" f"{TerminalColors.ENDC}"
)
def extend_expiration_date_on_domain(self, domain: Domain, extension_amount: int, debug: bool): finally:
""" self.log_script_run_summary(debug)
Given a particular domain,
extend the expiration date by the period specified in extension_amount
"""
try:
domain.renew_domain(extension_amount)
except (RegistryError, TransportError) as err:
logger.error(
f"{TerminalColors.FAIL}" f"Failed to update expiration date for {domain}" f"{TerminalColors.ENDC}"
)
logger.error(err)
except Exception as err:
self.log_script_run_summary(debug)
raise err
else:
self.update_success.append(domain.name)
logger.info(
f"{TerminalColors.OKCYAN}" f"Successfully updated expiration date for {domain}" f"{TerminalColors.ENDC}"
)
# == Helper functions == # # == Helper functions == #
def idempotence_check(self, domain, extension_amount): def idempotence_check(self, domain: Domain, extension_amount):
"""Determines if the proposed operation violates idempotency""" """Determines if the proposed operation violates idempotency"""
proposed_date = self.add_years(domain.registry_expiration_date, extension_amount)
# Because our migration data had a hard stop date, we can determine if our change # Because our migration data had a hard stop date, we can determine if our change
# is valid simply checking if adding two years to our current date yields a greater date # is valid simply checking the date is within a valid range and it was updated
# than the proposed. # in epp on the current day.
# CAVEAT: This check stops working after a year has elapsed between when this script # CAVEAT: This check stops working a day after it is ran (for some domains) and
# was ran, and when it was ran again. This is good enough for now, but a more robust # if the domain was updated by a user on the day it was ran. A more robust
# solution would be a DB flag. # solution would be a db flag
extension_from_today = self.add_years(date.today(), extension_amount + 1) proposed_date = self.add_years(domain.registry_expiration_date, extension_amount)
is_idempotent = proposed_date < extension_from_today minimum_extension_date = self.add_years(self.expiration_cutoff, extension_amount)
return is_idempotent maximum_extension_date = self.add_years(date(2025, 12, 31), extension_amount)
valid_range = minimum_extension_date <= proposed_date <= maximum_extension_date
return valid_range
def prompt_user_to_proceed(self, extension_amount, domains_to_change_count): def prompt_user_to_proceed(self, extension_amount, domains_to_change_count):
"""Asks if the user wants to proceed with this action""" """Asks if the user wants to proceed with this action"""