Linting + minimize logging to a degree

This commit is contained in:
zandercymatics 2023-11-08 17:37:59 -07:00
parent 7d798e0e43
commit 72d95f6fad
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
11 changed files with 507 additions and 373 deletions

View file

@ -17,6 +17,7 @@ logger = logging.getLogger(__name__)
# Example command for running this script:
# docker compose run -T app ./manage.py agency_data_extractor 20231009.agency.adhoc.dotgov.txt --dir /app/tmp --debug
class Command(BaseCommand):
help = """Loads data for domains that are in transition
(populates transition_domain model objects)."""
@ -26,40 +27,42 @@ class Command(BaseCommand):
parser.add_argument(
"agency_data_filename", help="Data file with agency information"
)
parser.add_argument(
"--dir", default="migrationdata", help="Desired directory"
)
parser.add_argument("--dir", default="migrationdata", help="Desired directory")
parser.add_argument("--sep", default="|", help="Delimiter character")
parser.add_argument("--debug", help="Prints additional debug statements to the terminal", action=argparse.BooleanOptionalAction)
parser.add_argument(
"--debug",
help="Prints additional debug statements to the terminal",
action=argparse.BooleanOptionalAction,
)
parser.add_argument("--prompt", action=argparse.BooleanOptionalAction)
@staticmethod
def extract_agencies(
agency_data_filepath: str,
sep: str,
debug: bool
) -> [str]:
"""Extracts all the agency names from the provided
def extract_agencies(agency_data_filepath: str, sep: str, debug: bool) -> [str]:
"""Extracts all the agency names from the provided
agency file (skips any duplicates) and returns those
names in an array"""
agency_names = []
logger.info(f"{TerminalColors.OKCYAN}Reading agency data file {agency_data_filepath}{TerminalColors.ENDC}")
logger.info(
f"{TerminalColors.OKCYAN}Reading agency data file {agency_data_filepath}{TerminalColors.ENDC}"
)
with open(agency_data_filepath, "r") as agency_data_filepath: # noqa
for row in csv.reader(agency_data_filepath, delimiter=sep):
agency_name = row[1]
TerminalHelper.print_conditional(debug, f"Checking: {agency_name}")
if agency_name not in agency_names:
agency_names.append(agency_name)
logger.info(f"{TerminalColors.OKCYAN}Checked {len(agency_names)} agencies{TerminalColors.ENDC}")
logger.info(
f"{TerminalColors.OKCYAN}Checked {len(agency_names)} agencies{TerminalColors.ENDC}"
)
return agency_names
@staticmethod
def compare_agency_lists(provided_agencies: [str],
existing_agencies: [str],
debug: bool):
def compare_agency_lists(
provided_agencies: [str], existing_agencies: [str], debug: bool
):
"""
Compares new_agencies with existing_agencies and
Compares new_agencies with existing_agencies and
provides the equivalent of an outer-join on the two
(printed to the terminal)
"""
@ -69,27 +72,37 @@ class Command(BaseCommand):
for agency in provided_agencies:
if agency not in existing_agencies and agency not in new_agencies:
new_agencies.append(agency)
TerminalHelper.print_conditional(debug, f"{TerminalColors.YELLOW}Found new agency: {agency}{TerminalColors.ENDC}")
TerminalHelper.print_conditional(
debug,
f"{TerminalColors.YELLOW}Found new agency: {agency}{TerminalColors.ENDC}",
)
possibly_unused_agencies = []
# 2 - Get all new agencies that we don't already have (We might want to ADD these to our list)
for agency in existing_agencies:
if agency not in provided_agencies and agency not in possibly_unused_agencies:
if (
agency not in provided_agencies
and agency not in possibly_unused_agencies
):
possibly_unused_agencies.append(agency)
TerminalHelper.print_conditional(debug, f"{TerminalColors.YELLOW}Possibly unused agency detected: {agency}{TerminalColors.ENDC}")
TerminalHelper.print_conditional(
debug,
f"{TerminalColors.YELLOW}Possibly unused agency detected: {agency}{TerminalColors.ENDC}",
)
matched_agencies = []
for agency in provided_agencies:
if agency in existing_agencies:
matched_agencies.append(agency)
TerminalHelper.print_conditional(debug, f"{TerminalColors.YELLOW}Matched agencies: {agency}{TerminalColors.ENDC}")
TerminalHelper.print_conditional(
debug,
f"{TerminalColors.YELLOW}Matched agencies: {agency}{TerminalColors.ENDC}",
)
# Print the summary of findings
# 1 - Print the list of agencies in the NEW list, which we do not already have
# 2 - Print the list of agencies that we currently have, which are NOT in the new list (these might be eligible for removal?) TODO: would we ever want to remove existing agencies?
new_agencies_as_string = "{}".format(
",\n ".join(map(str, new_agencies))
)
new_agencies_as_string = "{}".format(",\n ".join(map(str, new_agencies)))
possibly_unused_agencies_as_string = "{}".format(
",\n ".join(map(str, possibly_unused_agencies))
)
@ -97,7 +110,8 @@ class Command(BaseCommand):
",\n ".join(map(str, matched_agencies))
)
logger.info(f"""
logger.info(
f"""
{TerminalColors.OKGREEN}
======================== SUMMARY OF FINDINGS ============================
{len(provided_agencies)} AGENCIES WERE PROVIDED in the agency file.
@ -117,13 +131,12 @@ class Command(BaseCommand):
These agencies are in our system, but not in the provided agency file:
{TerminalColors.YELLOW}{possibly_unused_agencies_as_string}
{TerminalColors.ENDC}
""")
"""
)
@staticmethod
def print_agency_list(agencies, filename):
full_agency_list_as_string = "{}".format(
",\n".join(map(str, agencies))
)
full_agency_list_as_string = "{}".format(",\n".join(map(str, agencies)))
logger.info(
f"\n{TerminalColors.YELLOW}"
f"\n{full_agency_list_as_string}"
@ -146,19 +159,22 @@ class Command(BaseCommand):
prompt = options.get("prompt")
dir = options.get("dir")
agency_data_file = dir+"/"+agency_data_filename
agency_data_file = dir + "/" + agency_data_filename
new_agencies = self.extract_agencies(agency_data_file, sep, debug)
hard_coded_agencies = DomainApplication.AGENCIES
transition_domain_agencies = TransitionDomain.objects.all().values_list('federal_agency', flat=True).distinct()
transition_domain_agencies = (
TransitionDomain.objects.all()
.values_list("federal_agency", flat=True)
.distinct()
)
print(transition_domain_agencies)
merged_agencies = new_agencies
for agency in hard_coded_agencies:
if agency not in merged_agencies:
merged_agencies.append(agency)
merged_transition_agencies = new_agencies
for agency in transition_domain_agencies:
if agency not in merged_transition_agencies:
@ -168,73 +184,90 @@ class Command(BaseCommand):
# OPTION to compare the agency file to our hard-coded list
if prompt:
prompt_successful = TerminalHelper.query_yes_no(f"\n\n{TerminalColors.FAIL}Check {agency_data_filename} against our (hard-coded) dropdown list of agencies?{TerminalColors.ENDC}")
prompt_successful = TerminalHelper.query_yes_no(
f"\n\n{TerminalColors.FAIL}Check {agency_data_filename} against our (hard-coded) dropdown list of agencies?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
self.compare_agency_lists(new_agencies, hard_coded_agencies, debug)
# OPTION to compare the agency file to Transition Domains
if prompt:
prompt_successful = TerminalHelper.query_yes_no(f"\n\n{TerminalColors.FAIL}Check {agency_data_filename} against Transition Domain contents?{TerminalColors.ENDC}")
prompt_successful = TerminalHelper.query_yes_no(
f"\n\n{TerminalColors.FAIL}Check {agency_data_filename} against Transition Domain contents?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
self.compare_agency_lists(new_agencies, transition_domain_agencies, debug)
# OPTION to print out the full list of agencies from the agency file
if prompt:
prompt_successful = TerminalHelper.query_yes_no(f"\n\n{TerminalColors.FAIL}Would you like to print the full list of agencies from the given agency file?{TerminalColors.ENDC}")
prompt_successful = TerminalHelper.query_yes_no(
f"\n\n{TerminalColors.FAIL}Would you like to print the full list of agencies from the given agency file?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
logger.info(
f"\n{TerminalColors.OKGREEN}"
f"\n======================== FULL LIST OF IMPORTED AGENCIES ============================"
f"\nThese are all the agencies provided by the given agency file."
f"\n\n{len(new_agencies)} TOTAL\n\n"
f"\n{TerminalColors.OKGREEN}"
f"\n======================== FULL LIST OF IMPORTED AGENCIES ============================"
f"\nThese are all the agencies provided by the given agency file."
f"\n\n{len(new_agencies)} TOTAL\n\n"
)
self.print_agency_list(new_agencies, "Imported_Agencies")
# OPTION to print out the full list of agencies from the agency file
if prompt:
prompt_successful = TerminalHelper.query_yes_no(f"{TerminalColors.FAIL}Would you like to print the full list of agencies from the dropdown?{TerminalColors.ENDC}")
prompt_successful = TerminalHelper.query_yes_no(
f"{TerminalColors.FAIL}Would you like to print the full list of agencies from the dropdown?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
logger.info(
f"\n{TerminalColors.OKGREEN}"
f"\n======================== FULL LIST OF AGENCIES IN DROPDOWN ============================"
f"\nThese are all the agencies hard-coded in our system for the dropdown list."
f"\n\n{len(hard_coded_agencies)} TOTAL\n\n"
f"\n{TerminalColors.OKGREEN}"
f"\n======================== FULL LIST OF AGENCIES IN DROPDOWN ============================"
f"\nThese are all the agencies hard-coded in our system for the dropdown list."
f"\n\n{len(hard_coded_agencies)} TOTAL\n\n"
)
self.print_agency_list(hard_coded_agencies, "Dropdown_Agencies")
# OPTION to print out the full list of agencies from the agency file
if prompt:
prompt_successful = TerminalHelper.query_yes_no(f"{TerminalColors.FAIL}Would you like to print the full list of agencies from the dropdown?{TerminalColors.ENDC}")
if prompt_successful or not prompt:
logger.info(
f"\n{TerminalColors.OKGREEN}"
f"\n======================== FULL LIST OF AGENCIES IN TRANSITION DOMAIN ============================"
f"\nThese are all the agencies in the Transition Domains table."
f"\n\n{len(transition_domain_agencies)} TOTAL\n\n"
)
self.print_agency_list(transition_domain_agencies, "Transition_Domain_Agencies")
# OPTION to print out the full list of agencies from the agency file
if prompt:
prompt_successful = TerminalHelper.query_yes_no(f"{TerminalColors.FAIL}Would you like to print the MERGED list of agencies (dropdown + agency file)?{TerminalColors.ENDC}")
prompt_successful = TerminalHelper.query_yes_no(
f"{TerminalColors.FAIL}Would you like to print the full list of agencies from the dropdown?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
logger.info(
f"\n{TerminalColors.OKGREEN}"
f"\n======================== MERGED LISTS (dropdown + agency file) ============================"
f"\nThese are all the agencies our dropdown plus all the agencies in the agency file."
f"\n\n{len(merged_agencies)} TOTAL\n\n"
f"\n{TerminalColors.OKGREEN}"
f"\n======================== FULL LIST OF AGENCIES IN TRANSITION DOMAIN ============================"
f"\nThese are all the agencies in the Transition Domains table."
f"\n\n{len(transition_domain_agencies)} TOTAL\n\n"
)
self.print_agency_list(
transition_domain_agencies, "Transition_Domain_Agencies"
)
# OPTION to print out the full list of agencies from the agency file
if prompt:
prompt_successful = TerminalHelper.query_yes_no(
f"{TerminalColors.FAIL}Would you like to print the MERGED list of agencies (dropdown + agency file)?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
logger.info(
f"\n{TerminalColors.OKGREEN}"
f"\n======================== MERGED LISTS (dropdown + agency file) ============================"
f"\nThese are all the agencies our dropdown plus all the agencies in the agency file."
f"\n\n{len(merged_agencies)} TOTAL\n\n"
)
self.print_agency_list(merged_agencies, "Merged_Dropdown_Agency_List")
# OPTION to print out the full list of agencies from the agency file
# OPTION to print out the full list of agencies from the agency file
if prompt:
prompt_successful = TerminalHelper.query_yes_no(f"{TerminalColors.FAIL}Would you like to print the MERGED list of agencies (dropdown + agency file)?{TerminalColors.ENDC}")
prompt_successful = TerminalHelper.query_yes_no(
f"{TerminalColors.FAIL}Would you like to print the MERGED list of agencies (dropdown + agency file)?{TerminalColors.ENDC}"
)
if prompt_successful or not prompt:
logger.info(
f"\n{TerminalColors.OKGREEN}"
f"\n======================== MERGED LISTS (transition domain + agency file) ============================"
f"\nThese are all the agencies our transition domains table plus all the agencies in the agency file."
f"\n\n{len(merged_agencies)} TOTAL\n\n"
f"\n{TerminalColors.OKGREEN}"
f"\n======================== MERGED LISTS (transition domain + agency file) ============================"
f"\nThese are all the agencies our transition domains table plus all the agencies in the agency file."
f"\n\n{len(merged_agencies)} TOTAL\n\n"
)
self.print_agency_list(
merged_transition_agencies, "Merged_Transition_Domain_Agency_List"
)
self.print_agency_list(merged_transition_agencies, "Merged_Transition_Domain_Agency_List")

View file

@ -64,9 +64,9 @@ class Command(BaseCommand):
)
parser.add_argument(
"--infer_filenames",
"--infer_filenames",
action=argparse.BooleanOptionalAction,
help="Determines if we should infer filenames or not. Recommended to be enabled only in a development or testing setting."
help="Determines if we should infer filenames or not. Recommended to be enabled only in a development or testing setting.",
)
parser.add_argument(
@ -74,7 +74,7 @@ class Command(BaseCommand):
)
parser.add_argument(
"--domain_contacts_filename",
help="Data file with domain contact information"
help="Data file with domain contact information",
)
parser.add_argument(
"--contacts_filename",
@ -82,7 +82,7 @@ class Command(BaseCommand):
)
parser.add_argument(
"--domain_statuses_filename",
help="Data file with domain status information"
help="Data file with domain status information",
)
parser.add_argument(
"--agency_adhoc_filename",
@ -347,10 +347,12 @@ class Command(BaseCommand):
# If it does, update the options
options[key] = value
except Exception as err:
logger.error(f"""{TerminalColors.FAIL}There was an error loading the JSON responsible
logger.error(
f"""{TerminalColors.FAIL}There was an error loading the JSON responsible
for providing filepaths.
{TerminalColors.ENDC}
""")
"""
)
raise err
sep = args.sep
@ -369,33 +371,36 @@ class Command(BaseCommand):
) # set to 0 to parse all entries
## Variables for Additional TransitionDomain Information ##
# Main script filenames - these do not have defaults
domain_contacts_filename = None
try:
domain_contacts_filename = directory + options.get("domain_contacts_filename")
domain_contacts_filename = directory + options.get(
"domain_contacts_filename"
)
except TypeError as err:
logger.error(
f"Invalid filename of '{args.domain_contacts_filename}'"
f"Invalid filename of '{args.domain_contacts_filename}'"
" was provided for domain_contacts_filename"
)
contacts_filename = None
try:
contacts_filename = directory + options.get("contacts_filename")
except TypeError as err:
logger.error(
f"Invalid filename of '{args.contacts_filename}'"
f"Invalid filename of '{args.contacts_filename}'"
" was provided for contacts_filename"
)
domain_statuses_filename = None
try:
domain_statuses_filename = directory + options.get("domain_statuses_filename")
domain_statuses_filename = directory + options.get(
"domain_statuses_filename"
)
except TypeError as err:
logger.error(
f"Invalid filename of '{args.domain_statuses_filename}'"
f"Invalid filename of '{args.domain_statuses_filename}'"
" was provided for domain_statuses_filename"
)
@ -468,7 +473,10 @@ class Command(BaseCommand):
new_entry_email = ""
new_entry_emailSent = False # set to False by default
TerminalHelper.print_conditional(True, f"Processing item {total_rows_parsed}: {new_entry_domain_name}")
TerminalHelper.print_conditional(
True,
f"Processing item {total_rows_parsed}: {new_entry_domain_name}",
)
# PART 1: Get the status
if new_entry_domain_name not in domain_status_dictionary:
@ -608,7 +616,6 @@ class Command(BaseCommand):
)
self.print_summary_status_findings(domains_without_status, outlier_statuses)
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============

View file

@ -28,8 +28,7 @@ logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = """ """ # TODO: update this!
help = """ """ # TODO: update this!
# ======================================================
# ================== ARGUMENTS ===================
@ -45,7 +44,7 @@ class Command(BaseCommand):
The location of the files used for load_transition_domain migration script
EXAMPLE USAGE:
> --migrationDirectory /app/tmp
--migrationJSON
The name of the JSON file used for load_transition_domain migration script
EXAMPLE USAGE:
@ -129,7 +128,6 @@ class Command(BaseCommand):
action=argparse.BooleanOptionalAction,
)
# ======================================================
# =============== DATA ANALYSIS ==================
# ======================================================
@ -252,7 +250,6 @@ class Command(BaseCommand):
"""
)
# ======================================================
# ================= MIGRATIONS ===================
# ======================================================
@ -266,17 +263,13 @@ class Command(BaseCommand):
prompts_enabled: bool,
debug_max_entries_to_parse: int,
):
if file_directory and file_directory[-1] != "/":
file_directory += "/"
json_filepath = migration_json_filename
"""Runs the load_transition_domain script"""
# Create the command string
command_script = "load_transition_domain"
command_string = (
f"./manage.py {command_script} "
f"{json_filepath} "
)
command_string = f"./manage.py {command_script} " f"{json_filepath} "
if sep is not None and sep != "|":
command_string += f"--sep {sep} "
if reset_table:
@ -306,7 +299,7 @@ class Command(BaseCommand):
resetTable=reset_table,
debug=debug_on,
limitParse=debug_max_entries_to_parse,
directory=file_directory
directory=file_directory,
)
def run_transfer_script(self, debug_on: bool, prompts_enabled: bool):
@ -326,7 +319,7 @@ class Command(BaseCommand):
)
# TODO: make this somehow run inside TerminalHelper prompt
if proceed or not prompts_enabled:
call_command(command_script)
call_command(command_script)
def run_send_invites_script(self, debug_on: bool, prompts_enabled: bool):
"""Runs the send_domain_invitations script"""

