Parsing org data

This commit is contained in:
zandercymatics 2023-11-16 13:16:01 -07:00
parent e0e0950d3e
commit 35a0ed751e
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
5 changed files with 418 additions and 48 deletions

View file

@ -751,6 +751,257 @@ class FileDataHolder:
full_filename = date + "." + filename_without_date
return (full_filename, can_infer)
class OrganizationDataLoader:
"""Saves organization data onto Transition Domains. Handles file parsing."""
def __init__(self, options: TransitionDomainArguments):
# Globally stores event logs and organizes them
self.parse_logs = FileTransitionLog()
self.debug = options.debug
options.pattern_map_params = [
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
]
# Reads and parses organization data
self.parsed_data = ExtraTransitionDomain(options)
# options.infer_filenames will always be false when not SETTING.DEBUG
self.parsed_data.parse_all_files(options.infer_filenames)
self.tds_to_update = []
self.tds_failed_to_update = []
def update_organization_data_for_all(self):
"""Updates org data for all TransitionDomains"""
all_transition_domains = TransitionDomain.objects.all()
if len(all_transition_domains) < 1:
logger.error(
f"{TerminalColors.FAIL}"
"No TransitionDomains exist. Cannot update."
f"{TerminalColors.ENDC}"
)
return None
# Store all actions we want to perform in tds_to_update
self.prepare_transition_domains(all_transition_domains)
# Then if we don't run into any exceptions, bulk_update it
self.bulk_update_transition_domains(self.tds_to_update)
def prepare_transition_domains(self, transition_domains):
for item in transition_domains:
try:
updated = self.parse_org_data(item.domain_name, item)
self.tds_to_update.append(updated)
if self.debug:
logger.info(item.display_transition_domain())
logger.info(
f"{TerminalColors.OKCYAN}"
f"{item.display_transition_domain()}"
f"{TerminalColors.ENDC}"
)
except Exception as err:
logger.error(err)
self.tds_failed_to_update.append(item)
if self.debug:
logger.error(
f"{TerminalColors.YELLOW}"
f"{item.display_transition_domain()}"
f"{TerminalColors.ENDC}"
)
if len(self.tds_failed_to_update) > 0:
logger.error(
"Failed to update. An exception was encountered "
f"on the following TransitionDomains: {[item for item in self.tds_failed_to_update]}"
)
raise Exception("Failed to update TransitionDomains")
if not self.debug:
logger.info(
f"Ready to update {len(self.tds_to_update)} TransitionDomains."
)
else:
logger.info(
f"Ready to update {len(self.tds_to_update)} TransitionDomains: {[item for item in self.tds_failed_to_update]}"
)
def bulk_update_transition_domains(self, update_list):
logger.info(
f"{TerminalColors.MAGENTA}"
"Beginning mass TransitionDomain update..."
f"{TerminalColors.ENDC}"
)
changed_fields = [
"address_line",
"city",
"state_territory",
"zipcode",
"country_code",
]
TransitionDomain.objects.bulk_update(update_list, changed_fields)
if not self.debug:
logger.info(
f"{TerminalColors.OKGREEN}"
f"Updated {len(self.tds_to_update)} TransitionDomains."
f"{TerminalColors.ENDC}"
)
else:
logger.info(
f"{TerminalColors.OKGREEN}"
f"Updated {len(self.tds_to_update)} TransitionDomains: {[item for item in self.tds_failed_to_update]}"
f"{TerminalColors.ENDC}"
)
def parse_org_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain:
"""Grabs organization_name from the parsed files and associates it
with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
org_info = self.get_org_info(domain_name)
if org_info is None:
self.parse_logs.create_log_item(
EnumFilenames.ORGANIZATION_ADHOC,
LogCode.ERROR,
f"Could not add organization_name on {domain_name}, no data exists.",
domain_name,
not self.debug,
)
return transition_domain
# Add street info
transition_domain.address_line = org_info.orgstreet
transition_domain.city = org_info.orgcity
transition_domain.state_territory = org_info.orgstate
transition_domain.zipcode = org_info.orgzip
transition_domain.country_code = org_info.orgcountrycode
# Log what happened to each field
changed_fields = [
("address_line", transition_domain.address_line),
("city", transition_domain.city),
("state_territory", transition_domain.state_territory),
("zipcode", transition_domain.zipcode),
("country_code", transition_domain.country_code),
]
self.log_add_or_changed_values(EnumFilenames.AUTHORITY_ADHOC, changed_fields, domain_name)
return transition_domain
def get_org_info(self, domain_name) -> OrganizationAdhoc:
"""Maps an id given in get_domain_data to a organization_adhoc
record which has its corresponding definition"""
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
org_id = domain_info.orgid
return self.get_organization_adhoc(org_id)
def get_organization_adhoc(self, desired_id) -> OrganizationAdhoc:
"""Grabs a corresponding row within the ORGANIZATION_ADHOC file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.ORGANIZATION_ADHOC, desired_id)
def get_domain_data(self, desired_id) -> DomainAdditionalData:
"""Grabs a corresponding row within the DOMAIN_ADDITIONAL file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.DOMAIN_ADDITIONAL, desired_id)
def get_object_by_id(self, file_type: EnumFilenames, desired_id):
"""Returns a field in a dictionary based off the type and id.
vars:
file_type: (constant) EnumFilenames -> Which data file to target.
An example would be `EnumFilenames.DOMAIN_ADHOC`.
desired_id: str -> Which id you want to search on.
An example would be `"12"` or `"igorville.gov"`
Explanation:
Each data file has an associated type (file_type) for tracking purposes.
Each file_type is a dictionary which
contains a dictionary of row[id_field]: object.
In practice, this would look like:
EnumFilenames.AUTHORITY_ADHOC: {
"1": AuthorityAdhoc(...),
"2": AuthorityAdhoc(...),
...
}
desired_id will then specify which id to grab. If we wanted "1",
then this function will return the value of id "1".
So, `AuthorityAdhoc(...)`
"""
# Grabs a dict associated with the file_type.
# For example, EnumFilenames.DOMAIN_ADDITIONAL.
desired_type = self.parsed_data.file_data.get(file_type)
if desired_type is None:
self.parse_logs.create_log_item(
file_type,
LogCode.ERROR,
f"Type {file_type} does not exist",
)
return None
# Grab the value given an Id within that file_type dict.
# For example, "igorville.gov".
obj = desired_type.data.get(desired_id)
if obj is None:
self.parse_logs.create_log_item(
file_type,
LogCode.ERROR,
f"Id {desired_id} does not exist for {file_type.value[0]}",
)
return obj
def log_add_or_changed_values(self, file_type, values_to_check, domain_name):
for field_name, value in values_to_check:
str_exists = value is not None and value.strip() != ""
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
file_type,
field_name,
value,
domain_name,
str_exists,
)
def _add_or_change_message(self, file_type, var_name, changed_value, domain_name, is_update=False):
"""Creates a log instance when a property
is successfully changed on a given TransitionDomain."""
if not is_update:
self.parse_logs.create_log_item(
file_type,
LogCode.INFO,
f"Added {var_name} as '{changed_value}' on {domain_name}",
domain_name,
not self.debug,
)
else:
self.parse_logs.create_log_item(
file_type,
LogCode.WARNING,
f"Updated existing {var_name} to '{changed_value}' on {domain_name}",
domain_name,
not self.debug,
)
class ExtraTransitionDomain:
"""Helper class to aid in storing TransitionDomain data spread across
@ -775,52 +1026,49 @@ class ExtraTransitionDomain:
# metadata about each file and associate it with an enum.
# That way if we want the data located at the agency_adhoc file,
# we can just call EnumFilenames.AGENCY_ADHOC.
pattern_map_params = [
(
EnumFilenames.AGENCY_ADHOC,
options.agency_adhoc_filename,
AgencyAdhoc,
"agencyid",
),
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.DOMAIN_ESCROW,
options.domain_escrow_filename,
DomainEscrow,
"domainname",
),
(
EnumFilenames.DOMAIN_ADHOC,
options.domain_adhoc_filename,
DomainTypeAdhoc,
"domaintypeid",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
]
if (
options.pattern_map_params is None or options.pattern_map_params == []
):
options.pattern_map_params = [
(
EnumFilenames.AGENCY_ADHOC,
options.agency_adhoc_filename,
AgencyAdhoc,
"agencyid",
),
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.DOMAIN_ESCROW,
options.domain_escrow_filename,
DomainEscrow,
"domainname",
),
(
EnumFilenames.DOMAIN_ADHOC,
options.domain_adhoc_filename,
DomainTypeAdhoc,
"domaintypeid",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
]
self.file_data = self.populate_file_data(pattern_map_params)
self.file_data = self.populate_file_data(options.pattern_map_params)
# TODO - revise comment
def populate_file_data(self, pattern_map_params):