mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-05-31 17:53:56 +02:00
linted
Signed-off-by: CocoByte <nicolle.leclair@gmail.com>
This commit is contained in:
parent
c971d838fe
commit
86aeb2e140
1 changed files with 33 additions and 29 deletions
|
@ -30,7 +30,7 @@ class termColors:
|
||||||
BackgroundLightYellow = "\033[103m"
|
BackgroundLightYellow = "\033[103m"
|
||||||
|
|
||||||
|
|
||||||
def query_yes_no(question: str, default="yes") -> dict[str,bool]:
|
def query_yes_no(question: str, default="yes") -> bool:
|
||||||
"""Ask a yes/no question via raw_input() and return their answer.
|
"""Ask a yes/no question via raw_input() and return their answer.
|
||||||
|
|
||||||
"question" is a string that is presented to the user.
|
"question" is a string that is presented to the user.
|
||||||
|
@ -81,16 +81,14 @@ class Command(BaseCommand):
|
||||||
for testing purposes, but USE WITH CAUTION
|
for testing purposes, but USE WITH CAUTION
|
||||||
"""
|
"""
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"domain_contacts_filename",
|
"domain_contacts_filename", help="Data file with domain contact information"
|
||||||
help="Data file with domain contact information"
|
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"contacts_filename",
|
"contacts_filename",
|
||||||
help="Data file with contact information",
|
help="Data file with contact information",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"domain_statuses_filename",
|
"domain_statuses_filename", help="Data file with domain status information"
|
||||||
help="Data file with domain status information"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
parser.add_argument("--sep", default="|", help="Delimiter character")
|
parser.add_argument("--sep", default="|", help="Delimiter character")
|
||||||
|
@ -107,7 +105,9 @@ class Command(BaseCommand):
|
||||||
action=argparse.BooleanOptionalAction,
|
action=argparse.BooleanOptionalAction,
|
||||||
)
|
)
|
||||||
|
|
||||||
def print_debug_mode_statements(self, debug_on: bool, debug_max_entries_to_parse: int):
|
def print_debug_mode_statements(
|
||||||
|
self, debug_on: bool, debug_max_entries_to_parse: int
|
||||||
|
):
|
||||||
"""Prints additional terminal statements to indicate if --debug
|
"""Prints additional terminal statements to indicate if --debug
|
||||||
or --limitParse are in use"""
|
or --limitParse are in use"""
|
||||||
if debug_on:
|
if debug_on:
|
||||||
|
@ -117,7 +117,7 @@ class Command(BaseCommand):
|
||||||
Detailed print statements activated.
|
Detailed print statements activated.
|
||||||
{termColors.ENDC}
|
{termColors.ENDC}
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
if debug_max_entries_to_parse > 0:
|
if debug_max_entries_to_parse > 0:
|
||||||
logger.info(
|
logger.info(
|
||||||
f"""{termColors.OKCYAN}
|
f"""{termColors.OKCYAN}
|
||||||
|
@ -127,9 +127,11 @@ class Command(BaseCommand):
|
||||||
Detailed print statements activated.
|
Detailed print statements activated.
|
||||||
{termColors.ENDC}
|
{termColors.ENDC}
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_domain_user_dict(self, domain_statuses_filename: str, sep: str) -> defaultdict(str):
|
def get_domain_user_dict(
|
||||||
|
self, domain_statuses_filename: str, sep: str
|
||||||
|
) -> defaultdict[str, str]:
|
||||||
"""Creates a mapping of domain name -> status"""
|
"""Creates a mapping of domain name -> status"""
|
||||||
domain_status_dictionary = defaultdict(str)
|
domain_status_dictionary = defaultdict(str)
|
||||||
logger.info("Reading domain statuses data file %s", domain_statuses_filename)
|
logger.info("Reading domain statuses data file %s", domain_statuses_filename)
|
||||||
|
@ -141,9 +143,11 @@ class Command(BaseCommand):
|
||||||
logger.info("Loaded statuses for %d domains", len(domain_status_dictionary))
|
logger.info("Loaded statuses for %d domains", len(domain_status_dictionary))
|
||||||
return domain_status_dictionary
|
return domain_status_dictionary
|
||||||
|
|
||||||
def get_user_emails_dict(self, contacts_filename: str, sep) -> defaultdict(str):
|
def get_user_emails_dict(self,
|
||||||
|
contacts_filename: str,
|
||||||
|
sep) -> defaultdict[str, str]:
|
||||||
"""Creates mapping of userId -> emails"""
|
"""Creates mapping of userId -> emails"""
|
||||||
user_emails_dictionary = defaultdict(list)
|
user_emails_dictionary = defaultdict(str)
|
||||||
logger.info("Reading domain-contacts data file %s", contacts_filename)
|
logger.info("Reading domain-contacts data file %s", contacts_filename)
|
||||||
with open(contacts_filename, "r") as contacts_file:
|
with open(contacts_filename, "r") as contacts_file:
|
||||||
for row in csv.reader(contacts_file, delimiter=sep):
|
for row in csv.reader(contacts_file, delimiter=sep):
|
||||||
|
@ -153,7 +157,8 @@ class Command(BaseCommand):
|
||||||
logger.info("Loaded emails for %d users", len(user_emails_dictionary))
|
logger.info("Loaded emails for %d users", len(user_emails_dictionary))
|
||||||
return user_emails_dictionary
|
return user_emails_dictionary
|
||||||
|
|
||||||
def get_mapped_status(self, status_to_map: str) -> str:
|
def get_mapped_status(self,
|
||||||
|
status_to_map: str):
|
||||||
"""
|
"""
|
||||||
Given a verisign domain status, return a corresponding
|
Given a verisign domain status, return a corresponding
|
||||||
status defined for our domains.
|
status defined for our domains.
|
||||||
|
@ -169,15 +174,16 @@ class Command(BaseCommand):
|
||||||
"created": TransitionDomain.StatusChoices.READY,
|
"created": TransitionDomain.StatusChoices.READY,
|
||||||
"ok": TransitionDomain.StatusChoices.READY,
|
"ok": TransitionDomain.StatusChoices.READY,
|
||||||
}
|
}
|
||||||
return status_maps.get(status_to_map)
|
mapped_status = status_maps.get(status_to_map)
|
||||||
|
return mapped_status
|
||||||
|
|
||||||
def print_summary_duplications(
|
def print_summary_duplications(
|
||||||
self,
|
self,
|
||||||
duplicate_domain_user_combos: list[TransitionDomain],
|
duplicate_domain_user_combos: list[TransitionDomain],
|
||||||
duplicate_domains: list[TransitionDomain],
|
duplicate_domains: list[TransitionDomain],
|
||||||
users_without_email: list[str]
|
users_without_email: list[str],
|
||||||
):
|
):
|
||||||
"""Called at the end of the script execution to print out a summary of
|
"""Called at the end of the script execution to print out a summary of
|
||||||
data anomalies in the imported Verisign data. Currently, we check for:
|
data anomalies in the imported Verisign data. Currently, we check for:
|
||||||
- duplicate domains
|
- duplicate domains
|
||||||
- duplicate domain - user pairs
|
- duplicate domain - user pairs
|
||||||
|
@ -217,11 +223,10 @@ class Command(BaseCommand):
|
||||||
{termColors.ENDC}"""
|
{termColors.ENDC}"""
|
||||||
)
|
)
|
||||||
|
|
||||||
def print_summary_status_findings(self,
|
def print_summary_status_findings(
|
||||||
domains_without_status: list[str],
|
self, domains_without_status: list[str], outlier_statuses: list[str]
|
||||||
outlier_statuses: list[str]
|
):
|
||||||
):
|
"""Called at the end of the script execution to print out a summary of
|
||||||
"""Called at the end of the script execution to print out a summary of
|
|
||||||
status anomolies in the imported Verisign data. Currently, we check for:
|
status anomolies in the imported Verisign data. Currently, we check for:
|
||||||
- domains without a status
|
- domains without a status
|
||||||
- any statuses not accounted for in our status mappings (see
|
- any statuses not accounted for in our status mappings (see
|
||||||
|
@ -264,7 +269,6 @@ class Command(BaseCommand):
|
||||||
{termColors.ENDC}"""
|
{termColors.ENDC}"""
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def handle( # noqa: C901
|
def handle( # noqa: C901
|
||||||
self,
|
self,
|
||||||
domain_contacts_filename,
|
domain_contacts_filename,
|
||||||
|
@ -275,7 +279,7 @@ class Command(BaseCommand):
|
||||||
"""Parse the data files and create TransitionDomains."""
|
"""Parse the data files and create TransitionDomains."""
|
||||||
sep = options.get("sep")
|
sep = options.get("sep")
|
||||||
|
|
||||||
# If --resetTable was used, prompt user to confirm
|
# If --resetTable was used, prompt user to confirm
|
||||||
# deletion of table data
|
# deletion of table data
|
||||||
if options.get("resetTable"):
|
if options.get("resetTable"):
|
||||||
confirm_reset = query_yes_no(
|
confirm_reset = query_yes_no(
|
||||||
|
@ -356,7 +360,7 @@ class Command(BaseCommand):
|
||||||
|
|
||||||
new_entry_status = TransitionDomain.StatusChoices.READY
|
new_entry_status = TransitionDomain.StatusChoices.READY
|
||||||
new_entry_email = ""
|
new_entry_email = ""
|
||||||
new_entry_emailSent = False # set to False by default
|
new_entry_emailSent = False # set to False by default
|
||||||
|
|
||||||
if new_entry_domainName not in domain_status_dictionary:
|
if new_entry_domainName not in domain_status_dictionary:
|
||||||
# this domain has no status...default to "Create"
|
# this domain has no status...default to "Create"
|
||||||
|
@ -425,8 +429,8 @@ class Command(BaseCommand):
|
||||||
logger.info(
|
logger.info(
|
||||||
f"{termColors.OKCYAN}"
|
f"{termColors.OKCYAN}"
|
||||||
f"Updating entry: {existing_entry}"
|
f"Updating entry: {existing_entry}"
|
||||||
f"Status: {existing_entry.status} > {new_entry_status}"
|
f"Status: {existing_entry.status} > {new_entry_status}" # noqa
|
||||||
f"Email Sent: {existing_entry.email_sent} > {new_entry_emailSent}"
|
f"Email Sent: {existing_entry.email_sent} > {new_entry_emailSent}" # noqa
|
||||||
f"{termColors.ENDC}"
|
f"{termColors.ENDC}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -482,7 +486,7 @@ class Command(BaseCommand):
|
||||||
updated {total_updated_domain_entries} transition domain entries
|
updated {total_updated_domain_entries} transition domain entries
|
||||||
{termColors.ENDC}
|
{termColors.ENDC}
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
# Print a summary of findings (duplicate entries,
|
# Print a summary of findings (duplicate entries,
|
||||||
# missing data..etc.)
|
# missing data..etc.)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue