finished comments and polishing

Signed-off-by: CocoByte <nicolle.leclair@gmail.com>
This commit is contained in:
CocoByte 2023-10-06 02:06:53 -06:00
parent 1b0c261729
commit d468c3f4eb
No known key found for this signature in database
GPG key ID: BBFAA2526384C97F
2 changed files with 66 additions and 20 deletions

View file

@ -30,7 +30,7 @@ class termColors:
BackgroundLightYellow = "\033[103m"
def query_yes_no(question, default="yes"):
def query_yes_no(question: str, default="yes") -> dict[str,bool]:
"""Ask a yes/no question via raw_input() and return their answer.
"question" is a string that is presented to the user.
@ -62,11 +62,24 @@ def query_yes_no(question, default="yes"):
class Command(BaseCommand):
help = """Load data for domains that are in transition
help = """Loads data for domains that are in transition
(populates transition_domain model objects)."""
def add_arguments(self, parser):
"""Add our three filename arguments."""
"""Add our three filename arguments (in order: domain contacts,
contacts, and domain statuses)
OPTIONAL ARGUMENTS:
--sep
The default delimiter is set to "|", but may be changed using --sep
--debug
A boolean (default to true), which activates additional print statements
--limitParse
Used to set a limit for the number of data entries to insert. Set to 0
(or just don't use this argument) to parse every entry.
--resetTable
Use this to trigger a prompt for deleting all table entries. Useful
for testing purposes, but USE WITH CAUTION
"""
parser.add_argument(
"domain_contacts_filename",
help="Data file with domain contact information"
@ -94,7 +107,9 @@ class Command(BaseCommand):
action=argparse.BooleanOptionalAction,
)
def print_debug_mode_statements(self, debug_on, debug_max_entries_to_parse):
def print_debug_mode_statements(self, debug_on: bool, debug_max_entries_to_parse: int):
"""Prints additional terminal statements to indicate if --debug
or --limitParse are in use"""
if debug_on:
logger.info(
f"""{termColors.OKCYAN}
@ -114,7 +129,7 @@ class Command(BaseCommand):
"""
)
def get_domain_user_dict(self, domain_statuses_filename, sep):
def get_domain_user_dict(self, domain_statuses_filename: str, sep: str) -> defaultdict(str):
"""Creates a mapping of domain name -> status"""
domain_status_dictionary = defaultdict(str)
logger.info("Reading domain statuses data file %s", domain_statuses_filename)
@ -126,7 +141,7 @@ class Command(BaseCommand):
logger.info("Loaded statuses for %d domains", len(domain_status_dictionary))
return domain_status_dictionary
def get_user_emails_dict(self, contacts_filename, sep):
def get_user_emails_dict(self, contacts_filename: str, sep) -> defaultdict(str):
"""Creates mapping of userId -> emails"""
user_emails_dictionary = defaultdict(list)
logger.info("Reading domain-contacts data file %s", contacts_filename)
@ -138,12 +153,15 @@ class Command(BaseCommand):
logger.info("Loaded emails for %d users", len(user_emails_dictionary))
return user_emails_dictionary
def get_mapped_status(self, status_to_map):
# Map statuses as follows;
# "serverHold” fields will map to hold clientHold to hold
# and any ok state should map to Ready.
# Check if there are any statuses that are not
# serverhold, client hold or OK in the original data set.
def get_mapped_status(self, status_to_map: str) -> str:
"""
Given a verisign domain status, return a corresponding
status defined for our domains.
We map statuses as follows;
"serverHold” fields will map to hold, clientHold to hold
and any ok state should map to Ready.
"""
status_maps = {
"hold": TransitionDomain.StatusChoices.HOLD,
"serverhold": TransitionDomain.StatusChoices.HOLD,
@ -154,8 +172,18 @@ class Command(BaseCommand):
return status_maps.get(status_to_map)
def print_summary_duplications(
self, duplicate_domain_user_combos, duplicate_domains, users_without_email
self,
duplicate_domain_user_combos: list[TransitionDomain],
duplicate_domains: list[TransitionDomain],
users_without_email: list[str]
):
"""Called at the end of the script execution to print out a summary of
data anomalies in the imported Verisign data. Currently, we check for:
- duplicate domains
- duplicate domain - user pairs
- any users without e-mails (this would likely only happen if the contacts
file is missing a user found in the domain_contacts file)
"""
total_duplicate_pairs = len(duplicate_domain_user_combos)
total_duplicate_domains = len(duplicate_domains)
total_users_without_email = len(users_without_email)
@ -189,7 +217,16 @@ class Command(BaseCommand):
{termColors.ENDC}"""
)
def print_summary_status_findings(self, domains_without_status, outlier_statuses):
def print_summary_status_findings(self,
domains_without_status: list[str],
outlier_statuses: list[str]
):
"""Called at the end of the script execution to print out a summary of
status anomolies in the imported Verisign data. Currently, we check for:
- domains without a status
- any statuses not accounted for in our status mappings (see
get_mapped_status() function)
"""
total_domains_without_status = len(domains_without_status)
total_outlier_statuses = len(outlier_statuses)
if total_domains_without_status > 0:
@ -235,9 +272,11 @@ class Command(BaseCommand):
domain_statuses_filename,
**options,
):
"""Load the data files and create the DomainInvitations."""
"""Parse the data files and create TransitionDomains."""
sep = options.get("sep")
# If --resetTable was used, prompt user to confirm
# deletion of table data
if options.get("resetTable"):
confirm_reset = query_yes_no(
f"""
@ -255,11 +294,15 @@ class Command(BaseCommand):
)
TransitionDomain.objects.all().delete()
# Get --debug argument
debug_on = options.get("debug")
# Get --LimitParse argument
debug_max_entries_to_parse = int(
options.get("limitParse")
) # set to 0 to parse all entries
# print message to terminal about which args are in use
self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse)
# STEP 1:
@ -300,6 +343,7 @@ class Command(BaseCommand):
# track of total rows parsed)
total_rows_parsed = 0
# Start parsing the main file and create TransitionDomain objects
logger.info("Reading domain-contacts data file %s", domain_contacts_filename)
with open(domain_contacts_filename, "r") as domain_contacts_file:
for row in csv.reader(domain_contacts_file, delimiter=sep):
@ -308,7 +352,7 @@ class Command(BaseCommand):
# fields are just domain, userid, role
# lowercase the domain names
new_entry_domainName = row[0].lower()
userId = row[1]
user_id = row[1]
new_entry_status = TransitionDomain.StatusChoices.READY
new_entry_email = ""
@ -327,12 +371,12 @@ class Command(BaseCommand):
else:
new_entry_status = mapped_status
if userId not in user_emails_dictionary:
if user_id not in user_emails_dictionary:
# this user has no e-mail...this should never happen
if userId not in users_without_email:
users_without_email.append(userId)
if user_id not in users_without_email:
users_without_email.append(user_id)
else:
new_entry_email = user_emails_dictionary[userId]
new_entry_email = user_emails_dictionary[user_id]
# Check for duplicate data in the file we are
# parsing so we do not add duplicates