Finish script

This commit is contained in:
zandercymatics 2023-12-07 13:25:56 -07:00
parent 0ee7c598f8
commit 8faef8b9d1
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7

View file

@ -5,8 +5,13 @@ from datetime import date
import logging import logging
from django.core.management import BaseCommand from django.core.management import BaseCommand
from epplibwrapper.errors import RegistryError
from registrar.models import Domain from registrar.models import Domain
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from dateutil.relativedelta import relativedelta
from epplib.exceptions import TransportError
except ImportError:
pass
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -14,12 +19,13 @@ logger = logging.getLogger(__name__)
class Command(BaseCommand): class Command(BaseCommand):
help = "Extends expiration dates for valid domains" help = "Extends expiration dates for valid domains"
# Generates test transition domains for testing send_domain_invitations script. def __init__(self):
# Running this script removes all existing transition domains, so use with caution. """Sets global variables for code tidyness"""
# Transition domains are created with email addresses provided as command line super().__init__()
# argument. Email addresses for testing are passed as comma delimited list of self.update_success = []
# email addresses, and are required to be provided. Email addresses from the list self.update_skipped = []
# are assigned to transition domains at time of creation. self.update_failed = []
self.debug = False
def add_arguments(self, parser): def add_arguments(self, parser):
"""Add command line arguments.""" """Add command line arguments."""
@ -30,26 +36,98 @@ class Command(BaseCommand):
help="Determines the period (in years) to extend expiration dates by", help="Determines the period (in years) to extend expiration dates by",
) )
parser.add_argument( parser.add_argument(
"--parseLimit", "--limitParse",
type=int, type=int,
default=0, default=0,
help="Sets a cap on the number of records to parse", help="Sets a cap on the number of records to parse",
) )
parser.add_argument(
"--disableIdempotentCheck",
action=argparse.BooleanOptionalAction,
help="Disable script idempotence"
)
parser.add_argument(
"--debug",
action=argparse.BooleanOptionalAction,
help="Increases log chattiness"
)
def handle(self, **options): def handle(self, **options):
"""""" """"""
extension_amount = options.get("extensionAmount") extension_amount = options.get("extensionAmount")
parse_limit = options.get("parseLimit") limit_parse = options.get("limitParse")
disable_idempotence = options.get("disableIdempotentCheck")
self.debug = options.get("debug")
# Does a check to see if parse_limit is a positive int # Does a check to see if parse_limit is a positive int.
self.check_if_positive_int(parse_limit, "parseLimit") # Raise an error if not.
self.check_if_positive_int(limit_parse, "limitParse")
# TODO - Do we need to check status?
valid_domains = Domain.objects.filter( valid_domains = Domain.objects.filter(
expiration_date__gte=date(2023, 11, 15), expiration_date__gte=date(2023, 11, 15),
State=Domain.State.READY state=Domain.State.READY
) ).order_by("name")
domains_to_change_count = valid_domains.count()
if limit_parse != 0 and limit_parse < domains_to_change_count:
domains_to_change_count = limit_parse
# Determines if we should continue code execution or not.
# If the user prompts 'N', a sys.exit() will be called.
self.prompt_user_to_proceed(extension_amount, domains_to_change_count)
for i, domain in enumerate(valid_domains):
if limit_parse != 0 and i > limit_parse:
break
is_idempotent = self.idempotence_check(domain, extension_amount)
if not disable_idempotence and not is_idempotent:
self.update_skipped.append(domain.name)
else:
self.extend_expiration_date_on_domain(domain, extension_amount)
self.log_script_run_summary()
def extend_expiration_date_on_domain(self, domain: Domain, extension_amount: int):
"""
Given a particular domain,
extend the expiration date by the period specified in extension_amount
"""
try:
domain.renew_domain(extension_amount)
except (RegistryError, TransportError) as err:
logger.error(
f"{TerminalColors.FAIL}"
f"Failed to update expiration date for {domain}"
f"{TerminalColors.ENDC}"
)
logger.error(err)
except Exception as err:
self.log_script_run_summary()
raise err
else:
self.update_success.append(domain.name)
logger.info(
f"{TerminalColors.OKCYAN}"
f"Successfully updated expiration date for {domain}"
f"{TerminalColors.ENDC}"
)
# == Helper functions == #
def idempotence_check(self, domain, extension_amount):
"""Determines if the proposed operation violates idempotency"""
proposed_date = domain.expiration_date + relativedelta(years=extension_amount)
# Because our migration data had a hard stop date, we can determine if our change
# is valid simply checking if adding a year to our current date yields a greater date
# than the proposed.
# CAVEAT: This check stops working after a year has elapsed between when this script
# was ran, and when it was ran again. This is good enough for now, but a more robust
# solution would be a DB flag.
is_idempotent = proposed_date < date.today() + relativedelta(years=extension_amount)
return is_idempotent
def prompt_user_to_proceed(self, extension_amount, domains_to_change_count):
"""Asks if the user wants to proceed with this action"""
TerminalHelper.prompt_for_execution( TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True, system_exit_on_terminate=True,
info_to_inspect=f""" info_to_inspect=f"""
@ -57,7 +135,7 @@ class Command(BaseCommand):
Period: {extension_amount} year(s) Period: {extension_amount} year(s)
==Proposed Changes== ==Proposed Changes==
Domains to change: {valid_domains.count()} Domains to change: {domains_to_change_count}
""", """,
prompt_title="Do you wish to modify Expiration Dates for the given Domains?", prompt_title="Do you wish to modify Expiration Dates for the given Domains?",
) )
@ -68,29 +146,84 @@ class Command(BaseCommand):
f"{TerminalColors.ENDC}" f"{TerminalColors.ENDC}"
) )
for i, domain in enumerate(valid_domains): def log_script_run_summary(self):
if i > parse_limit: """Prints success, failed, and skipped counts, as well as
break all affected domains."""
update_success_count = len(self.update_success)
update_failed_count = len(self.update_failed)
update_skipped_count = len(self.update_skipped)
if update_failed_count == 0 and update_skipped_count == 0:
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Updated {update_success_count} Domain entries
{TerminalColors.ENDC}
"""
)
TerminalHelper.print_conditional(
self.debug,
f"""
{TerminalColors.OKGREEN}
Updated the following Domains: {self.update_success}
{TerminalColors.ENDC}
""",
)
elif update_failed_count == 0:
TerminalHelper.print_conditional(
self.debug,
f"""
{TerminalColors.OKGREEN}
Updated the following Domains: {self.update_success}
{TerminalColors.ENDC}
{TerminalColors.YELLOW}
Skipped the following Domains: {self.update_skipped}
{TerminalColors.ENDC}
""",
)
logger.info(
f"""{TerminalColors.YELLOW}
============= FINISHED ===============
Updated {update_success_count} Domain entries
----- IDEMPOTENCY CHECK FAILED -----
Skipped updating {update_skipped_count} Domain entries
{TerminalColors.ENDC}
"""
)
else:
TerminalHelper.print_conditional(
self.debug,
f"""
{TerminalColors.OKGREEN}
Updated the following Domains: {self.update_success}
{TerminalColors.ENDC}
{TerminalColors.YELLOW}
Skipped the following Domains: {self.update_skipped}
{TerminalColors.ENDC}
{TerminalColors.FAIL}
Failed to update the following Domains: {self.update_failed}
{TerminalColors.ENDC}
""",
)
logger.info(
f"""{TerminalColors.FAIL}
============= FINISHED ===============
Updated {update_success_count} Domain entries
----- UPDATE FAILED -----
Failed to update {update_failed_count} Domain entries,
Skipped updating {update_skipped_count} Domain entries
{TerminalColors.ENDC}
"""
)
try:
domain.renew_domain(extension_amount)
except Exception as err:
logger.error(
f"{TerminalColors.OKBLUE}"
f"Failed to update expiration date for {domain}"
f"{TerminalColors.ENDC}"
)
raise err
else:
logger.info(
f"{TerminalColors.OKBLUE}"
f"Successfully updated expiration date for {domain}"
f"{TerminalColors.ENDC}"
)
def check_if_positive_int(self, value: int, var_name: str): def check_if_positive_int(self, value: int, var_name: str):
""" """
Determines if the given integer value is postive or not. Determines if the given integer value is positive or not.
If not, it raises an ArgumentTypeError If not, it raises an ArgumentTypeError
""" """
if value < 0: if value < 0: