Migrate logs

zandercymatics 2023-11-01 16:04:14 -06:00
parent 3bb7ea623f
commit d88e36e5a3
2 changed files with 178 additions and 70 deletions

File 1 of 2:

@@ -5,7 +5,7 @@ Regarding our dataclasses:
 Not intended to be used as models but rather as an alternative to storing as a dictionary.
 By keeping it as a dataclass instead of a dictionary, we can maintain data consistency.
 """
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from datetime import date
 from enum import Enum
 from typing import List, Optional
@@ -15,69 +15,69 @@ from typing import List, Optional
 class AgencyAdhoc:
     """Defines the structure given in the AGENCY_ADHOC file"""

-    agencyid: Optional[int] = None
-    agencyname: Optional[str] = None
-    active: Optional[str] = None
-    isfederal: Optional[str] = None
+    agencyid: Optional[int] = field(default=None, repr=True)
+    agencyname: Optional[str] = field(default=None, repr=True)
+    active: Optional[str] = field(default=None, repr=True)
+    isfederal: Optional[str] = field(default=None, repr=True)


 @dataclass
 class DomainAdditionalData:
     """Defines the structure given in the DOMAIN_ADDITIONAL file"""

-    domainname: Optional[str] = None
-    domaintypeid: Optional[int] = None
-    authorityid: Optional[int] = None
-    orgid: Optional[int] = None
-    securitycontactemail: Optional[str] = None
-    dnsseckeymonitor: Optional[str] = None
-    domainpurpose: Optional[str] = None
+    domainname: Optional[str] = field(default=None, repr=True)
+    domaintypeid: Optional[int] = field(default=None, repr=True)
+    authorityid: Optional[int] = field(default=None, repr=True)
+    orgid: Optional[int] = field(default=None, repr=True)
+    securitycontactemail: Optional[str] = field(default=None, repr=True)
+    dnsseckeymonitor: Optional[str] = field(default=None, repr=True)
+    domainpurpose: Optional[str] = field(default=None, repr=True)


 @dataclass
 class DomainTypeAdhoc:
     """Defines the structure given in the DOMAIN_ADHOC file"""

-    domaintypeid: Optional[int] = None
-    domaintype: Optional[str] = None
-    code: Optional[str] = None
-    active: Optional[str] = None
+    domaintypeid: Optional[int] = field(default=None, repr=True)
+    domaintype: Optional[str] = field(default=None, repr=True)
+    code: Optional[str] = field(default=None, repr=True)
+    active: Optional[str] = field(default=None, repr=True)


 @dataclass
 class OrganizationAdhoc:
     """Defines the structure given in the ORGANIZATION_ADHOC file"""

-    orgid: Optional[int] = None
-    orgname: Optional[str] = None
-    orgstreet: Optional[str] = None
-    orgcity: Optional[str] = None
-    orgstate: Optional[str] = None
-    orgzip: Optional[str] = None
-    orgcountrycode: Optional[str] = None
+    orgid: Optional[int] = field(default=None, repr=True)
+    orgname: Optional[str] = field(default=None, repr=True)
+    orgstreet: Optional[str] = field(default=None, repr=True)
+    orgcity: Optional[str] = field(default=None, repr=True)
+    orgstate: Optional[str] = field(default=None, repr=True)
+    orgzip: Optional[str] = field(default=None, repr=True)
+    orgcountrycode: Optional[str] = field(default=None, repr=True)


 @dataclass
 class AuthorityAdhoc:
     """Defines the structure given in the AUTHORITY_ADHOC file"""

-    authorityid: Optional[int] = None
-    firstname: Optional[str] = None
-    middlename: Optional[str] = None
-    lastname: Optional[str] = None
-    email: Optional[str] = None
-    phonenumber: Optional[str] = None
-    agencyid: Optional[int] = None
-    addlinfo: Optional[List[str]] = None
+    authorityid: Optional[int] = field(default=None, repr=True)
+    firstname: Optional[str] = field(default=None, repr=True)
+    middlename: Optional[str] = field(default=None, repr=True)
+    lastname: Optional[str] = field(default=None, repr=True)
+    email: Optional[str] = field(default=None, repr=True)
+    phonenumber: Optional[str] = field(default=None, repr=True)
+    agencyid: Optional[int] = field(default=None, repr=True)
+    addlinfo: Optional[List[str]] = field(default=None, repr=True)


 @dataclass
 class DomainEscrow:
     """Defines the structure given in the DOMAIN_ESCROW file"""

-    domainname: Optional[str] = None
-    creationdate: Optional[date] = None
-    expirationdate: Optional[date] = None
+    domainname: Optional[str] = field(default=None, repr=True)
+    creationdate: Optional[date] = field(default=None, repr=True)
+    expirationdate: Optional[date] = field(default=None, repr=True)


 class EnumFilenames(Enum):
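
For context on the field() change above: dataclasses.field(default=None, repr=True) behaves the same as a plain "= None" default (repr is already True by default), but it makes the per-field settings explicit and easy to adjust in one place. A minimal self-contained sketch, trimmed to two fields:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class AgencyAdhoc:
    """Trimmed copy of the AGENCY_ADHOC structure above."""

    agencyid: Optional[int] = field(default=None, repr=True)
    agencyname: Optional[str] = field(default=None, repr=True)


# Missing columns simply fall back to None, and repr=True keeps every
# field visible when the record is printed or logged.
print(AgencyAdhoc(agencyid=100))
# AgencyAdhoc(agencyid=100, agencyname=None)
```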

File 2 of 2:

@@ -93,6 +93,11 @@ class FileTransitionLog:
         self.logs[file_type].append(log)
         return log

+    def display_all_logs(self):
+        """Logs every LogItem contained in this object"""
+        for file_type in self.logs:
+            self.display_logs(file_type)
+
     def display_logs(self, file_type):
         """Displays all logs in the given file_type in EnumFilenames.
         Will log with the correct severity depending on code.
@@ -108,6 +113,21 @@ class FileTransitionLog:
             case LogCode.DEBUG:
                 logger.debug(log.message)

+    def clear_logs(self):
+        """Clears log information"""
+        self.logs = {
+            EnumFilenames.DOMAIN_ADHOC: [],
+            EnumFilenames.AGENCY_ADHOC: [],
+            EnumFilenames.ORGANIZATION_ADHOC: [],
+            EnumFilenames.DOMAIN_ADDITIONAL: [],
+            EnumFilenames.DOMAIN_ESCROW: [],
+        }
+
+    def get_logs(self, file_type):
+        """Grabs the logs associated with
+        a particular file_type"""
+        return self.logs.get(file_type)
+

 class LoadExtraTransitionDomain:
     """Grabs additional data for TransitionDomains."""
@@ -118,45 +138,89 @@ class LoadExtraTransitionDomain:
         arguments = options.args_extra_transition_domain()
         # Reads and parses migration files
-        self.domain_object = ExtraTransitionDomain(**arguments)
-        self.domain_object.parse_all_files()
+        self.parsed_data_container = ExtraTransitionDomain(**arguments)
+        self.parsed_data_container.parse_all_files()
+
+    def create_update_model_logs(self, file_type):
+        """Associates runtime logs to the file_type,
+        such that we can determine where errors occurred when
+        updating a TransitionDomain model."""
+        logs = self.parse_logs.get_logs(file_type)
+        self.parsed_data_container.set_logs_for_type(file_type, logs)

     def update_transition_domain_models(self):
         """Updates TransitionDomain objects based off the file content
-        given in self.domain_object"""
+        given in self.parsed_data_container"""
         all_transition_domains = TransitionDomain.objects.all()
         if not all_transition_domains.exists():
             raise Exception("No TransitionDomain objects exist.")

-        for transition_domain in all_transition_domains:
-            domain_name = transition_domain.domain_name.upper()
-            updated_transition_domain = transition_domain
+        try:
+            for transition_domain in all_transition_domains:
+                domain_name = transition_domain.domain_name.upper()
+                updated_transition_domain = transition_domain

-            # STEP 1: Parse organization data
-            updated_transition_domain = self.parse_org_data(
-                domain_name, transition_domain
-            )
-            self.parse_logs.display_logs(EnumFilenames.ORGANIZATION_ADHOC)
+                # STEP 1: Parse organization data
+                updated_transition_domain = self.parse_org_data(
+                    domain_name, transition_domain
+                )
+                # Store the event logs
+                self.create_update_model_logs(EnumFilenames.ORGANIZATION_ADHOC)

-            # STEP 2: Parse domain type data
-            updated_transition_domain = self.parse_domain_type_data(
-                domain_name, transition_domain
-            )
-            self.parse_logs.display_logs(EnumFilenames.DOMAIN_ADHOC)
+                # STEP 2: Parse domain type data
+                updated_transition_domain = self.parse_domain_type_data(
+                    domain_name, transition_domain
+                )
+                # Store the event logs
+                self.create_update_model_logs(EnumFilenames.DOMAIN_ADHOC)

-            # STEP 3: Parse agency data
-            updated_transition_domain = self.parse_agency_data(
-                domain_name, transition_domain
-            )
-            self.parse_logs.display_logs(EnumFilenames.AGENCY_ADHOC)
+                # STEP 3: Parse agency data
+                updated_transition_domain = self.parse_agency_data(
+                    domain_name, transition_domain
+                )
+                # Store the event logs
+                self.create_update_model_logs(EnumFilenames.AGENCY_ADHOC)

-            # STEP 4: Parse creation and expiration data
-            updated_transition_domain = self.parse_creation_expiration_data(
-                domain_name, transition_domain
-            )
-            self.parse_logs.display_logs(EnumFilenames.DOMAIN_ESCROW)
+                # STEP 4: Parse creation and expiration data
+                updated_transition_domain = self.parse_creation_expiration_data(
+                    domain_name, transition_domain
+                )
+                # Store the event logs
+                self.create_update_model_logs(EnumFilenames.DOMAIN_ESCROW)

-            updated_transition_domain.save()
+                updated_transition_domain.save()
+                logger.info(f"Successfully updated TransitionDomain {domain_name}")
+                self.parse_logs.clear_logs()
+        except Exception as err:
+            logger.error("Could not update all TransitionDomain objects.")
+
+            # Regardless of what occurred, log what happened.
+            logger.info("======Printing log stack======")
+            self.parse_logs.display_all_logs()
+
+            raise err
+        else:
+            self.display_run_summary()
+
+    def display_run_summary(self):
+        """Prints information about this particular run.
+        Organizes like data together.
+        """
+        container = self.parsed_data_container
+        agency_adhoc = container.get_logs_for_type(EnumFilenames.AGENCY_ADHOC)
+        authority_adhoc = container.get_logs_for_type(EnumFilenames.AUTHORITY_ADHOC)
+        domain_additional = container.get_logs_for_type(EnumFilenames.DOMAIN_ADDITIONAL)
+        domain_adhoc = container.get_logs_for_type(EnumFilenames.DOMAIN_ADHOC)
+        domain_escrow = container.get_logs_for_type(EnumFilenames.DOMAIN_ESCROW)
+        organization_adhoc = container.get_logs_for_type(EnumFilenames.ORGANIZATION_ADHOC)
+        variable_data = []
+        for file_type in self.parsed_data_container.file_data:
+            # Grab all logs for the given file type
+            logs = self.parsed_data_container.get_logs_for_type(file_type)
+            variable_data.append(logs)
+        # agency_adhoc, authority_adhoc, domain_additional, domain_adhoc, domain_escrow, organization_adhoc = variable_data

     def parse_creation_expiration_data(self, domain_name, transition_domain):
         """Grabs expiration_date from the parsed files and associates it
@@ -384,14 +448,14 @@ class LoadExtraTransitionDomain:
         if not is_update:
             self.parse_logs.create_log_item(
                 file_type,
-                LogCode.DEBUG,
+                LogCode.INFO,
                 f"Added {var_name} as '{changed_value}' on {domain_name}",
                 domain_name,
             )
         else:
             self.parse_logs.create_log_item(
                 file_type,
-                LogCode.INFO,
+                LogCode.WARNING,
                 f"Updated existing {var_name} to '{changed_value}' on {domain_name}",
                 domain_name,
             )
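
The severity bump above means newly added values now surface at INFO, while overwrites of existing data surface at WARNING. For reference, a self-contained sketch of the LogCode-to-logger routing that display_logs applies (the enum values here are assumptions; only the member names appear in the diff):

```python
import logging
from enum import Enum

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class LogCode(Enum):
    # Values are illustrative; only the member names are taken from the diff.
    ERROR = 1
    WARNING = 2
    INFO = 3
    DEBUG = 4


def emit(code: LogCode, message: str):
    """Routes a message to the matching logger severity."""
    match code:
        case LogCode.ERROR:
            logger.error(message)
        case LogCode.WARNING:
            logger.warning(message)
        case LogCode.INFO:
            logger.info(message)
        case LogCode.DEBUG:
            logger.debug(message)


emit(LogCode.WARNING, "Updated existing orgname to 'GSA' on EXAMPLE.GOV")
```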
@@ -501,7 +565,7 @@ class LoadExtraTransitionDomain:
         """
         # Grabs a dict associated with the file_type.
         # For example, EnumFilenames.DOMAIN_ADDITIONAL.
-        desired_type = self.domain_object.file_data.get(file_type)
+        desired_type = self.parsed_data_container.file_data.get(file_type)
         if desired_type is None:
             self.parse_logs.create_log_item(
                 file_type, LogCode.ERROR, f"Type {file_type} does not exist"
@@ -565,7 +629,7 @@ class PatternMap:
         # Object data #
         self.data = {}
-        self.logs = []
+        self.logs = {}

     def try_infer_filename(self, current_file_name, default_file_name):
         """Tries to match a given filename to a regex,
@@ -790,16 +854,34 @@ class ExtraTransitionDomain:
             file_type.data = {}

     def parse_csv_file(
-        self, file, seperator, dataclass_type, id_field, is_domain_escrow=False
+        self,
+        file_type,
+        file,
+        seperator,
+        dataclass_type,
+        id_field,
+        is_domain_escrow=False
     ):
         # Domain escrow is an edge case
         if is_domain_escrow:
-            return self._read_domain_escrow(file, seperator)
+            item_to_return = self._read_domain_escrow(
+                file_type,
+                file,
+                seperator
+            )
+            return item_to_return
         else:
-            return self._read_csv_file(file, seperator, dataclass_type, id_field)
+            item_to_return = self._read_csv_file(
+                file_type,
+                file,
+                seperator,
+                dataclass_type,
+                id_field
+            )
+            return item_to_return

     # Domain escrow is an edgecase given that its structured differently data-wise.
-    def _read_domain_escrow(self, file, seperator):
+    def _read_domain_escrow(self, file_type, file, seperator):
         dict_data = {}
         with open(file, "r", encoding="utf-8-sig") as requested_file:
             reader = csv.reader(requested_file, delimiter=seperator)
@@ -813,9 +895,13 @@ class ExtraTransitionDomain:
                 dict_data[domain_name] = DomainEscrow(
                     domain_name, creation_date, expiration_date
                 )
+
+                # Given this row_id, create a default log object.
+                # So that we can track logs on it later.
+                self.set_log(file_type, domain_name, [])
+
         return dict_data

-    def _read_csv_file(self, file, seperator, dataclass_type, id_field):
+    def _read_csv_file(self, file_type, file, seperator, dataclass_type, id_field):
         with open(file, "r", encoding="utf-8-sig") as requested_file:
             reader = csv.DictReader(requested_file, delimiter=seperator)
             """
@@ -832,5 +918,27 @@ class ExtraTransitionDomain:
                     continue
                 row_id = row[id_field]
                 dict_data[row_id] = dataclass_type(**row)
+
+                # Given this row_id, create a default log object.
+                # So that we can track logs on it later.
+                self.set_log(file_type, row_id, [])
+
             # dict_data = {row[id_field]: dataclass_type(**row) for row in reader}
         return dict_data
+
+    # Logging logic #
+    def get_logs_for_type(self, file_type):
+        """Returns all logs for the given file_type"""
+        return self.file_data.get(file_type).logs
+
+    def get_log(self, file_type, item_id):
+        """Returns a log of a particular id"""
+        logs = self.get_logs_for_type(file_type)
+        return logs.get(item_id)
+
+    def set_logs_for_type(self, file_type, logs):
+        """Sets all logs for a given file_type"""
+        self.file_data[file_type].logs = logs
+
+    def set_log(self, file_type, item_id, log):
+        """Creates a single log item under a given file_type"""
+        self.file_data.get(file_type).logs[item_id] = log
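
To illustrate the _read_csv_file and set_log pattern above in isolation: each row becomes a dataclass instance keyed by its id, and a matching empty log bucket is created under the same key. A self-contained sketch with an in-memory CSV and a trimmed dataclass (not the project's actual reader; the pipe delimiter is only an example):

```python
import csv
import io
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class DomainTypeAdhoc:
    """Trimmed copy of the DOMAIN_ADHOC structure from the first file."""

    domaintypeid: Optional[int] = field(default=None, repr=True)
    domaintype: Optional[str] = field(default=None, repr=True)


raw = "domaintypeid|domaintype\n2|Federal\n3|Interstate\n"
reader = csv.DictReader(io.StringIO(raw), delimiter="|")

dict_data = {}
logs = {}
for row in reader:
    row_id = row["domaintypeid"]
    dict_data[row_id] = DomainTypeAdhoc(**row)
    logs[row_id] = []  # default per-row log bucket, as set_log does above

print(dict_data["2"])  # DomainTypeAdhoc(domaintypeid='2', domaintype='Federal')
print(logs["2"])       # []
```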