Merge branch 'main' into za/1241-imported-ao-data

zandercymatics 2023-11-13 11:22:43 -07:00
commit 91019ee9b3
83 changed files with 819 additions and 2013 deletions

@@ -111,9 +111,7 @@ class FileTransitionLog:
"""Logs every LogItem contained in this object"""
for parent_log in self.logs:
for child_log in parent_log:
TerminalHelper.print_conditional(
True, child_log.message, child_log.severity
)
TerminalHelper.print_conditional(True, child_log.message, child_log.severity)
def display_logs_by_domain_name(self, domain_name, restrict_type=LogCode.DEFAULT):
"""Displays all logs of a given domain_name.
@@ -130,9 +128,7 @@ class FileTransitionLog:
return None
for log in domain_logs:
TerminalHelper.print_conditional(
restrict_type != log.code, log.message, log.code
)
TerminalHelper.print_conditional(restrict_type != log.code, log.message, log.code)
def get_logs(self, file_type, domain_name):
"""Grabs the logs associated with
@@ -166,38 +162,21 @@ class LoadExtraTransitionDomain:
updated_transition_domain = transition_domain
try:
# STEP 1: Parse organization data
updated_transition_domain = self.parse_org_data(
domain_name, transition_domain
)
updated_transition_domain = self.parse_org_data(domain_name, transition_domain)
# STEP 2: Parse domain type data
updated_transition_domain = self.parse_domain_type_data(
domain_name, transition_domain
)
updated_transition_domain = self.parse_domain_type_data(domain_name, transition_domain)
# STEP 3: Parse authority data
updated_transition_domain = self.parse_authority_data(
domain_name, transition_domain
)
# STEP 3: Parse agency data
updated_transition_domain = self.parse_agency_data(domain_name, transition_domain)
# STEP 4: Parse agency data
updated_transition_domain = self.parse_agency_data(
domain_name, transition_domain
)
# STEP 5: Parse creation and expiration data
updated_transition_domain = self.parse_creation_expiration_data(
domain_name, transition_domain
)
# STEP 4: Parse creation and expiration data
updated_transition_domain = self.parse_creation_expiration_data(domain_name, transition_domain)
# Check if the instance has changed before saving
updated_transition_domain.save()
updated_transition_domains.append(updated_transition_domain)
logger.info(
f"{TerminalColors.OKCYAN}"
f"Successfully updated {domain_name}"
f"{TerminalColors.ENDC}"
)
logger.info(f"{TerminalColors.OKCYAN}" f"Successfully updated {domain_name}" f"{TerminalColors.ENDC}")
# If we run into an exception on this domain,
# Just skip over it and log that it happened.
@@ -272,8 +251,7 @@ class LoadExtraTransitionDomain:
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ESCROW,
LogCode.ERROR,
"Could not add epp_creation_date and epp_expiration_date "
f"on {domain_name}, no data exists.",
"Could not add epp_creation_date and epp_expiration_date " f"on {domain_name}, no data exists.",
domain_name,
not self.debug,
)
@@ -375,10 +353,7 @@ class LoadExtraTransitionDomain:
)
return transition_domain
agency_exists = (
transition_domain.federal_agency is not None
and transition_domain.federal_agency.strip() != ""
)
agency_exists = transition_domain.federal_agency is not None and transition_domain.federal_agency.strip() != ""
if not isinstance(info.active, str) or not info.active.lower() == "y":
self.parse_logs.create_log_item(
@@ -393,12 +368,11 @@ class LoadExtraTransitionDomain:
if not isinstance(info.isfederal, str) or not info.isfederal.lower() == "y":
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR,
f"Could not add non-federal agency {info.agencyname} on {domain_name}",
LogCode.INFO,
f"Adding non-federal agency {info.agencyname} on {domain_name}",
domain_name,
not self.debug,
)
return transition_domain
transition_domain.federal_agency = info.agencyname
@@ -414,9 +388,7 @@ class LoadExtraTransitionDomain:
return transition_domain
def parse_domain_type_data(
self, domain_name, transition_domain: TransitionDomain
) -> TransitionDomain:
def parse_domain_type_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain:
"""Grabs organization_type and federal_type from the parsed files
and associates it with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain):
@@ -461,12 +433,10 @@ class LoadExtraTransitionDomain:
# Are we updating data that already exists,
# or are we adding new data in its place?
organization_type_exists = (
transition_domain.organization_type is not None
and transition_domain.organization_type.strip() != ""
transition_domain.organization_type is not None and transition_domain.organization_type.strip() != ""
)
federal_type_exists = (
transition_domain.federal_type is not None
and transition_domain.federal_type.strip() != ""
transition_domain.federal_type is not None and transition_domain.federal_type.strip() != ""
)
# If we get two records, then we know it is federal.
@@ -500,9 +470,7 @@ class LoadExtraTransitionDomain:
return transition_domain
def parse_org_data(
self, domain_name, transition_domain: TransitionDomain
) -> TransitionDomain:
def parse_org_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain:
"""Grabs organization_name from the parsed files and associates it
with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain):
@@ -520,8 +488,7 @@ class LoadExtraTransitionDomain:
return transition_domain
desired_property_exists = (
transition_domain.organization_name is not None
and transition_domain.organization_name.strip() != ""
transition_domain.organization_name is not None and transition_domain.organization_name.strip() != ""
)
transition_domain.organization_name = org_info.orgname
@@ -538,9 +505,7 @@ class LoadExtraTransitionDomain:
return transition_domain
def _add_or_change_message(
self, file_type, var_name, changed_value, domain_name, is_update=False
):
def _add_or_change_message(self, file_type, var_name, changed_value, domain_name, is_update=False):
"""Creates a log instance when a property
is successfully changed on a given TransitionDomain."""
if not is_update:
@@ -744,6 +709,10 @@ class FileDataHolder:
# Object data #
self.data: Dict[str, type] = {}
# This is used ONLY for development purposes. This behaviour
# is controlled by the --infer_filename flag, which defaults
# to false. The purpose of this check is to speed up development,
# but it cannot be used by the end user.
def try_infer_filename(self, current_file_name, default_file_name):
"""Tries to match a given filename to a regex,
then uses that match to generate the filename."""
@@ -909,7 +878,6 @@ class ExtraTransitionDomain:
infer_filenames: bool -> Determines if we should try to
infer the filename if a default is passed in
"""
self.clear_file_data()
for name, value in self.file_data.items():
is_domain_escrow = name == EnumFilenames.DOMAIN_ESCROW
filename = f"{value.filename}"
@@ -924,8 +892,9 @@ class ExtraTransitionDomain:
)
else:
if not infer_filenames:
logger.error(f"Could not find file: {filename}")
continue
raise FileNotFoundError(
f"{TerminalColors.FAIL}" f"Could not find file {filename} for {name}" f"{TerminalColors.ENDC}"
)
# Infer filename logic #
# This mode is used for
@@ -956,25 +925,22 @@ class ExtraTransitionDomain:
is_domain_escrow,
)
continue
# Log if we can't find the desired file
logger.error(f"Could not find file: {filename}")
raise FileNotFoundError(
f"{TerminalColors.FAIL}" f"Could not find file {filename} for {name}" f"{TerminalColors.ENDC}"
)
def clear_file_data(self):
for item in self.file_data.values():
file_type: FileDataHolder = item
file_type.data = {}
def parse_csv_file(
self, file, seperator, dataclass_type, id_field, is_domain_escrow=False
):
def parse_csv_file(self, file, seperator, dataclass_type, id_field, is_domain_escrow=False):
# Domain escrow is an edge case
if is_domain_escrow:
item_to_return = self._read_domain_escrow(file, seperator)
return item_to_return
else:
item_to_return = self._read_csv_file(
file, seperator, dataclass_type, id_field
)
item_to_return = self._read_csv_file(file, seperator, dataclass_type, id_field)
return item_to_return
# Domain escrow is an edge case given that it's structured differently data-wise.
@@ -989,9 +955,7 @@ class ExtraTransitionDomain:
creation_date = datetime.strptime(row[7], date_format)
expiration_date = datetime.strptime(row[11], date_format)
dict_data[domain_name] = DomainEscrow(
domain_name, creation_date, expiration_date
)
dict_data[domain_name] = DomainEscrow(domain_name, creation_date, expiration_date)
return dict_data
def _grab_row_id(self, row, id_field, file, dataclass_type):
@@ -1024,9 +988,7 @@ class ExtraTransitionDomain:
f"Found bad data in {file}. Attempting to clean."
f"{TerminalColors.ENDC}"
)
updated_file_content = self.replace_bad_seperators(
file, f"{seperator}", ";badseperator;"
)
updated_file_content = self.replace_bad_seperators(file, f"{seperator}", ";badseperator;")
dict_data = {}
break
@@ -1040,11 +1002,7 @@ class ExtraTransitionDomain:
# After we clean the data, try to parse it again
if updated_file_content:
logger.info(
f"{TerminalColors.MAGENTA}"
f"Retrying load for {file}"
f"{TerminalColors.ENDC}"
)
logger.info(f"{TerminalColors.MAGENTA}" f"Retrying load for {file}" f"{TerminalColors.ENDC}")
# Store the file locally rather than writing to the file.
# This is to avoid potential data corruption.
updated_file = io.StringIO(updated_file_content)
@@ -1055,9 +1013,7 @@ class ExtraTransitionDomain:
# is wrong with the file.
if None in row:
logger.error(
f"{TerminalColors.FAIL}"
f"Corrupt data found for {row_id}. Skipping."
f"{TerminalColors.ENDC}"
f"{TerminalColors.FAIL}" f"Corrupt data found for {row_id}. Skipping." f"{TerminalColors.ENDC}"
)
continue