cleanup - linting in progress

This commit is contained in:
CocoByte 2023-09-25 14:33:51 -06:00
parent 7be4c300bf
commit 0a05fe92e2
No known key found for this signature in database
GPG key ID: BBFAA2526384C97F
2 changed files with 288 additions and 91 deletions

View file

@ -1,11 +1,13 @@
"""Load domain invitations for existing domains and their contacts.""" """Load domain invitations for existing domains and their contacts."""
# NOTE: Do we want to add userID to transition_domain? (user might have multiple emails??) # NOTE: Do we want to add userID to transition_domain?
# (user might have multiple emails??)
# NOTE: How to determine of email has been sent?? # NOTE: How to determine of email has been sent??
import csv import csv
import logging import logging
import argparse
from collections import defaultdict from collections import defaultdict
@ -16,8 +18,55 @@ from registrar.models import TransitionDomain
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class termColors:
"""Colors for terminal outputs
(makes reading the logs WAY easier)"""
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
BackgroundLightYellow = "\033[103m"
def query_yes_no(question, default="yes"):
"""Ask a yes/no question via raw_input() and return their answer.
"question" is a string that is presented to the user.
"default" is the presumed answer if the user just hits <Enter>.
It must be "yes" (the default), "no" or None (meaning
an answer is required of the user).
The "answer" return value is True for "yes" or False for "no".
"""
valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("invalid default answer: '%s'" % default)
while True:
logger.info(question + prompt)
choice = input().lower()
if default is not None and choice == "":
return valid[default]
elif choice in valid:
return valid[choice]
else:
logger.info("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n")
class Command(BaseCommand): class Command(BaseCommand):
help = """Load data for domains that are in transition help = """Load data for domains that are in transition
(populates transition_domain model objects).""" (populates transition_domain model objects)."""
def add_arguments(self, parser): def add_arguments(self, parser):
@ -35,40 +84,73 @@ class Command(BaseCommand):
parser.add_argument("--sep", default="|", help="Delimiter character") parser.add_argument("--sep", default="|", help="Delimiter character")
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument(
"--limitParse", default=0, help="Sets max number of entries to load"
)
parser.add_argument(
"--resetTable",
help="Deletes all data in the TransitionDomain table",
action=argparse.BooleanOptionalAction,
)
def handle( def handle(
self, self,
domain_contacts_filename, domain_contacts_filename,
contacts_filename, contacts_filename,
domain_statuses_filename, domain_statuses_filename,
**options **options,
): ):
"""Load the data files and create the DomainInvitations.""" """Load the data files and create the DomainInvitations."""
sep = options.get("sep") sep = options.get("sep")
debug_MaxEntriesToParse = 10 if options.get("resetTable"):
confirmReset = query_yes_no(
f"""
{termColors.FAIL}
WARNING: Resetting the table will permanently delete all the data!
Are you sure you want to continue?{termColors.ENDC}"""
)
if confirmReset:
logger.info(
f"""{termColors.WARNING}
----------Clearing Table Data----------
(please wait)
{termColors.ENDC}"""
)
TransitionDomain.objects.all().delete()
class termColors: debugOn = options.get("debug")
HEADER = '\033[95m' debug_MaxEntriesToParse = int(
OKBLUE = '\033[94m' options.get("limitParse")
OKCYAN = '\033[96m' ) # set to 0 to parse all entries
OKGREEN = '\033[92m'
WARNING = '\033[93m' if debugOn:
FAIL = '\033[91m' logger.info(
ENDC = '\033[0m' f"""{termColors.OKCYAN}
BOLD = '\033[1m' ----------DEBUG MODE ON----------
UNDERLINE = '\033[4m' Detailed print statements activated.
if __debug__: {termColors.ENDC}
print(termColors.WARNING) """
print("----------DEBUG MODE ON----------") )
print(f"Parsing of entries will be limited to {debug_MaxEntriesToParse} lines per file.") if debug_MaxEntriesToParse > 0:
print("Detailed print statements activated.") logger.info(
print(termColors.ENDC) f"""{termColors.OKCYAN}
----------LIMITER ON----------
Parsing of entries will be limited to {debug_MaxEntriesToParse} lines per file.")
Detailed print statements activated.
{termColors.ENDC}
"""
)
# STEP 1: # STEP 1:
# Create mapping of domain name -> status # Create mapping of domain name -> status
# TODO: figure out latest status # TODO: figure out latest status
domain_status_dictionary = defaultdict(str) # NOTE: how to determine "most recent" status? domain_status_dictionary = defaultdict(
str
) # NOTE: how to determine "most recent" status?
logger.info("Reading domain statuses data file %s", domain_statuses_filename) logger.info("Reading domain statuses data file %s", domain_statuses_filename)
with open(domain_statuses_filename, "r") as domain_statuses_file: with open(domain_statuses_filename, "r") as domain_statuses_file:
for row in csv.reader(domain_statuses_file, delimiter=sep): for row in csv.reader(domain_statuses_file, delimiter=sep):
@ -97,24 +179,32 @@ class Command(BaseCommand):
to_create = [] to_create = []
# keep track of statuses that don't match our available status values # keep track of statuses that don't match our available status values
outlier_statuses = set outlier_statuses = []
total_outlier_statuses = 0 total_outlier_statuses = 0
# keep track of domains that have no known status # keep track of domains that have no known status
domains_without_status = set domains_without_status = []
total_domains_without_status = 0 total_domains_without_status = 0
# keep track of users that have no e-mails # keep track of users that have no e-mails
users_without_email = set users_without_email = []
total_users_without_email = 0 total_users_without_email = 0
# keep track of dupliucations..
duplicate_domains = []
duplicate_domain_user_combos = []
# keep track of domains we UPDATED (instead of ones we added) # keep track of domains we UPDATED (instead of ones we added)
total_updated_domain_entries = 0 total_updated_domain_entries = 0
total_new_entries = 0 total_new_entries = 0
total_rows_parsed = 0
logger.info("Reading domain-contacts data file %s", domain_contacts_filename) logger.info("Reading domain-contacts data file %s", domain_contacts_filename)
with open(domain_contacts_filename, "r") as domain_contacts_file: with open(domain_contacts_filename, "r") as domain_contacts_file:
for row in csv.reader(domain_contacts_file, delimiter=sep): for row in csv.reader(domain_contacts_file, delimiter=sep):
total_rows_parsed += 1
# fields are just domain, userid, role # fields are just domain, userid, role
# lowercase the domain names # lowercase the domain names
domainName = row[0].lower() domainName = row[0].lower()
@ -127,20 +217,31 @@ class Command(BaseCommand):
if domainName not in domain_status_dictionary: if domainName not in domain_status_dictionary:
# this domain has no status...default to "Create" # this domain has no status...default to "Create"
# domains_without_status.add(domainName) if domainName not in domains_without_status:
# print("No status found for domain: "+domainName) domains_without_status.append(domainName)
total_domains_without_status += 1 total_domains_without_status += 1
else: else:
originalStatus = domain_status_dictionary[domainName] originalStatus = domain_status_dictionary[domainName]
# print(originalStatus) # print(originalStatus)
if originalStatus in TransitionDomain.StatusChoices.values: if originalStatus in TransitionDomain.StatusChoices.values:
# print("YAY") # This status maps directly to our available status options
domainStatus = originalStatus domainStatus = originalStatus
else: else:
# default all other statuses to "Create" # Map all other status as follows;
# outlier_statuses.add(originalStatus) # "serverHold” fields will map to hold clientHold to hold
# print("Unknown status: "+originalStatus) # and any ok state should map to Ready.
total_outlier_statuses += 1 # Check if there are any statuses that are not
# serverhold, client hold or OK in the original data set.
if (
originalStatus.lower() == "serverhold"
or originalStatus.lower() == "clienthold"
):
domainStatus = TransitionDomain.StatusChoices.HOLD
elif originalStatus.lower() != "ok":
if originalStatus not in outlier_statuses:
outlier_statuses.append(originalStatus)
logger.info("Unknown status: " + originalStatus)
total_outlier_statuses += 1
if userId not in user_emails_dictionary: if userId not in user_emails_dictionary:
# this user has no e-mail...this should never happen # this user has no e-mail...this should never happen
@ -150,73 +251,164 @@ class Command(BaseCommand):
else: else:
userEmail = user_emails_dictionary[userId] userEmail = user_emails_dictionary[userId]
# Check to see if this domain-user pairing already exists so we don't add duplicates # Check for duplicate data in the file we are parsing so we do not add duplicates
''' # NOTE: Currently, we allow duplicate domains, but not duplicate domain-user pairs.
newOrExistingEntry, isNew = TransitionDomain.objects.get_or_create( # However, track duplicate domains for now, since we are still deciding on whether
username=userEmail, # to make this field unique or not. ~10/25/2023
domain_name=domainName tempEntry_domain = next(
(x for x in to_create if x.domain_name == domainName), None
) )
if isNew: tempEntry_domainUserPair = next(
total_updated_domain_entries += 1 (
x
for x in to_create
if x.username == userEmail and x.domain_name == domainName
),
None,
)
if tempEntry_domain is not None:
if debugOn:
logger.info(
f"{termColors.WARNING} DUPLICATE Verisign entries found for domain: {domainName} {termColors.ENDC}"
)
if domainName not in duplicate_domains:
duplicate_domains.append(domainName)
if tempEntry_domainUserPair != None:
if debugOn:
logger.info(
f"""
{termColors.WARNING}
DUPLICATE Verisign entries found for domain - user {termColors.BackgroundLightYellow} PAIR {termColors.ENDC}{termColors.WARNING}:
{domainName} - {user_email} {termColors.ENDC}"""
)
if tempEntry_domainUserPair not in duplicate_domain_user_combos:
duplicate_domain_user_combos.append(tempEntry_domainUserPair)
else: else:
total_new_entries += 1 try:
newOrExistingEntry.status = domainStatus existingEntry = TransitionDomain.objects.get(
newOrExistingEntry.email_sent = emailSent username=userEmail, domain_name=domainName
to_create.append( )
newOrExistingEntry
)
'''
try: if existingEntry.status != domainStatus:
existingEntry = TransitionDomain.objects.get( # DEBUG:
username=userEmail, if debugOn:
domain_name=domainName logger.info(
) f"""{termColors.OKCYAN}
Updating entry: {existingEntry}
Status: {existingEntry.status} > {domainStatus}
Email Sent: {existingEntry.email_sent} > {emailSent}
{termColors.ENDC}"""
)
# DEBUG: existingEntry.status = domainStatus
if __debug__:
print(termColors.WARNING)
print("Updating entry: ", existingEntry)
print(" Status: ", existingEntry.status, " > ",domainStatus)
print(" Email Sent: ", existingEntry.email_sent, " > ", emailSent)
existingEntry.status = domainStatus existingEntry.email_sent = emailSent
existingEntry.email_sent = emailSent existingEntry.save()
existingEntry.save() except TransitionDomain.DoesNotExist:
except TransitionDomain.DoesNotExist: # no matching entry, make one
# no matching entry, make one newEntry = TransitionDomain(
newEntry = TransitionDomain( username=userEmail,
username=userEmail, domain_name=domainName,
domain_name=domainName, status=domainStatus,
status = domainStatus, email_sent=emailSent,
email_sent = emailSent )
) to_create.append(newEntry)
to_create.append(newEntry) total_new_entries += 1
# DEBUG:
if debugOn:
logger.info(
f"{termColors.OKCYAN} Adding entry {total_new_entries}: {newEntry} {termColors.ENDC}"
)
except TransitionDomain.MultipleObjectsReturned:
logger.info(
f"""
{termColors.FAIL}
!!! ERROR: duplicate entries exist in the transtion_domain table for domain: {domainName}
----------TERMINATING----------"""
)
import sys
sys.exit()
# DEBUG:
if __debug__:
print("Adding entry ",total_new_entries,": ", newEntry)
total_new_entries += 1
# DEBUG: # DEBUG:
if __debug__: if debugOn or debug_MaxEntriesToParse > 0:
if total_new_entries > debug_MaxEntriesToParse: if (
print("----BREAK----") total_rows_parsed > debug_MaxEntriesToParse
print(termColors.ENDC) and debug_MaxEntriesToParse != 0
):
logger.info(
f"""{termColors.WARNING}
----BREAK----
{termColors.ENDC}
"""
)
break break
logger.info("Creating %d transition domain entries", len(to_create))
TransitionDomain.objects.bulk_create(to_create) TransitionDomain.objects.bulk_create(to_create)
logger.info( logger.info(
"""Created %d transition domain entries, f"""{termColors.OKGREEN}
updated %d transition domain entries,
found %d users without email, ============= FINISHED ===============
found %d unique statuses that do not map to existing status values""", Created {total_new_entries} transition domain entries,
total_new_entries, updated {total_updated_domain_entries} transition domain entries
total_updated_domain_entries, {termColors.ENDC}
total_users_without_email, """
total_outlier_statuses,
) )
# TODO: add more info to logger?
# Print a summary of findings (duplicate entries, missing data..etc.)
totalDupDomainUserPairs = len(duplicate_domain_user_combos)
totalDupDomains = len(duplicate_domains)
if total_users_without_email > 0:
logger.warning(
"No e-mails found for users: {}".format(
", ".join(map(str, users_without_email))
)
)
if totalDupDomainUserPairs > 0 or totalDupDomains > 0:
temp_dupPairsAsString = "{}".format(
", ".join(map(str, duplicate_domain_user_combos))
)
temp_dupDomainsAsString = "{}".format(
", ".join(map(str, duplicate_domains))
)
logger.warning(
f"""{termColors.WARNING}
----DUPLICATES FOUND-----
{totalDupDomainUserPairs} DOMAIN - USER pairs were NOT unique in the supplied data files;
{temp_dupPairsAsString}
{totalDupDomains} DOMAINS were NOT unique in the supplied data files;
{temp_dupDomainsAsString}
{termColors.ENDC}"""
)
if total_domains_without_status > 0:
temp_arrayToString = "{}".format(
", ".join(map(str, domains_without_status))
)
logger.warning(
f"""{termColors.WARNING}
----Found {total_domains_without_status} domains without a status (defaulted to READY)-----
{temp_arrayToString}
{termColors.ENDC}"""
)
if total_outlier_statuses > 0:
temp_arrayToString = "{}".format(", ".join(map(str, outlier_statuses)))
logger.warning(
f"""{termColors.WARNING}
----Found {total_outlier_statuses} unaccounted for statuses-----
No mappings found for the following statuses (defaulted to Ready):
{temp_arrayToString}
{termColors.ENDC}"""
)

View file

@ -2,9 +2,11 @@ from django.db import models
from .utility.time_stamped_model import TimeStampedModel from .utility.time_stamped_model import TimeStampedModel
class StatusChoices(models.TextChoices): class StatusChoices(models.TextChoices):
CREATED = "created", "Created" CREATED = "created", "Created"
HOLD = "hold", "Hold" HOLD = "hold", "Hold"
class TransitionDomain(TimeStampedModel): class TransitionDomain(TimeStampedModel):
"""Transition Domain model stores information about the """Transition Domain model stores information about the
@ -28,6 +30,7 @@ class TransitionDomain(TimeStampedModel):
max_length=255, max_length=255,
null=False, null=False,
blank=True, blank=True,
default=StatusChoices.CREATED,
choices=StatusChoices.choices, choices=StatusChoices.choices,
verbose_name="Status", verbose_name="Status",
help_text="domain status during the transfer", help_text="domain status during the transfer",
@ -43,4 +46,6 @@ class TransitionDomain(TimeStampedModel):
return ( return (
f"username: {self.username} " f"username: {self.username} "
f"domainName: {self.domain_name} " f"domainName: {self.domain_name} "
f"status: {self.status} "
f"email sent: {self.email_sent} "
) )