manage.get.gov/src/registrar/management/commands/utility/extra_transition_domain_helper.py
2023-11-07 19:35:22 -06:00


""""""
import csv
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import glob
import re
import logging
import os
from typing import List, Tuple
from registrar.models.transition_domain import TransitionDomain
from .epp_data_containers import (
AgencyAdhoc,
DomainAdditionalData,
DomainEscrow,
DomainTypeAdhoc,
OrganizationAdhoc,
AuthorityAdhoc,
EnumFilenames,
)
from .transition_domain_arguments import TransitionDomainArguments
from .terminal_helper import TerminalColors, TerminalHelper, LogCode
logger = logging.getLogger(__name__)
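# Illustrative flow (hypothetical driver code): a management command builds a
# TransitionDomainArguments object, then runs:
#   helper = LoadExtraTransitionDomain(options)
#   helper.update_transition_domain_models()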
class FileTransitionLog:
"""Container for storing event logs. Used to lessen
the complexity of storing multiple logs across multiple
variables.
    self.logs: dict -> {
        (EnumFilenames.DOMAIN_ADHOC, domain_name): List[LogItem],
        (EnumFilenames.AGENCY_ADHOC, domain_name): List[LogItem],
        (EnumFilenames.ORGANIZATION_ADHOC, domain_name): List[LogItem],
        (EnumFilenames.DOMAIN_ADDITIONAL, domain_name): List[LogItem],
    }
"""
def __init__(self):
self.logs = {}
class LogItem:
"""Used for storing data about logger information."""
def __init__(self, file_type, code, message, domain_name):
self.file_type = file_type
self.code = code
self.message = message
self.domain_name = domain_name
def add_log(self, file_type, code, message, domain_name):
"""Adds a log item to self.logs
file_type -> Which enum to associate with,
ex. EnumFilenames.DOMAIN_ADHOC
code -> Log severity or other metadata, ex. LogCode.ERROR
message -> Message to display
domain_name -> Name of the domain, i.e. "igorville.gov"
"""
log = self.LogItem(file_type, code, message, domain_name)
dict_name = (file_type, domain_name)
self._add_to_log_list(dict_name, log)
    def create_log_item(
        self, file_type, code, message, domain_name=None, add_to_list=True, minimal_logging=True
    ):
        """Creates and returns a LogItem object.
        add_to_list: bool -> If enabled, add it to the logs array.
        minimal_logging: bool -> If enabled, suppress INFO and WARNING
        messages when printing to the terminal.
        """
log = self.LogItem(file_type, code, message, domain_name)
if not add_to_list:
return log
dict_name = (file_type, domain_name)
self._add_to_log_list(dict_name, log)
restrict_type = []
if minimal_logging:
restrict_type = [LogCode.INFO, LogCode.WARNING]
TerminalHelper.print_conditional(
log.code not in restrict_type,
log.message,
log.code,
)
return log
def _add_to_log_list(self, log_name, log):
if log_name not in self.logs:
self.logs[log_name] = [log]
else:
self.logs[log_name].append(log)
def display_all_logs(self):
"""Logs every LogItem contained in this object"""
        for log_list in self.logs.values():
            for log in log_list:
                TerminalHelper.print_conditional(
                    True,
                    log.message,
                    log.code,
                )
def display_logs_by_domain_name(self, domain_name, restrict_type=LogCode.DEFAULT):
"""Displays all logs of a given domain_name.
Will log with the correct severity depending on code.
domain_name: str -> The domain to target, such as "igorville.gov"
        restrict_type: LogCode -> Suppresses logs whose code matches
        the given LogCode, such as LogCode.INFO.
        """
        for file_type in EnumFilenames:
            domain_logs = self.get_logs(file_type, domain_name)
            if domain_logs is None:
                continue
            for log in domain_logs:
                TerminalHelper.print_conditional(
                    restrict_type != log.code,
                    log.message,
                    log.code,
                )
def get_logs(self, file_type, domain_name):
"""Grabs the logs associated with
a particular file_type and domain_name"""
log_name = (file_type, domain_name)
return self.logs.get(log_name)
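# Illustrative usage of FileTransitionLog (hypothetical values):
#   parse_logs = FileTransitionLog()
#   parse_logs.add_log(EnumFilenames.DOMAIN_ADHOC, LogCode.ERROR, "No data", "igorville.gov")
#   logs = parse_logs.get_logs(EnumFilenames.DOMAIN_ADHOC, "igorville.gov")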
class LoadExtraTransitionDomain:
"""Grabs additional data for TransitionDomains."""
def __init__(self, options: TransitionDomainArguments):
# Globally stores event logs and organizes them
self.parse_logs = FileTransitionLog()
self.debug = options.debug
# Reads and parses migration files
self.parsed_data_container = ExtraTransitionDomain(options)
self.parsed_data_container.parse_all_files(options.infer_filenames)
def update_transition_domain_models(self):
"""Updates TransitionDomain objects based off the file content
given in self.parsed_data_container"""
all_transition_domains = TransitionDomain.objects.all()
if not all_transition_domains.exists():
raise ValueError("No TransitionDomain objects exist.")
updated_transition_domains = []
failed_transition_domains = []
for transition_domain in all_transition_domains:
domain_name = transition_domain.domain_name
updated_transition_domain = transition_domain
try:
# STEP 1: Parse organization data
updated_transition_domain = self.parse_org_data(
domain_name, transition_domain
)
# STEP 2: Parse domain type data
updated_transition_domain = self.parse_domain_type_data(
domain_name, transition_domain
)
# STEP 3: Parse agency data
updated_transition_domain = self.parse_agency_data(
domain_name, transition_domain
)
# STEP 4: Parse creation and expiration data
updated_transition_domain = self.parse_creation_expiration_data(
domain_name, transition_domain
)
# Check if the instance has changed before saving
#if updated_transition_domain.__dict__ != transition_domain.__dict__:
updated_transition_domain.save()
updated_transition_domains.append(updated_transition_domain)
logger.info(
f"{TerminalColors.OKCYAN}"
f"Successfully updated {domain_name}"
f"{TerminalColors.ENDC}"
)
# If we run into an exception on this domain,
# Just skip over it and log that it happened.
# Q: Should we just throw an exception?
except Exception as err:
logger.debug(err)
logger.error(
f"{TerminalColors.FAIL}"
f"Exception encountered on {domain_name}. Could not update."
f"{TerminalColors.ENDC}"
)
failed_transition_domains.append(domain_name)
if self.debug:
# Display misc errors (not associated to a domain)
self.parse_logs.display_logs_by_domain_name(None)
failed_count = len(failed_transition_domains)
if failed_count == 0:
            TerminalHelper.print_conditional(
                self.debug,
                TerminalHelper.array_as_string(updated_transition_domains),
            )
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Updated {len(updated_transition_domains)} transition domain entries:
{TerminalColors.ENDC}
"""
)
else:
# TODO - update
            TerminalHelper.print_conditional(
                self.debug,
                TerminalHelper.array_as_string(updated_transition_domains),
            )
logger.error(
f"""{TerminalColors.FAIL}
============= FINISHED WITH ERRORS ===============
Updated {len(updated_transition_domains)} transition domain entries,
Failed to update {failed_count} transition domain entries
{TerminalColors.ENDC}
"""
)
        # TODO
        if TransitionDomain.objects.all().count() != len(updated_transition_domains):
            logger.error(
                "The number of updated TransitionDomain entries does not match "
                "the total TransitionDomain count; some entries were not updated."
            )
def parse_creation_expiration_data(self, domain_name, transition_domain):
"""Grabs expiration_date from the parsed files and associates it
with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
info = self.get_domain_escrow_info(domain_name)
if info is None:
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ESCROW,
LogCode.ERROR,
"Could not add epp_creation_date and epp_expiration_date "
f"on {domain_name}, no data exists.",
domain_name,
not self.debug
)
return transition_domain
creation_exists = (
transition_domain.epp_creation_date is not None
)
expiration_exists = (
transition_domain.epp_expiration_date is not None
)
transition_domain.epp_creation_date = info.creationdate
transition_domain.epp_expiration_date = info.expirationdate
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
EnumFilenames.DOMAIN_ESCROW,
"epp_creation_date",
transition_domain.epp_creation_date,
domain_name,
creation_exists,
)
self._add_or_change_message(
EnumFilenames.DOMAIN_ESCROW,
"epp_expiration_date",
transition_domain.epp_expiration_date,
domain_name,
expiration_exists,
)
return transition_domain
def parse_agency_data(self, domain_name, transition_domain) -> TransitionDomain:
"""Grabs federal_agency from the parsed files and associates it
with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
info = self.get_agency_info(domain_name)
if info is None:
self.parse_logs.create_log_item(
EnumFilenames.AGENCY_ADHOC,
LogCode.ERROR,
f"Could not add federal_agency on {domain_name}, no data exists.",
domain_name,
not self.debug
)
return transition_domain
agency_exists = (
transition_domain.federal_agency is not None
and transition_domain.federal_agency.strip() != ""
)
        if info.active.lower() != "y":
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR,
f"Could not add inactive agency {info.agencyname} on {domain_name}",
domain_name,
not self.debug
)
return transition_domain
        if info.isfederal.lower() != "y":
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR,
f"Could not add non-federal agency {info.agencyname} on {domain_name}",
domain_name,
not self.debug
)
return transition_domain
transition_domain.federal_agency = info.agencyname
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
EnumFilenames.AGENCY_ADHOC,
"federal_agency",
transition_domain.federal_agency,
domain_name,
agency_exists,
)
return transition_domain
def parse_domain_type_data(
self, domain_name, transition_domain: TransitionDomain
) -> TransitionDomain:
"""Grabs organization_type and federal_type from the parsed files
and associates it with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
info = self.get_domain_type_info(domain_name)
if info is None:
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR,
f"Could not add domain_type on {domain_name}, no data exists.",
domain_name,
not self.debug
)
return transition_domain
# This data is stored as follows: FEDERAL - Judicial
# For all other records, it is stored as so: Interstate
# We can infer if it is federal or not based on this fact.
domain_type = info.domaintype.split("-")
domain_type_length = len(domain_type)
if domain_type_length < 1 or domain_type_length > 2:
raise ValueError("Found invalid data on DOMAIN_ADHOC")
# Then, just grab the organization type.
new_organization_type = domain_type[0].strip()
# Check if this domain_type is active or not.
# If not, we don't want to add this.
        if info.active.lower() != "y":
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR,
f"Could not add inactive domain_type {domain_type[0]} on {domain_name}",
domain_name,
not self.debug
)
return transition_domain
# Are we updating data that already exists,
# or are we adding new data in its place?
organization_type_exists = (
transition_domain.organization_type is not None
and transition_domain.organization_type.strip() != ""
)
federal_type_exists = (
transition_domain.federal_type is not None
and transition_domain.federal_type.strip() != ""
)
# If we get two records, then we know it is federal.
# needs to be lowercase for federal type
is_federal = domain_type_length == 2
if is_federal:
            new_federal_type = domain_type[1].strip().lower()
transition_domain.organization_type = new_organization_type
transition_domain.federal_type = new_federal_type
else:
transition_domain.organization_type = new_organization_type
transition_domain.federal_type = None
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
EnumFilenames.DOMAIN_ADHOC,
"organization_type",
transition_domain.organization_type,
domain_name,
organization_type_exists,
)
self._add_or_change_message(
EnumFilenames.DOMAIN_ADHOC,
"federal_type",
transition_domain.federal_type,
domain_name,
federal_type_exists,
)
return transition_domain
def parse_org_data(
self, domain_name, transition_domain: TransitionDomain
) -> TransitionDomain:
"""Grabs organization_name from the parsed files and associates it
with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
org_info = self.get_org_info(domain_name)
if org_info is None:
self.parse_logs.create_log_item(
EnumFilenames.ORGANIZATION_ADHOC,
LogCode.ERROR,
f"Could not add organization_name on {domain_name}, no data exists.",
domain_name,
not self.debug
)
return transition_domain
desired_property_exists = (
transition_domain.organization_name is not None
and transition_domain.organization_name.strip() != ""
)
transition_domain.organization_name = org_info.orgname
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
EnumFilenames.ORGANIZATION_ADHOC,
"organization_name",
transition_domain.organization_name,
domain_name,
desired_property_exists,
)
return transition_domain
def _add_or_change_message(
self, file_type, var_name, changed_value, domain_name, is_update=False
):
"""Creates a log instance when a property
is successfully changed on a given TransitionDomain."""
if not is_update:
self.parse_logs.create_log_item(
file_type,
LogCode.INFO,
f"Added {var_name} as '{changed_value}' on {domain_name}",
domain_name,
not self.debug
)
else:
self.parse_logs.create_log_item(
file_type,
LogCode.WARNING,
f"Updated existing {var_name} to '{changed_value}' on {domain_name}",
domain_name,
not self.debug
)
    # Property getters, e.g. orgid or domaintypeid
    def get_org_info(self, domain_name) -> OrganizationAdhoc:
        """Maps an id given in get_domain_data to an organization_adhoc
record which has its corresponding definition"""
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
org_id = domain_info.orgid
return self.get_organization_adhoc(org_id)
def get_domain_type_info(self, domain_name) -> DomainTypeAdhoc:
"""Maps an id given in get_domain_data to a domain_type_adhoc
record which has its corresponding definition"""
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
type_id = domain_info.domaintypeid
return self.get_domain_adhoc(type_id)
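    # Lookup chain for agency info (illustrative):
    # domain_name -> DomainAdditionalData.authorityid -> AuthorityAdhoc.agencyid -> AgencyAdhoc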
    def get_agency_info(self, domain_name) -> AgencyAdhoc:
        """Maps an id given in get_domain_data to an agency_adhoc
record which has its corresponding definition"""
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
# The agency record is within the authority adhoc
authority_id = domain_info.authorityid
authority = self.get_authority_adhoc(authority_id)
type_id = None
if authority is not None:
type_id = authority.agencyid
return self.get_agency_adhoc(type_id)
    def get_authority_info(self, domain_name):
        """Maps an id given in get_domain_data to an authority_adhoc
record which has its corresponding definition"""
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
type_id = domain_info.authorityid
return self.get_authority_adhoc(type_id)
def get_domain_escrow_info(self, domain_name):
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
type_id = domain_info.domainname
return self.get_domain_escrow(type_id)
    # Object getters, e.g. DomainAdditionalData or OrganizationAdhoc
def get_domain_data(self, desired_id) -> DomainAdditionalData:
"""Grabs a corresponding row within the DOMAIN_ADDITIONAL file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.DOMAIN_ADDITIONAL, desired_id)
def get_organization_adhoc(self, desired_id) -> OrganizationAdhoc:
"""Grabs a corresponding row within the ORGANIZATION_ADHOC file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.ORGANIZATION_ADHOC, desired_id)
def get_domain_adhoc(self, desired_id) -> DomainTypeAdhoc:
"""Grabs a corresponding row within the DOMAIN_ADHOC file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.DOMAIN_ADHOC, desired_id)
def get_agency_adhoc(self, desired_id) -> AgencyAdhoc:
"""Grabs a corresponding row within the AGENCY_ADHOC file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.AGENCY_ADHOC, desired_id)
def get_authority_adhoc(self, desired_id) -> AuthorityAdhoc:
"""Grabs a corresponding row within the AUTHORITY_ADHOC file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.AUTHORITY_ADHOC, desired_id)
def get_domain_escrow(self, desired_id) -> DomainEscrow:
"""Grabs a corresponding row within the DOMAIN_ESCROW file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.DOMAIN_ESCROW, desired_id)
# TODO - renamed / needs a return section
def get_object_by_id(self, file_type: EnumFilenames, desired_id):
"""Returns a field in a dictionary based off the type and id.
vars:
file_type: (constant) EnumFilenames -> Which data file to target.
An example would be `EnumFilenames.DOMAIN_ADHOC`.
desired_id: str -> Which id you want to search on.
An example would be `"12"` or `"igorville.gov"`
Explanation:
Each data file has an associated type (file_type) for tracking purposes.
Each file_type is a dictionary which
contains a dictionary of row[id_field]: object.
In practice, this would look like:
EnumFilenames.AUTHORITY_ADHOC: {
"1": AuthorityAdhoc(...),
"2": AuthorityAdhoc(...),
...
}
desired_id will then specify which id to grab. If we wanted "1",
then this function will return the value of id "1".
So, `AuthorityAdhoc(...)`
"""
# Grabs a dict associated with the file_type.
# For example, EnumFilenames.DOMAIN_ADDITIONAL.
desired_type = self.parsed_data_container.file_data.get(file_type)
if desired_type is None:
self.parse_logs.create_log_item(
file_type, LogCode.ERROR, f"Type {file_type} does not exist",
)
return None
# Grab the value given an Id within that file_type dict.
# For example, "igorville.gov".
obj = desired_type.data.get(desired_id)
if obj is None:
self.parse_logs.create_log_item(
file_type, LogCode.ERROR, f"Id {desired_id} does not exist"
)
return obj
# TODO - change name
@dataclass
class PatternMap:
    """Helper class that holds data and metadata about a requested file.
    filename: str -> The desired filename to target. If the file cannot be
    found under this exact name, the regex below may be used to infer a
    matching filename from the files that do exist.
    regex: re.Pattern -> Defines what regex to use when inferring
    filenames. If None, no matching occurs.
data_type: type -> Metadata about the desired type for data.
id_field: str -> Defines which field should act as the id in data.
This is necessary as we store lists of "data_type" in ExtraTransitionDomain as follows:
{
id_field: data_type(...),
id_field: data_type(...),
...
}
"""
def __init__(
self,
filename: str,
regex: re.Pattern,
data_type: type,
id_field: str,
):
# Metadata #
## Filename inference metadata ##
self.regex = regex
self.could_infer = False
## "data" object metadata ##
### Where the data is sourced from ###
self.filename = filename
### What type the data is ###
self.data_type = data_type
### What the id should be in the holding dict ###
# TODO - rename to id_field_name
self.id_field = id_field
# Object data #
self.data = {}
def try_infer_filename(self, current_file_name, default_file_name):
"""Tries to match a given filename to a regex,
then uses that match to generate the filename."""
# returns (filename, inferred_successfully)
return self._infer_filename(self.regex, current_file_name, default_file_name)
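    # Illustrative example (hypothetical filenames): matching
    # "migrationdata/20231101.test123.txt" against a date-stripping regex such as
    # ExtraTransitionDomain.strip_date_regex, with a default of "test123.txt",
    # infers the filename "20231101.test123.txt".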
def _infer_filename(self, regex: re.Pattern, matched_file_name, default_file_name):
if not isinstance(regex, re.Pattern):
return (self.filename, False)
match = regex.match(matched_file_name)
if not match:
return (self.filename, False)
total_groups = len(match.groups())
# If no matches exist or if we have too many
# matches, then we shouldn't infer
if total_groups == 0 or total_groups > 2:
return (self.filename, False)
# If only one match is returned,
# it means that our default matches our request
if total_groups == 1:
return (self.filename, True)
# Otherwise, if two are returned, then
# its likely the pattern we want
date = match.group(1)
filename_without_date = match.group(2)
# After stripping out the date,
# do the two filenames match?
can_infer = filename_without_date == default_file_name
if not can_infer:
return (self.filename, False)
# If they do, recreate the filename and return it
full_filename = date + "." + filename_without_date
return (full_filename, can_infer)
class ExtraTransitionDomain:
"""Helper class to aid in storing TransitionDomain data spread across
multiple files."""
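    # Matches an optional directory prefix, a numeric date, and the remaining
    # filename, e.g. "migrationdata/20231101.test123.txt" yields the groups
    # ("20231101", "test123.txt").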
strip_date_regex = re.compile(r"(?:.*\/)?(\d+)\.(.+)")
def __init__(self, options: TransitionDomainArguments):
# Add a slash if the last character isn't one
if options.directory and options.directory[-1] != "/":
options.directory += "/"
self.directory = options.directory
self.seperator = options.sep
self.all_files = glob.glob(f"{self.directory}*")
# Create a set with filenames as keys for quick lookup
self.all_files_set = {os.path.basename(file) for file in self.all_files}
# Used for a container of values at each filename.
        # Instead of tracking each in a separate variable, we can declare
# metadata about each file and associate it with an enum.
# That way if we want the data located at the agency_adhoc file,
# we can just call EnumFilenames.AGENCY_ADHOC.
pattern_map_params = [
(
EnumFilenames.AGENCY_ADHOC,
options.agency_adhoc_filename,
AgencyAdhoc,
"agencyid",
),
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.DOMAIN_ESCROW,
options.domain_escrow_filename,
DomainEscrow,
"domainname",
),
(
EnumFilenames.DOMAIN_ADHOC,
options.domain_adhoc_filename,
DomainTypeAdhoc,
"domaintypeid",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
]
self.file_data = self.populate_file_data(pattern_map_params)
# TODO - revise comment
def populate_file_data(
self, pattern_map_params: List[Tuple[EnumFilenames, str, type, str]]
):
"""Populates the self.file_data field given a set
of tuple params.
pattern_map_params must adhere to this format:
[
(file_type, filename, data_type, id_field),
]
vars:
file_type (EnumFilenames) -> The name of the dictionary.
Defined as a value on EnumFilenames, such as
EnumFilenames.AGENCY_ADHOC
filename (str) -> The filepath of the given
"file_type", such as migrationdata/test123.txt
data_type (type) -> The type of data to be read
at the location of the filename. For instance,
each row of test123.txt may return data of type AgencyAdhoc
id_field (str) -> Given the "data_type" of each row,
this specifies what the "id" of that row is.
For example, "agencyid". This is used so we can
store each record in a dictionary rather than
a list of values.
return example:
EnumFilenames.AUTHORITY_ADHOC: PatternMap(
authority_adhoc_filename,
self.strip_date_regex,
AuthorityAdhoc,
"authorityid",
),
"""
file_data = {}
for file_type, filename, data_type, id_field in pattern_map_params:
file_data[file_type] = PatternMap(
filename,
self.strip_date_regex,
data_type,
id_field,
)
return file_data
def parse_all_files(self, infer_filenames=True):
"""Clears all preexisting data then parses each related CSV file.
infer_filenames: bool -> Determines if we should try to
infer the filename if a default is passed in
"""
self.clear_file_data()
for name, value in self.file_data.items():
is_domain_escrow = name == EnumFilenames.DOMAIN_ESCROW
filename = f"{value.filename}"
if filename in self.all_files_set:
_file = f"{self.directory}{value.filename}"
value.data = self.parse_csv_file(
_file,
self.seperator,
value.data_type,
value.id_field,
is_domain_escrow,
)
else:
if not infer_filenames:
logger.error(f"Could not find file: {filename}")
continue
# Infer filename logic #
# This mode is used for internal development use and testing only. Rather than having
# to manually define the filename each time, we can infer what the filename
# actually is.
# Not intended for use outside of that, as it is better to assume
# the end-user wants to be specific.
logger.warning(f"Attempting to infer filename: {filename}")
for filename in self.all_files:
default_name = name.value[1]
match = value.try_infer_filename(filename, default_name)
filename = match[0]
can_infer = match[1]
if can_infer:
break
if filename in self.all_files_set:
logger.info(f"Infer success. Found file {filename}")
_file = f"{self.directory}{filename}"
value.data = self.parse_csv_file(
_file,
self.seperator,
value.data_type,
value.id_field,
is_domain_escrow,
)
continue
# Log if we can't find the desired file
logger.error(f"Could not find file: {filename}")
def clear_file_data(self):
for item in self.file_data.values():
file_type: PatternMap = item
file_type.data = {}
def parse_csv_file(
self,
file,
seperator,
dataclass_type,
id_field,
is_domain_escrow=False
):
        # Domain escrow is an edge case
        if is_domain_escrow:
            return self._read_domain_escrow(file, seperator)
        return self._read_csv_file(file, seperator, dataclass_type, id_field)
    # Domain escrow is an edge case given that it is structured differently data-wise.
    def _read_domain_escrow(self, file, seperator):
        dict_data = {}
        with open(file, "r", encoding="utf-8-sig") as requested_file:
            reader = csv.reader(requested_file, delimiter=seperator)
            for row in reader:
                # Clean the row of any whitespace around delimiters
                row = [value.strip() for value in row]
                domain_name = row[0]
                date_format = "%Y-%m-%dT%H:%M:%SZ"
                # TODO - add error handling
                creation_date = datetime.strptime(row[7], date_format)
                expiration_date = datetime.strptime(row[11], date_format)
                dict_data[domain_name] = DomainEscrow(
                    domain_name, creation_date, expiration_date
                )
        return dict_data
def _read_csv_file(self, file, seperator, dataclass_type, id_field):
with open(file, "r", encoding="utf-8-sig") as requested_file:
reader = csv.DictReader(requested_file, delimiter=seperator)
            dict_data = {}
            for row in reader:
                # Clean each value of any whitespace around delimiters
                row = {
                    key: value.strip() if isinstance(value, str) else value
                    for key, value in row.items()
                }
                if None in row:
                    logger.info("Skipping row with None key")
                    logger.info(dataclass_type)
                    for key, value in row.items():
                        logger.info(f"key: {key} value: {value}")
                    TerminalHelper.prompt_for_execution(False, "Continue?", "DEBUG")
                    continue
                row_id = row[id_field]
                # To maintain parity with the load_transition_domain
                # script, we store this data in lowercase.
                if id_field == "domainname" and row_id is not None:
                    row_id = row_id.lower()
                dict_data[row_id] = dataclass_type(**row)
            # dict_data = {row[id_field]: dataclass_type(**row) for row in reader}
            return dict_data