Bug fixes, better logging

This commit is contained in:
zandercymatics 2023-11-02 11:06:54 -06:00
parent d88e36e5a3
commit f804b0aa0c
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
3 changed files with 143 additions and 143 deletions

View file

@ -23,19 +23,11 @@ from .epp_data_containers import (
)
from .transition_domain_arguments import TransitionDomainArguments
from .terminal_helper import TerminalColors, TerminalHelper, LogCode
logger = logging.getLogger(__name__)
class LogCode(Enum):
"""Stores the desired log severity"""
ERROR = 1
WARNING = 2
INFO = 3
DEBUG = 4
class FileTransitionLog:
"""Container for storing event logs. Used to lessen
the complexity of storing multiple logs across multiple
@ -50,13 +42,7 @@ class FileTransitionLog:
"""
def __init__(self):
self.logs = {
EnumFilenames.DOMAIN_ADHOC: [],
EnumFilenames.AGENCY_ADHOC: [],
EnumFilenames.ORGANIZATION_ADHOC: [],
EnumFilenames.DOMAIN_ADDITIONAL: [],
EnumFilenames.DOMAIN_ESCROW: [],
}
self.logs = {}
class LogItem:
"""Used for storing data about logger information."""
@ -70,14 +56,19 @@ class FileTransitionLog:
def add_log(self, file_type, code, message, domain_name):
"""Adds a log item to self.logs
file_type -> Which array to add to,
file_type -> Which enum to associate with,
ex. EnumFilenames.DOMAIN_ADHOC
code -> Log severity or other metadata, ex. LogCode.ERROR
message -> Message to display
domain_name -> Name of the domain, i.e. "igorville.gov"
"""
self.logs[file_type].append(self.LogItem(file_type, code, message, domain_name))
log = self.LogItem(file_type, code, message, domain_name)
dict_name = (file_type, domain_name)
self._add_to_log_list(dict_name, log)
def create_log_item(
self, file_type, code, message, domain_name=None, add_to_list=True
@ -89,51 +80,61 @@ class FileTransitionLog:
log = self.LogItem(file_type, code, message, domain_name)
if not add_to_list:
return log
else:
self.logs[file_type].append(log)
dict_name = (file_type, domain_name)
self._add_to_log_list(dict_name, log)
return log
def _add_to_log_list(self, log_name, log):
if log_name not in self.logs:
self.logs[log_name] = [log]
else:
self.logs[log_name].append(log)
def display_all_logs(self):
"""Logs every LogItem contained in this object"""
for file_type in self.logs:
self.display_logs(file_type)
for parent_log in self.logs:
for child_log in parent_log:
TerminalHelper.print_conditional(
True,
child_log.message,
child_log.severity
)
def display_logs(self, file_type):
"""Displays all logs in the given file_type in EnumFilenames.
def display_logs_by_domain_name(self, domain_name, restrict_type=LogCode.DEFAULT):
"""Displays all logs of a given domain_name.
Will log with the correct severity depending on code.
"""
for log in self.logs.get(file_type):
match log.code:
case LogCode.ERROR:
logger.error(log.message)
case LogCode.WARNING:
logger.warning(log.message)
case LogCode.INFO:
logger.info(log.message)
case LogCode.DEBUG:
logger.debug(log.message)
def clear_logs(self):
"""Clears log information"""
self.logs = {
EnumFilenames.DOMAIN_ADHOC: [],
EnumFilenames.AGENCY_ADHOC: [],
EnumFilenames.ORGANIZATION_ADHOC: [],
EnumFilenames.DOMAIN_ADDITIONAL: [],
EnumFilenames.DOMAIN_ESCROW: [],
}
def get_logs(self, file_type):
domain_name: str -> The domain to target, such as "igorville.gov"
restrict_type: LogCode -> Determines if only errors of a certain
type should be displayed, such as LogCode.ERROR.
"""
for file_type in EnumFilenames:
domain_logs = self.get_logs(file_type, domain_name)
if domain_logs is None:
return None
for log in domain_logs:
TerminalHelper.print_conditional(
restrict_type != log.code,
log.message,
log.code
)
def get_logs(self, file_type, domain_name):
"""Grabs the logs associated with
a particular file_type"""
return self.logs.get(file_type)
a particular file_type and domain_name"""
log_name = (file_type, domain_name)
return self.logs.get(log_name)
class LoadExtraTransitionDomain:
"""Grabs additional data for TransitionDomains."""
def __init__(self, options: TransitionDomainArguments):
# Stores event logs and organizes them
# Globally stores event logs and organizes them
self.parse_logs = FileTransitionLog()
arguments = options.args_extra_transition_domain()
@ -141,87 +142,65 @@ class LoadExtraTransitionDomain:
self.parsed_data_container = ExtraTransitionDomain(**arguments)
self.parsed_data_container.parse_all_files()
def create_update_model_logs(self, file_type):
"""Associates runtime logs to the file_type,
such that we can determine where errors occured when
updating a TransitionDomain model."""
logs = self.parse_logs.get_logs(file_type)
self.parsed_data_container.set_logs(file_type, logs)
def update_transition_domain_models(self):
"""Updates TransitionDomain objects based off the file content
given in self.parsed_data_container"""
all_transition_domains = TransitionDomain.objects.all()
if not all_transition_domains.exists():
raise Exception("No TransitionDomain objects exist.")
try:
for transition_domain in all_transition_domains:
domain_name = transition_domain.domain_name.upper()
updated_transition_domain = transition_domain
raise ValueError("No TransitionDomain objects exist.")
for transition_domain in all_transition_domains:
domain_name = transition_domain.domain_name.upper()
updated_transition_domain = transition_domain
try:
# STEP 1: Parse organization data
updated_transition_domain = self.parse_org_data(
domain_name, transition_domain
)
# Store the event logs
self.create_update_model_logs(EnumFilenames.ORGANIZATION_ADHOC)
# STEP 2: Parse domain type data
updated_transition_domain = self.parse_domain_type_data(
domain_name, transition_domain
)
# Store the event logs
self.create_update_model_logs(EnumFilenames.DOMAIN_ADHOC)
# STEP 3: Parse agency data
updated_transition_domain = self.parse_agency_data(
domain_name, transition_domain
)
# Store the event logs
self.create_update_model_logs(EnumFilenames.AGENCY_ADHOC)
# STEP 4: Parse creation and expiration data
updated_transition_domain = self.parse_creation_expiration_data(
domain_name, transition_domain
)
# Store the event logs
self.create_update_model_logs(EnumFilenames.DOMAIN_ADHOC)
updated_transition_domain.save()
self.parse_logs.display_logs_by_domain_name(domain_name)
logger.info(
f"{TerminalColors.OKCYAN}"
f"Successfully updated {domain_name}"
f"{TerminalColors.ENDC}"
)
logger.info(f"Succesfully updated TransitionDomain {domain_name}")
self.parse_logs.clear_logs()
except Exception as err:
logger.error("Could not update all TransitionDomain objects.")
# Regardless of what occurred, log what happened.
logger.info("======Printing log stack======")
self.parse_logs.display_all_logs()
# If we run into an exception on this domain,
# Just skip over it and log that it happened.
except Exception as err:
logger.debug(err)
logger.info(
f"{TerminalColors.FAIL}"
f"Exception encountered on {domain_name}. Could not update."
f"{TerminalColors.ENDC}"
)
raise err
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Created 123 transition domain entries,
updated 123 transition domain entries
{TerminalColors.ENDC}
"""
)
raise err
else:
self.display_run_summary()
def display_run_summary(self):
"""Prints information about this particular run.
Organizes like data together.
"""
container = self.parsed_data_container
agency_adhoc = container.get_logs_for_type(EnumFilenames.AGENCY_ADHOC)
authority_adhoc = container.get_logs_for_type(EnumFilenames.AUTHORITY_ADHOC)
domain_additional = container.get_logs_for_type(EnumFilenames.DOMAIN_ADDITIONAL)
domain_adhoc = container.get_logs_for_type(EnumFilenames.DOMAIN_ADHOC)
domain_escrow = container.get_logs_for_type(EnumFilenames.DOMAIN_ESCROW)
organization_adhoc = container.get_logs_for_type(EnumFilenames.ORGANIZATION_ADHOC)
variable_data = []
for file_type in self.parsed_data_container.file_data:
# Grab all logs for
logs = self.parsed_data_container.get_logs_for_type(file_type)
variable_data.append(logs)
#agency_adhoc, authority_adhoc, domain_additional, domain_adhoc, domain_escrow, organization_adhoc = variable_data
def parse_creation_expiration_data(self, domain_name, transition_domain):
"""Grabs expiration_date from the parsed files and associates it
with a transition_domain object, then returns that object."""
@ -485,8 +464,16 @@ class LoadExtraTransitionDomain:
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
type_id = domain_info.orgid
return self.get_domain_adhoc(type_id)
# The agency record is within the authority adhoc
authority_id = domain_info.authorityid
authority = self.get_authority_adhoc(authority_id)
type_id = None
if authority is not None:
type_id = authority.agencyid
return self.get_agency_adhoc(type_id)
def get_authority_info(self, domain_name):
"""Maps an id given in get_domain_data to a authority_adhoc
@ -629,7 +616,6 @@ class PatternMap:
# Object data #
self.data = {}
self.logs = {}
def try_infer_filename(self, current_file_name, default_file_name):
"""Tries to match a given filename to a regex,
@ -821,7 +807,7 @@ class ExtraTransitionDomain:
# Infer filename logic #
# This mode is used for development and testing only. Rather than having
# to manually define the filename each time, we can infer what the filename
# actually is.
# actually is.
# Not intended for use outside of that, as it is better to assume
# the end-user wants to be specific.
@ -855,7 +841,6 @@ class ExtraTransitionDomain:
def parse_csv_file(
self,
file_type,
file,
seperator,
dataclass_type,
@ -865,14 +850,12 @@ class ExtraTransitionDomain:
# Domain escrow is an edge case
if is_domain_escrow:
item_to_return = self._read_domain_escrow(
file_type,
file,
seperator
)
return item_to_return
else:
item_to_return = self._read_csv_file(
file_type,
file,
seperator,
dataclass_type,
@ -881,7 +864,7 @@ class ExtraTransitionDomain:
return item_to_return
# Domain escrow is an edgecase given that its structured differently data-wise.
def _read_domain_escrow(self, file_type, file, seperator):
def _read_domain_escrow(self, file, seperator):
dict_data = {}
with open(file, "r", encoding="utf-8-sig") as requested_file:
reader = csv.reader(requested_file, delimiter=seperator)
@ -895,13 +878,9 @@ class ExtraTransitionDomain:
dict_data[domain_name] = DomainEscrow(
domain_name, creation_date, expiration_date
)
# Given this row_id, create a default log object.
# So that we can track logs on it later.
self.set_log(file_type, domain_name, [])
return dict_data
def _read_csv_file(self, file_type, file, seperator, dataclass_type, id_field):
def _read_csv_file(self, file, seperator, dataclass_type, id_field):
with open(file, "r", encoding="utf-8-sig") as requested_file:
reader = csv.DictReader(requested_file, delimiter=seperator)
"""
@ -913,32 +892,12 @@ class ExtraTransitionDomain:
for row in reader:
if None in row:
print("Skipping row with None key")
# for key, value in row.items():
# print(f"key: {key} value: {value}")
print(dataclass_type)
for key, value in row.items():
print(f"key: {key} value: {value}")
continue
row_id = row[id_field]
dict_data[row_id] = dataclass_type(**row)
# Given this row_id, create a default log object.
# So that we can track logs on it later.
self.set_log(file_type, row_id, [])
# dict_data = {row[id_field]: dataclass_type(**row) for row in reader}
return dict_data
# Logging logic #
def get_logs_for_type(self, file_type):
"""Returns all logs for the given file_type"""
return self.file_data.get(file_type).logs
def get_log(self, file_type, item_id):
"""Returns a log of a particular id"""
logs = self.get_logs_for_type(file_type)
return logs.get(item_id)
def set_logs_for_type(self, file_type, logs):
"""Sets all logs for a given file_type"""
self.file_data[file_type] = logs
def set_log(self, file_type, item_id, log):
"""Creates a single log item under a given file_type"""
self.file_data.get(file_type)[item_id] = log