zandercymatics 2023-10-26 14:43:39 -06:00
parent bbc3ce7f45
commit 00f44f2f84
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
2 changed files with 85 additions and 86 deletions

View file

@@ -65,8 +65,9 @@ class Command(BaseCommand):
         )
         parser.add_argument("--sep", default="|", help="Delimiter character")
 
-    def handle(self, *args, **options):
-        self.data = ExtraTransitionDomain(
+    def handle(self, **options):
+        try:
+            self.domain_object = ExtraTransitionDomain(
             agency_adhoc_filename=options['agency_adhoc_filename'],
             domain_additional_filename=options['domain_additional_filename'],
             domain_adhoc_filename=options['domain_adhoc_filename'],
@@ -74,5 +75,9 @@ class Command(BaseCommand):
             directory=options['directory'],
             seperator=options['sep']
         )
+            self.domain_object.parse_all_files()
+        except Exception as err:
+            logger.error(f"Could not load additional data. Error: {err}")
+        else:
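
The handle() rework above funnels construction and parsing through a single try/except, so a bad or missing data file gets logged instead of surfacing as a traceback. Below is a minimal, self-contained sketch of that control flow; FakeLoader and the option names passed to handle() are stand-ins for illustration, not part of the commit.

    # Sketch only: FakeLoader stands in for ExtraTransitionDomain so the
    # error-handling shape of the new handle() can be run in isolation.
    import logging

    logger = logging.getLogger(__name__)


    class FakeLoader:
        def __init__(self, directory="migrationdata", seperator="|"):
            self.directory = directory
            self.seperator = seperator

        def parse_all_files(self):
            # Simulate the kind of failure the real loader can hit.
            raise FileNotFoundError("simulated missing adhoc file")


    def handle(**options):
        try:
            domain_object = FakeLoader(
                directory=options.get("directory", "migrationdata"),
                seperator=options.get("sep", "|"),
            )
            domain_object.parse_all_files()
        except Exception as err:
            # Same behaviour as the diff: log the error rather than re-raise it.
            logger.error(f"Could not load additional data. Error: {err}")


    handle(directory="migrationdata", sep="|")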

View file

@@ -13,104 +13,98 @@ from epp_data_containers import AgencyAdhoc, DomainAdditionalData, DomainTypeAdh
 logger = logging.getLogger(__name__)
 
 
 class EnumFilenames(Enum):
-    AGENCY_ADHOC = "agency.adhoc.dotgov.txt"
-    DOMAIN_ADDITIONAL = "domainadditionaldatalink.adhoc.dotgov.txt"
-    DOMAIN_ADHOC = "domaintypes.adhoc.dotgov.txt"
-    ORGANIZATION_ADHOC = "organization.adhoc.dotgov.txt"
+    """Returns a tuple mapping for (filetype, default_file_name).
+
+    For instance, AGENCY_ADHOC = ("agency_adhoc", "agency.adhoc.dotgov.txt")
+    """
+    AGENCY_ADHOC = ("agency_adhoc", "agency.adhoc.dotgov.txt")
+    DOMAIN_ADDITIONAL = ("domain_additional", "domainadditionaldatalink.adhoc.dotgov.txt")
+    DOMAIN_ADHOC = ("domain_adhoc", "domaintypes.adhoc.dotgov.txt")
+    ORGANIZATION_ADHOC = ("organization_adhoc", "organization.adhoc.dotgov.txt")
 
 
 @dataclass
 class PatternMap():
-    def __init__(self, filename, regex, datatype):
-        self.filename = filename
-        self.regex = regex
-        self.datatype = datatype
+    def __init__(self, filename: str, regex, data_type, data=[]):
+        self.regex = regex
+        self.data_type = data_type
+        self.data = data
+
+        # returns (filename, inferred_successfully)
+        _infer = self._infer_filename(self.regex, filename)
+        self.filename = _infer[0]
+        self.could_infer = _infer[1]
+
+    def _infer_filename(self, regex, default_file_name):
+        if not isinstance(regex, re.Pattern):
+            return (self.filename, False)
+
+        match = regex.match(self.filename)
+        if not match:
+            return (self.filename, False)
+
+        date = match.group(1)
+        filename_without_date = match.group(2)
+
+        can_infer = filename_without_date == default_file_name
+        if not can_infer:
+            return (self.filename, False)
+
+        full_filename = date + filename_without_date
+        return (full_filename, can_infer)
 
 
 class ExtraTransitionDomain():
     filenames = EnumFilenames
     strip_date_regex = re.compile(r'\d+\.(.+)')
-    filename_pattern_mapping = {
-        # filename - regex to use when encountered
-        filenames.AGENCY_ADHOC: strip_date_regex,
-        filenames.DOMAIN_ADDITIONAL: strip_date_regex,
-        filenames.DOMAIN_ADHOC: strip_date_regex,
-        filenames.ORGANIZATION_ADHOC: strip_date_regex
-    }
 
     def __init__(self,
-        agency_adhoc_filename=filenames.AGENCY_ADHOC,
-        domain_additional_filename=filenames.DOMAIN_ADDITIONAL,
-        domain_adhoc_filename=filenames.DOMAIN_ADHOC,
-        organization_adhoc_filename=filenames.ORGANIZATION_ADHOC,
+        agency_adhoc_filename=filenames.AGENCY_ADHOC[1],
+        domain_additional_filename=filenames.DOMAIN_ADDITIONAL[1],
+        domain_adhoc_filename=filenames.DOMAIN_ADHOC[1],
+        organization_adhoc_filename=filenames.ORGANIZATION_ADHOC[1],
         directory="migrationdata",
         seperator="|"
     ):
         self.directory = directory
         self.seperator = seperator
         self.all_files = glob.glob(f"{directory}/*")
-        self.filename_dicts = []
-
-        self.agency_adhoc: List[AgencyAdhoc] = []
-        self.domain_additional: List[DomainAdditionalData] = []
-        self.domain_adhoc: List[DomainTypeAdhoc] = []
-        self.organization_adhoc: List[OrganizationAdhoc] = []
-
-        # Generate filename dictionaries
-        for filename, enum_pair in [
-            (agency_adhoc_filename, self.filenames.AGENCY_ADHOC),
-            (domain_additional_filename, self.filenames.DOMAIN_ADDITIONAL),
-            (domain_adhoc_filename, self.filenames.DOMAIN_ADHOC),
-            (organization_adhoc_filename, self.filenames.ORGANIZATION_ADHOC)
-        ]:
-            # Generates a dictionary that associates the enum type to
-            # the requested filename, and checks if its the default type.
-            self.filename_dicts.append(self._create_filename_dict(filename, enum_pair))
+        # Create a set with filenames as keys for quick lookup
+        self.all_files_set = {os.path.basename(file) for file in self.all_files}
+
+        self.csv_data = {
+            self.filenames.AGENCY_ADHOC: PatternMap(agency_adhoc_filename, self.strip_date_regex, AgencyAdhoc),
+            self.filenames.DOMAIN_ADDITIONAL: PatternMap(domain_additional_filename, self.strip_date_regex, DomainAdditionalData),
+            self.filenames.DOMAIN_ADHOC: PatternMap(domain_adhoc_filename, self.strip_date_regex, DomainTypeAdhoc),
+            self.filenames.ORGANIZATION_ADHOC: PatternMap(organization_adhoc_filename, self.strip_date_regex, OrganizationAdhoc)
+        }
 
-    def parse_all_files(self, seperator):
-        for file in self.all_files:
-            filename = os.path.basename(file)
-            for item in self.filename_dicts:
-                if filename == item.get("filename"):
-                    match item.get("default_filename"):
-                        case self.filenames.AGENCY_ADHOC:
-                            self.agency_adhoc = self._read_csv_file(filename, seperator, AgencyAdhoc)
-                        case self.filenames.DOMAIN_ADDITIONAL:
-                            self.domain_additional = self._read_csv_file(filename, seperator, DomainAdditionalData)
-                        case self.filenames.DOMAIN_ADHOC:
-                            self.domain_adhoc = self._read_csv_file(filename, seperator, DomainTypeAdhoc)
-                        case self.filenames.ORGANIZATION_ADHOC:
-                            self.organization_adhoc = self._read_csv_file(filename, seperator, OrganizationAdhoc)
-                        case _:
-                            logger.warning("Could not find default mapping")
-                    break
+    def parse_all_files(self):
+        """Clears all preexisting data then parses each related CSV file"""
+        self.clear_csv_data()
+        for item in self.csv_data:
+            file_type: PatternMap = item.value
+            filename = file_type.filename
+
+            if filename in self.all_files_set:
+                file_type.data = self._read_csv_file(
+                    self.all_files_set[filename],
+                    self.seperator,
+                    file_type.data_type
+                )
+            else:
+                # Log if we can't find the desired file
+                logger.warning(f"Could not find file: {filename}")
+
+    def clear_csv_data(self):
+        for item in self.csv_data:
+            file_type: PatternMap = item.value
+            file_type.data = []
 
     def _read_csv_file(self, file, seperator, dataclass_type):
         with open(file, "r", encoding="utf-8") as requested_file:
             reader = csv.DictReader(requested_file, delimiter=seperator)
             return [dataclass_type(**row) for row in reader]
-
-    def _create_filename_dict(self, filename, default_filename):
-        regex = self.filename_pattern_mapping.get(filename)
-
-        # returns (filename, inferred_successfully)
-        infer = self._infer_filename(regex, filename)
-
-        filename_dict = {
-            "filename": infer[0],
-            "default_filename": default_filename,
-            "is_default": filename == default_filename,
-            "could_infer": infer[1]
-        }
-        return filename_dict
-
-    def _infer_filename(self, regex, current_file_name):
-        if regex is None:
-            return (current_file_name, False)
-
-        match = regex.match(current_file_name)
-        if match is None:
-            return (None, False)
-
-        filename_without_date = match.group(1)
-        return (match, filename_without_date == current_file_name)
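
For quick reference on the shapes introduced in this file: each EnumFilenames member now carries a (filetype, default_file_name) tuple, and parsed CSV rows live on the PatternMap objects stored in ExtraTransitionDomain.csv_data rather than on per-type list attributes. The snippet below is an illustrative sketch of the tuple shape only; the enum values are copied from the diff, everything else is standalone and not an excerpt from the commit.

    # Sketch only: mirrors two members of the EnumFilenames enum from the diff
    # to show how the (filetype, default_file_name) tuples are read back.
    from enum import Enum


    class EnumFilenames(Enum):
        AGENCY_ADHOC = ("agency_adhoc", "agency.adhoc.dotgov.txt")
        DOMAIN_ADDITIONAL = ("domain_additional", "domainadditionaldatalink.adhoc.dotgov.txt")


    # A plain Enum member is not itself subscriptable; the tuple is reached via .value.
    filetype, default_name = EnumFilenames.AGENCY_ADHOC.value
    print(filetype)      # agency_adhoc
    print(default_name)  # agency.adhoc.dotgov.txt

    # Lookup by the first element, e.g. to go from a filetype key back to
    # its default filename.
    by_filetype = {member.value[0]: member.value[1] for member in EnumFilenames}
    print(by_filetype["domain_additional"])  # domainadditionaldatalink.adhoc.dotgov.txt

Keeping the default filename on the enum is what lets the __init__ defaults and PatternMap's filename inference draw on a single source of truth for each adhoc file.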