View file

@ -39,7 +39,6 @@ class Command(BaseCommand):
help="Sets max number of entries to load, set to 0 to load all entries",
)
# ======================================================
# ===================== PRINTING ======================
# ======================================================
@ -66,13 +65,14 @@ class Command(BaseCommand):
{TerminalColors.ENDC}
""",
)
def parse_limit_reached(self,
debug_max_entries_to_parse: bool,
total_rows_parsed: int
) -> bool:
if (debug_max_entries_to_parse > 0
and total_rows_parsed >= debug_max_entries_to_parse):
def parse_limit_reached(
self, debug_max_entries_to_parse: bool, total_rows_parsed: int
) -> bool:
if (
debug_max_entries_to_parse > 0
and total_rows_parsed >= debug_max_entries_to_parse
):
logger.info(
f"""{TerminalColors.YELLOW}
----PARSE LIMIT REACHED. HALTING PARSER.----
@ -81,7 +81,7 @@ class Command(BaseCommand):
)
return True
return False
def print_summary_of_findings(
self,
domains_to_create,
@ -156,16 +156,15 @@ class Command(BaseCommand):
""",
)
# ======================================================
# =================== DOMAIN =====================
# ======================================================
def update_or_create_domain(self,
transition_domain: TransitionDomain,
debug_on: bool) -> (Domain, bool):
""" Given a transition domain, either finds & updates an existing
def update_or_create_domain(
self, transition_domain: TransitionDomain, debug_on: bool
) -> (Domain, bool):
"""Given a transition domain, either finds & updates an existing
corresponding domain, or creates a new corresponding domain in
the Domain table.
the Domain table.
Returns the corresponding Domain object and a boolean
that is TRUE if that Domain was newly created.
@ -176,7 +175,7 @@ class Command(BaseCommand):
transition_domain_status = transition_domain.status
transition_domain_creation_date = transition_domain.epp_creation_date
transition_domain_expiration_date = transition_domain.epp_expiration_date
domain_exists = Domain.objects.filter(name=transition_domain_name).exists()
if domain_exists:
try:
@ -197,7 +196,7 @@ class Command(BaseCommand):
transition_domain, target_domain, debug_on
)
# TODO: not all domains need to be updated (the information is the same). Need to bubble this up to the final report.
# update dates (creation and expiration)
if transition_domain_creation_date is not None:
# TODO: added this because I ran into a situation where the created_at date was null (violated a key constraint). How do we want to handle this case?
@ -257,7 +256,6 @@ class Command(BaseCommand):
expiration_date=transition_domain_expiration_date,
)
return (target_domain, True)
def update_domain_status(
self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool
@ -289,7 +287,6 @@ class Command(BaseCommand):
return True
return False
# ======================================================
# ================ DOMAIN INVITATION ==================
# ======================================================
@ -336,23 +333,27 @@ class Command(BaseCommand):
# ======================================================
# ================ DOMAIN INFORMATION =================
# ======================================================
def update_domain_information(self, current: DomainInformation, target: DomainInformation, debug_on: bool) -> bool:
def update_domain_information(
self, current: DomainInformation, target: DomainInformation, debug_on: bool
) -> bool:
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.OKCYAN}"
f"Updating: {current}"
f"{TerminalColors.ENDC}"), # noqa
(
f"{TerminalColors.OKCYAN}"
f"Updating: {current}"
f"{TerminalColors.ENDC}"
), # noqa
)
updated = False
fields_to_update = [
'organization_type',
'federal_type',
'federal_agency',
"organization_name"
]
"organization_type",
"federal_type",
"federal_agency",
"organization_name",
]
defaults = {field: getattr(target, field) for field in fields_to_update}
if current != target:
current = target
@ -360,14 +361,19 @@ class Command(BaseCommand):
updated = True
return updated
def try_add_domain_information(self):
pass
def create_new_domain_info(self,
transition_domain: TransitionDomain,
domain: Domain) -> DomainInformation:
def create_new_domain_info(
self,
transition_domain: TransitionDomain,
domain: Domain,
agency_choices,
fed_choices,
org_choices,
debug_on,
) -> DomainInformation:
org_type = transition_domain.organization_type
fed_type = transition_domain.federal_type
fed_agency = transition_domain.federal_agency
@ -387,77 +393,95 @@ class Command(BaseCommand):
org_type = ("city", "City")
case "Independent Intrastate":
org_type = ("special_district", "Special district")
valid_org_type = org_type in [(name, value) for name, value in DomainApplication.OrganizationChoices.choices]
valid_fed_type = fed_type in [value for name, value in DomainApplication.BranchChoices.choices]
valid_fed_agency = fed_agency in DomainApplication.AGENCIES
valid_org_type = org_type in org_choices
valid_fed_type = fed_type in fed_choices
valid_fed_agency = fed_agency in agency_choices
default_creator, _ = User.objects.get_or_create(username="System")
new_domain_info_data = {
'domain': domain,
'organization_name': transition_domain.organization_name,
"domain": domain,
"organization_name": transition_domain.organization_name,
"creator": default_creator,
}
if valid_org_type:
new_domain_info_data['organization_type'] = org_type[0]
else:
new_domain_info_data["organization_type"] = org_type[0]
elif debug_on:
logger.debug(f"No org type found on {domain.name}")
if valid_fed_type:
new_domain_info_data['federal_type'] = fed_type.lower()
pass
else:
new_domain_info_data["federal_type"] = fed_type.lower()
elif debug_on:
logger.debug(f"No federal type found on {domain.name}")
if valid_fed_agency:
new_domain_info_data['federal_agency'] = fed_agency
else:
new_domain_info_data["federal_agency"] = fed_agency
elif debug_on:
logger.debug(f"No federal agency found on {domain.name}")
new_domain_info = DomainInformation(**new_domain_info_data)
# DEBUG:
# DEBUG:
TerminalHelper.print_conditional(
True,
(f"{TerminalColors.MAGENTA}"
f"Created Domain Information template for: {new_domain_info}"
f"{TerminalColors.ENDC}"), # noqa
(
f"{TerminalColors.MAGENTA}"
f"Created Domain Information template for: {new_domain_info}"
f"{TerminalColors.ENDC}"
), # noqa
)
return new_domain_info
def update_or_create_domain_information(self,
transition_domain: TransitionDomain,
debug_on: bool) -> (DomainInformation, bool):
def update_or_create_domain_information(
self,
transition_domain: TransitionDomain,
agency_choices,
fed_choices,
org_choices,
debug_on: bool,
) -> (DomainInformation, bool):
transition_domain_name = transition_domain.domain_name
# Get associated domain
domain_data = Domain.objects.filter(name=transition_domain.domain_name)
if not domain_data.exists():
logger.warn(
f"{TerminalColors.FAIL}"
f"WARNING: No Domain exists for:"
f"{transition_domain_name}"
f"{TerminalColors.ENDC}\n"
)
f"{TerminalColors.FAIL}"
f"WARNING: No Domain exists for:"
f"{transition_domain_name}"
f"{TerminalColors.ENDC}\n"
)
return (None, None, False)
domain = domain_data.get()
template_domain_information = self.create_new_domain_info(transition_domain, domain)
template_domain_information = self.create_new_domain_info(
transition_domain,
domain,
agency_choices,
fed_choices,
org_choices,
debug_on,
)
target_domain_information = None
domain_information_exists = DomainInformation.objects.filter(domain__name=transition_domain_name).exists()
domain_information_exists = DomainInformation.objects.filter(
domain__name=transition_domain_name
).exists()
if domain_information_exists:
try:
# get the existing domain information object
target_domain_information = DomainInformation.objects.get(domain__name=transition_domain_name)
target_domain_information = DomainInformation.objects.get(
domain__name=transition_domain_name
)
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.FAIL}"
f"Found existing entry in Domain Information table for:"
f"{transition_domain_name}"
f"{TerminalColors.ENDC}"), # noqa
(
f"{TerminalColors.FAIL}"
f"Found existing entry in Domain Information table for:"
f"{transition_domain_name}"
f"{TerminalColors.ENDC}"
), # noqa
)
# for existing entry, update the status to
@ -466,7 +490,7 @@ class Command(BaseCommand):
target_domain_information, template_domain_information, debug_on
)
# TODO: not all domains need to be updated (the information is the same). Need to bubble this up to the final report.
return (target_domain_information, domain, False)
except DomainInformation.MultipleObjectsReturned:
# This should never happen (just like with the Domain Table).
@ -493,15 +517,15 @@ class Command(BaseCommand):
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.OKCYAN}"
f"Adding domain information for: "
f"{transition_domain_name}"
f"{TerminalColors.ENDC}"),
(
f"{TerminalColors.OKCYAN}"
f"Adding domain information for: "
f"{transition_domain_name}"
f"{TerminalColors.ENDC}"
),
)
return (target_domain_information, domain, True)
# ======================================================
# ===================== HANDLE ========================
# ======================================================
@ -536,7 +560,6 @@ class Command(BaseCommand):
# domain invitations to ADD
domain_invitations_to_create = []
# if we are limiting our parse (for testing purposes, keep
# track of total rows parsed)
total_rows_parsed = 0
@ -566,7 +589,7 @@ class Command(BaseCommand):
TerminalHelper.print_conditional(
debug_on,
f"{TerminalColors.OKCYAN}"
"Processing Transition Domain: "
"Processing Transition Domain: "
f"{transition_domain_name}, {transition_domain_status}, {transition_domain_email}"
f", {transition_domain_creation_date}, {transition_domain_expiration_date}"
f"{TerminalColors.ENDC}", # noqa
@ -574,7 +597,9 @@ class Command(BaseCommand):
# ======================================================
# ====================== DOMAIN =======================
target_domain, was_created = self.update_or_create_domain(transition_domain, debug_on)
target_domain, was_created = self.update_or_create_domain(
transition_domain, debug_on
)
debug_string = ""
if target_domain is None:
@ -603,12 +628,12 @@ class Command(BaseCommand):
# ---------------- UPDATED ----------------
updated_domain_entries.append(transition_domain.domain_name)
debug_string = f"updated domain: {target_domain}"
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.OKCYAN} {debug_string} {TerminalColors.ENDC}"),
)
)
# ======================================================
# ================ DOMAIN INVITATIONS ==================
@ -633,26 +658,36 @@ class Command(BaseCommand):
if self.parse_limit_reached(debug_max_entries_to_parse, total_rows_parsed):
break
# First, save all Domain objects to the database
Domain.objects.bulk_create(domains_to_create)
#DomainInvitation.objects.bulk_create(domain_invitations_to_create)
# DomainInvitation.objects.bulk_create(domain_invitations_to_create)
# TODO: this is to resolve an error where bulk_create
# doesn't save to database in a way that invitation objects can
# utilize.
# utilize.
# Then, create DomainInvitation objects
for invitation in domain_invitations_to_create:
logger.info(f"Pairing invite to its domain...{invitation}")
if debug_on:
logger.info(f"Pairing invite to its domain...{invitation}")
existing_domain = Domain.objects.filter(name=invitation.domain.name)
# Make sure the related Domain object is saved
if existing_domain.exists():
invitation.domain = existing_domain.get()
else:
# Raise an err for now
raise Exception(f"Domain {existing_domain} wants to be added but doesn't exist in the DB")
raise Exception(
f"Domain {existing_domain} wants to be added but doesn't exist in the DB"
)
invitation.save()
valid_org_choices = [
(name, value)
for name, value in DomainApplication.OrganizationChoices.choices
]
valid_fed_choices = [
value for name, value in DomainApplication.BranchChoices.choices
]
valid_agency_choices = DomainApplication.AGENCIES
# ======================================================
# ================= DOMAIN INFORMATION =================
logger.info(
@ -661,31 +696,54 @@ class Command(BaseCommand):
{TerminalColors.ENDC}"""
)
for transition_domain in TransitionDomain.objects.all():
target_domain_information, associated_domain, was_created = self.update_or_create_domain_information(transition_domain, debug_on)
(
target_domain_information,
associated_domain,
was_created,
) = self.update_or_create_domain_information(
transition_domain,
valid_agency_choices,
valid_fed_choices,
valid_org_choices,
debug_on,
)
debug_string = ""
if target_domain_information is None:
# ---------------- SKIPPED ----------------
skipped_domain_information_entries.append(target_domain_information)
debug_string = f"skipped domain information: {target_domain_information}"
debug_string = (
f"skipped domain information: {target_domain_information}"
)
elif was_created:
# DEBUG:
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.OKCYAN}"
f"Checking duplicates for: {target_domain_information}"
f"{TerminalColors.ENDC}"), # noqa
(
f"{TerminalColors.OKCYAN}"
f"Checking duplicates for: {target_domain_information}"
f"{TerminalColors.ENDC}"
), # noqa
)
# ---------------- DUPLICATE ----------------
# The unique key constraint does not allow multiple domain
# information objects to share the same domain
existing_domain_information_in_to_create = next(
(x for x in domain_information_to_create if x.domain.name == target_domain_information.domain.name),
(
x
for x in domain_information_to_create
if x.domain.name == target_domain_information.domain.name
),
None,
)
# TODO: this is redundant. Currently debugging....running into unique key constraint error....
existing_domain_info = DomainInformation.objects.filter(domain__name=target_domain_information.domain.name).exists()
if existing_domain_information_in_to_create is not None or existing_domain_info:
existing_domain_info = DomainInformation.objects.filter(
domain__name=target_domain_information.domain.name
).exists()
if (
existing_domain_information_in_to_create is not None
or existing_domain_info
):
debug_string = f"""{TerminalColors.YELLOW}
Duplicate Detected: {existing_domain_information_in_to_create}.
Cannot add duplicate Domain Information object
@ -693,19 +751,23 @@ class Command(BaseCommand):
else:
# ---------------- CREATED ----------------
domain_information_to_create.append(target_domain_information)
debug_string = f"created domain information: {target_domain_information}"
debug_string = (
f"created domain information: {target_domain_information}"
)
elif not was_created:
# ---------------- UPDATED ----------------
updated_domain_information.append(target_domain_information)
debug_string = f"updated domain information: {target_domain_information}"
debug_string = (
f"updated domain information: {target_domain_information}"
)
else:
debug_string = f"domain information already exists and matches incoming data (NO CHANGES MADE): {target_domain_information}"
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.OKCYAN}{debug_string}{TerminalColors.ENDC}"),
)
)
# ------------------ Parse limit reached? ------------------
# Check parse limit and exit loop if parse limit has been reached
@ -713,11 +775,13 @@ class Command(BaseCommand):
break
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.YELLOW}"
f"Trying to add: {domain_information_to_create}"
f"{TerminalColors.ENDC}"),
)
debug_on,
(
f"{TerminalColors.YELLOW}"
f"Trying to add: {domain_information_to_create}"
f"{TerminalColors.ENDC}"
),
)
DomainInformation.objects.bulk_create(domain_information_to_create)
self.print_summary_of_findings(

View file

@ -15,69 +15,69 @@ from typing import List, Optional
class AgencyAdhoc:
"""Defines the structure given in the AGENCY_ADHOC file"""
agencyid: Optional[int] = field(default=None, repr=True)
agencyname: Optional[str] = field(default=None, repr=True)
active: Optional[str] = field(default=None, repr=True)
isfederal: Optional[str] = field(default=None, repr=True)
agencyid: Optional[int] = field(default=None, repr=True)
agencyname: Optional[str] = field(default=None, repr=True)
active: Optional[str] = field(default=None, repr=True)
isfederal: Optional[str] = field(default=None, repr=True)
@dataclass
class DomainAdditionalData:
"""Defines the structure given in the DOMAIN_ADDITIONAL file"""
domainname: Optional[str] = field(default=None, repr=True)
domaintypeid: Optional[int] = field(default=None, repr=True)
authorityid: Optional[int] = field(default=None, repr=True)
orgid: Optional[int] = field(default=None, repr=True)
securitycontactemail: Optional[str] = field(default=None, repr=True)
dnsseckeymonitor: Optional[str] = field(default=None, repr=True)
domainpurpose: Optional[str] = field(default=None, repr=True)
domainname: Optional[str] = field(default=None, repr=True)
domaintypeid: Optional[int] = field(default=None, repr=True)
authorityid: Optional[int] = field(default=None, repr=True)
orgid: Optional[int] = field(default=None, repr=True)
securitycontactemail: Optional[str] = field(default=None, repr=True)
dnsseckeymonitor: Optional[str] = field(default=None, repr=True)
domainpurpose: Optional[str] = field(default=None, repr=True)
@dataclass
class DomainTypeAdhoc:
"""Defines the structure given in the DOMAIN_ADHOC file"""
domaintypeid: Optional[int] = field(default=None, repr=True)
domaintype: Optional[str] = field(default=None, repr=True)
code: Optional[str] = field(default=None, repr=True)
active: Optional[str] = field(default=None, repr=True)
domaintypeid: Optional[int] = field(default=None, repr=True)
domaintype: Optional[str] = field(default=None, repr=True)
code: Optional[str] = field(default=None, repr=True)
active: Optional[str] = field(default=None, repr=True)
@dataclass
class OrganizationAdhoc:
"""Defines the structure given in the ORGANIZATION_ADHOC file"""
orgid: Optional[int] = field(default=None, repr=True)
orgname: Optional[str] = field(default=None, repr=True)
orgstreet: Optional[str] = field(default=None, repr=True)
orgcity: Optional[str] = field(default=None, repr=True)
orgstate: Optional[str] = field(default=None, repr=True)
orgzip: Optional[str] = field(default=None, repr=True)
orgcountrycode: Optional[str] = field(default=None, repr=True)
orgid: Optional[int] = field(default=None, repr=True)
orgname: Optional[str] = field(default=None, repr=True)
orgstreet: Optional[str] = field(default=None, repr=True)
orgcity: Optional[str] = field(default=None, repr=True)
orgstate: Optional[str] = field(default=None, repr=True)
orgzip: Optional[str] = field(default=None, repr=True)
orgcountrycode: Optional[str] = field(default=None, repr=True)
@dataclass
class AuthorityAdhoc:
"""Defines the structure given in the AUTHORITY_ADHOC file"""
authorityid: Optional[int] = field(default=None, repr=True)
firstname: Optional[str] = field(default=None, repr=True)
middlename: Optional[str] = field(default=None, repr=True)
lastname: Optional[str] = field(default=None, repr=True)
email: Optional[str] = field(default=None, repr=True)
phonenumber: Optional[str] = field(default=None, repr=True)
agencyid: Optional[int] = field(default=None, repr=True)
addlinfo: Optional[List[str]] = field(default=None, repr=True)
authorityid: Optional[int] = field(default=None, repr=True)
firstname: Optional[str] = field(default=None, repr=True)
middlename: Optional[str] = field(default=None, repr=True)
lastname: Optional[str] = field(default=None, repr=True)
email: Optional[str] = field(default=None, repr=True)
phonenumber: Optional[str] = field(default=None, repr=True)
agencyid: Optional[int] = field(default=None, repr=True)
addlinfo: Optional[List[str]] = field(default=None, repr=True)
@dataclass
class DomainEscrow:
"""Defines the structure given in the DOMAIN_ESCROW file"""
domainname: Optional[str] = field(default=None, repr=True)
creationdate: Optional[date] = field(default=None, repr=True)
expirationdate: Optional[date] = field(default=None, repr=True)
domainname: Optional[str] = field(default=None, repr=True)
creationdate: Optional[date] = field(default=None, repr=True)
expirationdate: Optional[date] = field(default=None, repr=True)
class EnumFilenames(Enum):
@ -89,27 +89,12 @@ class EnumFilenames(Enum):
# We are sourcing data from many different locations, so its better to track this
# as an Enum rather than multiple spread out variables.
# We store the "type" as [0], and we store the "default_filepath" as [1].
AGENCY_ADHOC = (
"agency_adhoc",
"agency.adhoc.dotgov.txt"
)
AGENCY_ADHOC = ("agency_adhoc", "agency.adhoc.dotgov.txt")
DOMAIN_ADDITIONAL = (
"domain_additional",
"domainadditionaldatalink.adhoc.dotgov.txt",
)
DOMAIN_ESCROW = (
"domain_escrow",
"escrow_domains.daily.dotgov.GOV.txt"
)
DOMAIN_ADHOC = (
"domain_adhoc",
"domaintypes.adhoc.dotgov.txt"
)
ORGANIZATION_ADHOC = (
"organization_adhoc",
"organization.adhoc.dotgov.txt"
)
AUTHORITY_ADHOC = (
"authority_adhoc",
"authority.adhoc.dotgov.txt"
)
DOMAIN_ESCROW = ("domain_escrow", "escrow_domains.daily.dotgov.GOV.txt")
DOMAIN_ADHOC = ("domain_adhoc", "domaintypes.adhoc.dotgov.txt")
ORGANIZATION_ADHOC = ("organization_adhoc", "organization.adhoc.dotgov.txt")
AUTHORITY_ADHOC = ("authority_adhoc", "authority.adhoc.dotgov.txt")

View file

@ -69,10 +69,15 @@ class FileTransitionLog:
log = self.LogItem(file_type, code, message, domain_name)
dict_name = (file_type, domain_name)
self._add_to_log_list(dict_name, log)
def create_log_item(
self, file_type, code, message, domain_name=None, add_to_list=True, minimal_logging=True
self,
file_type,
code,
message,
domain_name=None,
add_to_list=True,
minimal_logging=True,
):
"""Creates and returns an LogItem object.
@ -81,10 +86,10 @@ class FileTransitionLog:
log = self.LogItem(file_type, code, message, domain_name)
if not add_to_list:
return log
dict_name = (file_type, domain_name)
self._add_to_log_list(dict_name, log)
restrict_type = []
if minimal_logging:
restrict_type = [LogCode.INFO, LogCode.WARNING]
@ -99,7 +104,7 @@ class FileTransitionLog:
def _add_to_log_list(self, log_name, log):
if log_name not in self.logs:
self.logs[log_name] = [log]
else:
else:
self.logs[log_name].append(log)
def display_all_logs(self):
@ -107,9 +112,7 @@ class FileTransitionLog:
for parent_log in self.logs:
for child_log in parent_log:
TerminalHelper.print_conditional(
True,
child_log.message,
child_log.severity
True, child_log.message, child_log.severity
)
def display_logs_by_domain_name(self, domain_name, restrict_type=LogCode.DEFAULT):
@ -125,16 +128,14 @@ class FileTransitionLog:
domain_logs = self.get_logs(file_type, domain_name)
if domain_logs is None:
return None
for log in domain_logs:
TerminalHelper.print_conditional(
restrict_type != log.code,
log.message,
log.code
restrict_type != log.code, log.message, log.code
)
def get_logs(self, file_type, domain_name):
"""Grabs the logs associated with
"""Grabs the logs associated with
a particular file_type and domain_name"""
log_name = (file_type, domain_name)
return self.logs.get(log_name)
@ -213,19 +214,20 @@ class LoadExtraTransitionDomain:
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Updated {len(updated_transition_domains)} transition domain entries:
{[domain for domain in updated_transition_domains]}
Updated {len(updated_transition_domains)} transition domain entries
{TerminalColors.ENDC}
"""
)
else:
# TODO - update
TerminalHelper.print_conditional(self.debug, f"{TerminalHelper.array_as_string(updated_transition_domains)}")
TerminalHelper.print_conditional(
self.debug,
f"{TerminalHelper.array_as_string(updated_transition_domains)}",
)
logger.error(
f"""{TerminalColors.FAIL}
============= FINISHED WITH ERRORS ===============
Updated {len(updated_transition_domains)} transition domain entries:
{[domain for domain in updated_transition_domains]}
Updated {len(updated_transition_domains)} transition domain entries,
Failed to update {failed_count} transition domain entries:
{[domain for domain in failed_transition_domains]}
{TerminalColors.ENDC}
@ -237,7 +239,8 @@ class LoadExtraTransitionDomain:
total_transition_domains = len(updated_transition_domains)
total_updates_made = TransitionDomain.objects.all().count()
if total_transition_domains != total_updates_made:
logger.error(f"""{TerminalColors.FAIL}
logger.error(
f"""{TerminalColors.FAIL}
WARNING: something went wrong processing domain information data.
Total Transition Domains expecting a data update: {total_transition_domains}
@ -248,7 +251,8 @@ class LoadExtraTransitionDomain:
corrupt data. Please check logs to diagnose.
----- TERMINATING ----
""")
"""
)
sys.exit()
def parse_creation_expiration_data(self, domain_name, transition_domain):
@ -262,19 +266,15 @@ class LoadExtraTransitionDomain:
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ESCROW,
LogCode.ERROR,
"Could not add epp_creation_date and epp_expiration_date "
"Could not add epp_creation_date and epp_expiration_date "
f"on {domain_name}, no data exists.",
domain_name,
not self.debug
not self.debug,
)
return transition_domain
creation_exists = (
transition_domain.epp_creation_date is not None
)
expiration_exists = (
transition_domain.epp_expiration_date is not None
)
creation_exists = transition_domain.epp_creation_date is not None
expiration_exists = transition_domain.epp_expiration_date is not None
transition_domain.epp_creation_date = info.creationdate
transition_domain.epp_expiration_date = info.expirationdate
@ -311,7 +311,7 @@ class LoadExtraTransitionDomain:
LogCode.ERROR,
f"Could not add federal_agency on {domain_name}, no data exists.",
domain_name,
not self.debug
not self.debug,
)
return transition_domain
@ -326,7 +326,7 @@ class LoadExtraTransitionDomain:
LogCode.ERROR,
f"Could not add inactive agency {info.agencyname} on {domain_name}",
domain_name,
not self.debug
not self.debug,
)
return transition_domain
@ -336,7 +336,7 @@ class LoadExtraTransitionDomain:
LogCode.ERROR,
f"Could not add non-federal agency {info.agencyname} on {domain_name}",
domain_name,
not self.debug
not self.debug,
)
return transition_domain
@ -369,7 +369,7 @@ class LoadExtraTransitionDomain:
LogCode.ERROR,
f"Could not add domain_type on {domain_name}, no data exists.",
domain_name,
not self.debug
not self.debug,
)
return transition_domain
@ -392,7 +392,7 @@ class LoadExtraTransitionDomain:
LogCode.ERROR,
f"Could not add inactive domain_type {domain_type[0]} on {domain_name}",
domain_name,
not self.debug
not self.debug,
)
return transition_domain
@ -453,7 +453,7 @@ class LoadExtraTransitionDomain:
LogCode.ERROR,
f"Could not add organization_name on {domain_name}, no data exists.",
domain_name,
not self.debug
not self.debug,
)
return transition_domain
@ -487,7 +487,7 @@ class LoadExtraTransitionDomain:
LogCode.INFO,
f"Added {var_name} as '{changed_value}' on {domain_name}",
domain_name,
not self.debug
not self.debug,
)
else:
self.parse_logs.create_log_item(
@ -495,7 +495,7 @@ class LoadExtraTransitionDomain:
LogCode.WARNING,
f"Updated existing {var_name} to '{changed_value}' on {domain_name}",
domain_name,
not self.debug
not self.debug,
)
# Property getters, i.e. orgid or domaintypeid
@ -523,7 +523,7 @@ class LoadExtraTransitionDomain:
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
# The agency record is within the authority adhoc
authority_id = domain_info.authorityid
authority = self.get_authority_adhoc(authority_id)
@ -542,14 +542,14 @@ class LoadExtraTransitionDomain:
return None
type_id = domain_info.authorityid
return self.get_authority_adhoc(type_id)
def get_domain_escrow_info(self, domain_name):
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
type_id = domain_info.domainname
return self.get_domain_escrow(type_id)
# Object getters, i.e. DomainAdditionalData or OrganizationAdhoc
def get_domain_data(self, desired_id) -> DomainAdditionalData:
"""Grabs a corresponding row within the DOMAIN_ADDITIONAL file,
@ -575,7 +575,7 @@ class LoadExtraTransitionDomain:
"""Grabs a corresponding row within the AUTHORITY_ADHOC file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.AUTHORITY_ADHOC, desired_id)
def get_domain_escrow(self, desired_id) -> DomainEscrow:
"""Grabs a corresponding row within the DOMAIN_ESCROW file,
based off a desired_id"""
@ -615,7 +615,9 @@ class LoadExtraTransitionDomain:
desired_type = self.parsed_data_container.file_data.get(file_type)
if desired_type is None:
self.parse_logs.create_log_item(
file_type, LogCode.ERROR, f"Type {file_type} does not exist",
file_type,
LogCode.ERROR,
f"Type {file_type} does not exist",
)
return None
@ -624,10 +626,13 @@ class LoadExtraTransitionDomain:
obj = desired_type.data.get(desired_id)
if obj is None:
self.parse_logs.create_log_item(
file_type, LogCode.ERROR, f"Id {desired_id} does not exist for {file_type.value[0]}"
file_type,
LogCode.ERROR,
f"Id {desired_id} does not exist for {file_type.value[0]}",
)
return obj
# TODO - change name
@dataclass
class FileDataHolder:
@ -698,18 +703,18 @@ class FileDataHolder:
# matches, then we shouldn't infer
if total_groups == 0 or total_groups > 2:
return (self.filename, False)
# If only one match is returned,
# it means that our default matches our request
if total_groups == 1:
return (self.filename, True)
# Otherwise, if two are returned, then
# its likely the pattern we want
date = match.group(1)
filename_without_date = match.group(2)
# After stripping out the date,
# After stripping out the date,
# do the two filenames match?
can_infer = filename_without_date == default_file_name
if not can_infer:
@ -861,7 +866,7 @@ class ExtraTransitionDomain:
if not infer_filenames:
logger.error(f"Could not find file: {filename}")
continue
# Infer filename logic #
# This mode is used for internal development use and testing only. Rather than having
# to manually define the filename each time, we can infer what the filename
@ -898,26 +903,15 @@ class ExtraTransitionDomain:
file_type.data = {}
def parse_csv_file(
self,
file,
seperator,
dataclass_type,
id_field,
is_domain_escrow=False
self, file, seperator, dataclass_type, id_field, is_domain_escrow=False
):
# Domain escrow is an edge case
if is_domain_escrow:
item_to_return = self._read_domain_escrow(
file,
seperator
)
item_to_return = self._read_domain_escrow(file, seperator)
return item_to_return
else:
item_to_return = self._read_csv_file(
file,
seperator,
dataclass_type,
id_field
file, seperator, dataclass_type, id_field
)
return item_to_return
@ -946,14 +940,16 @@ class ExtraTransitionDomain:
reader = csv.DictReader(requested_file, delimiter=seperator)
for row in reader:
# Checks if we encounter any bad data.
# If we do, we (non-destructively) clean the file
# If we do, we (non-destructively) clean the file
if None in row:
logger.warning(
f"{TerminalColors.YELLOW}"
f"Found bad data in {file}. Attempting to clean."
f"{TerminalColors.ENDC}"
)
updated_file_content = self.replace_bad_seperators(file, f"{seperator}", ";badseperator;")
updated_file_content = self.replace_bad_seperators(
file, f"{seperator}", ";badseperator;"
)
dict_data = {}
break
@ -964,7 +960,7 @@ class ExtraTransitionDomain:
if id_field == "domainname" and row_id is not None:
row_id = row_id.lower()
dict_data[row_id] = dataclass_type(**row)
# After we clean the data, try to parse it again
if updated_file_content:
logger.info(
@ -999,10 +995,10 @@ class ExtraTransitionDomain:
row_id = row_id.lower()
dict_data[row_id] = dataclass_type(**row)
return dict_data
def replace_bad_seperators(self, filename, delimiter, special_character):
with open(filename, "r", encoding="utf-8-sig") as file:
contents = file.read()
new_content = re.sub(rf" \{delimiter} ", special_character, contents)
return new_content
return new_content

View file

@ -4,9 +4,10 @@ import sys
logger = logging.getLogger(__name__)
class LogCode(Enum):
"""Stores the desired log severity
Overview of error codes:
- 1 ERROR
- 2 WARNING
@ -21,6 +22,7 @@ class LogCode(Enum):
DEBUG = 4
DEFAULT = 5
class TerminalColors:
"""Colors for terminal outputs
(makes reading the logs WAY easier)"""
@ -81,7 +83,14 @@ class TerminalHelper:
The "answer" return value is True for "yes" or False for "no".
"""
valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False, "e": "exit"}
valid = {
"yes": True,
"y": True,
"ye": True,
"no": False,
"n": False,
"e": "exit",
}
if default is None:
prompt = " [y/n] "
elif default == "yes":
@ -105,22 +114,20 @@ class TerminalHelper:
# @staticmethod
def array_as_string(array_to_convert: []) -> str:
array_as_string = "{}".format(
"\n".join(map(str, array_to_convert))
)
array_as_string = "{}".format("\n".join(map(str, array_to_convert)))
return array_as_string
@staticmethod
def print_conditional(
print_condition: bool,
print_statement: str,
log_severity: LogCode = LogCode.DEFAULT
print_condition: bool,
print_statement: str,
log_severity: LogCode = LogCode.DEFAULT,
):
"""This function reduces complexity of debug statements
in other functions.
It uses the logger to write the given print_statement to the
terminal if print_condition is TRUE.
print_condition: bool -> Prints if print_condition is TRUE
print_statement: str -> The statement to print
@ -181,29 +188,40 @@ class TerminalHelper:
@staticmethod
def get_file_line_count(filepath: str) -> int:
with open(filepath,'r') as file:
with open(filepath, "r") as file:
li = file.readlines()
total_line = len(li)
return total_line
@staticmethod
def print_to_file_conditional(print_condition: bool, filename: str, file_directory: str, file_contents: str):
"""Sometimes logger outputs get insanely huge.
"""
if (print_condition):
def print_to_file_conditional(
print_condition: bool, filename: str, file_directory: str, file_contents: str
):
"""Sometimes logger outputs get insanely huge."""
if print_condition:
# Add a slash if the last character isn't one
if file_directory and file_directory[-1] != "/":
file_directory += "/"
# Assemble filepath
filepath = f"{file_directory}{filename}.txt"
# Write to file
logger.info(f"{TerminalColors.MAGENTA}Writing to file {filepath}...{TerminalColors.ENDC}")
logger.info(
f"{TerminalColors.MAGENTA}Writing to file {filepath}...{TerminalColors.ENDC}"
)
with open(f"{filepath}", "w+") as f:
f.write(file_contents)
@staticmethod
def printProgressBar (iteration, total, prefix = 'Progress:', suffix = 'Complete', decimals = 1, length = 100, fill = '', printEnd = "\r"):
def printProgressBar(
iteration,
total,
prefix="Progress:",
suffix="Complete",
decimals=1,
length=100,
fill="",
printEnd="\r",
):
"""
Call in a loop to create terminal progress bar
@params:
@ -227,10 +245,12 @@ class TerminalHelper:
printProgressBar(i + 1, l, prefix = 'Progress:', suffix = 'Complete', length = 50)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
percent = ("{0:." + str(decimals) + "f}").format(
100 * (iteration / float(total))
)
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd)
bar = fill * filledLength + "-" * (length - filledLength)
print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd)
# Print New Line on Complete
if iteration == total:
print()
if iteration == total:
print()

View file

@ -3,19 +3,20 @@ from typing import Optional
from registrar.management.commands.utility.epp_data_containers import EnumFilenames
@dataclass
class TransitionDomainArguments:
"""Stores arguments for load_transition_domain, structurally a mix
of a dataclass and a regular class, meaning we get a hardcoded
of a dataclass and a regular class, meaning we get a hardcoded
representation of the values we want, while maintaining flexiblity
and reducing boilerplate.
All pre-defined fields are optional but will remain on the model definition.
In this event, they are provided a default value if none is given.
"""
# Maintains an internal kwargs list and sets values
# that match the class definition.
# that match the class definition.
def __init__(self, **kwargs):
self.kwargs = kwargs
for k, v in kwargs.items():
@ -36,14 +37,26 @@ class TransitionDomainArguments:
# Filenames #
## Adhocs ##
agency_adhoc_filename: Optional[str] = field(default=EnumFilenames.AGENCY_ADHOC.value[1], repr=True)
domain_adhoc_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADHOC.value[1], repr=True)
organization_adhoc_filename: Optional[str] = field(default=EnumFilenames.ORGANIZATION_ADHOC.value[1], repr=True)
authority_adhoc_filename: Optional[str] = field(default=EnumFilenames.AUTHORITY_ADHOC.value[1], repr=True)
agency_adhoc_filename: Optional[str] = field(
default=EnumFilenames.AGENCY_ADHOC.value[1], repr=True
)
domain_adhoc_filename: Optional[str] = field(
default=EnumFilenames.DOMAIN_ADHOC.value[1], repr=True
)
organization_adhoc_filename: Optional[str] = field(
default=EnumFilenames.ORGANIZATION_ADHOC.value[1], repr=True
)
authority_adhoc_filename: Optional[str] = field(
default=EnumFilenames.AUTHORITY_ADHOC.value[1], repr=True
)
## Data files ##
domain_escrow_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ESCROW.value[1], repr=True)
domain_additional_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], repr=True)
domain_escrow_filename: Optional[str] = field(
default=EnumFilenames.DOMAIN_ESCROW.value[1], repr=True
)
domain_additional_filename: Optional[str] = field(
default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], repr=True
)
domain_contacts_filename: Optional[str] = field(default=None, repr=True)
domain_statuses_filename: Optional[str] = field(default=None, repr=True)
contacts_filename: Optional[str] = field(default=None, repr=True)

View file

@ -13,6 +13,7 @@ from registrar.models import (
from django.core.management import call_command
from unittest.mock import patch
class TestMigrations(TestCase):
def setUp(self):
""" """
@ -46,24 +47,30 @@ class TestMigrations(TestCase):
UserDomainRole.objects.all().delete()
def run_load_domains(self):
with patch('registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit', return_value=True):
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit",
return_value=True,
):
call_command(
"load_transition_domain",
self.migration_json_filename,
directory=self.test_data_file_location
directory=self.test_data_file_location,
)
def run_transfer_domains(self):
call_command("transfer_transition_domains_to_domains")
def run_master_script(self):
with patch('registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit', return_value=True):
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit",
return_value=True,
):
call_command(
"master_domain_migrations",
runMigrations=True,
migrationDirectory=self.test_data_file_location,
migrationJSON=self.migration_json_filename,
disablePrompts=True
disablePrompts=True,
)
def compare_tables(
@ -204,7 +211,7 @@ class TestMigrations(TestCase):
expected_missing_domain_informations,
expected_missing_domain_invitations,
)
def test_load_full_transition_domain(self):
# Load command
self.run_load_domains()
@ -242,7 +249,7 @@ class TestMigrations(TestCase):
federal_type="Executive",
federal_agency="InnoZ",
epp_creation_date=None,
epp_expiration_date=None
epp_expiration_date=None,
),
TransitionDomain(
username="reginald.ratcliff4@test.com",
@ -254,20 +261,21 @@ class TestMigrations(TestCase):
federal_type=None,
federal_agency=None,
epp_creation_date=None,
epp_expiration_date=None
)
epp_expiration_date=None,
),
]
expected_transition_domains = TransitionDomain.objects.filter(username="alexandra.bobbitt5@test.com")
expected_transition_domains = TransitionDomain.objects.filter(
username="alexandra.bobbitt5@test.com"
)
self.assertEqual(expected_transition_domains.count(), 1)
expected_transition_domain = expected_transition_domains.get()
#TransitionDomain.objects.filter(domain_name = "fakewebsite3.gov")
# TransitionDomain.objects.filter(domain_name = "fakewebsite3.gov")
# Afterwards, their values should be what we expect
all_transition_domains = TransitionDomain.objects.all()
for domain in all_transition_domains:
for expected in expected_transition_domains:
# This data gets created when the object is,
# so we should just match it. Not relevant
# to the added data.
@ -277,7 +285,7 @@ class TestMigrations(TestCase):
# Each TransitionDomain should have the correct data
self.assertEqual(domain, expected)
def test_load_full_domain(self):
self.run_load_domains()
self.run_transfer_domains()
@ -323,10 +331,10 @@ class TestMigrations(TestCase):
testdomain = testdomain_domains.get()
self.assertEqual(testdomain.expiration_date, datetime.date(2023, 9, 30))
#self.assertEqual(testdomain.created_at, "test")
# self.assertEqual(testdomain.created_at, "test")
self.assertEqual(testdomain.name, "fakewebsite2.gov")
self.assertEqual(testdomain.state, "on hold")
def test_load_full_domain_information(self):
self.run_load_domains()
self.run_transfer_domains()
@ -355,7 +363,7 @@ class TestMigrations(TestCase):
# Test created Domain Information objects
domain = Domain.objects.filter(name="anomaly.gov").get()
anomaly_domain_infos = DomainInformation.objects.filter(domain=domain)
self.assertEqual(anomaly_domain_infos.count(), 1)
# This domain should be pretty barebones. Something isnt
@ -365,7 +373,7 @@ class TestMigrations(TestCase):
self.assertEqual(anomaly.organization_type, None)
self.assertEqual(anomaly.federal_agency, None)
self.assertEqual(anomaly.federal_type, None)
# Check for the "system" creator user
Users = User.objects.filter(username="System")
self.assertEqual(Users.count(), 1)
@ -380,13 +388,12 @@ class TestMigrations(TestCase):
self.assertEqual(fakewebsite.organization_type, "federal")
self.assertEqual(fakewebsite.federal_agency, "Department of Commerce")
self.assertEqual(fakewebsite.federal_type, "executive")
# Check for the "system" creator user
Users = User.objects.filter(username="System")
self.assertEqual(Users.count(), 1)
self.assertEqual(anomaly.creator, Users.get())
def test_transfer_transition_domains_to_domains(self):
self.run_load_domains()
self.run_transfer_domains()

View file

@ -100,7 +100,7 @@ class DomainPermission(PermissionsLoginMixin):
if DomainInformation.objects.filter(id=pk).exists():
requested_domain = DomainInformation.objects.get(id=pk)
# If no domain_application object exists and we are
# If no domain_application object exists and we are
# coming from the manage_domain dashboard, this is likely
# a transition domain.
domain_application = requested_domain.domain_application