diff --git a/src/registrar/management/commands/load_extra_transition_domain.py b/src/registrar/management/commands/load_extra_transition_domain.py index 77ca55291..307a3b3c7 100644 --- a/src/registrar/management/commands/load_extra_transition_domain.py +++ b/src/registrar/management/commands/load_extra_transition_domain.py @@ -65,14 +65,19 @@ class Command(BaseCommand): ) parser.add_argument("--sep", default="|", help="Delimiter character") - def handle(self, *args, **options): - self.data = ExtraTransitionDomain( - agency_adhoc_filename=options['agency_adhoc_filename'], - domain_additional_filename=options['domain_additional_filename'], - domain_adhoc_filename=options['domain_adhoc_filename'], - organization_adhoc_filename=options['organization_adhoc_filename'], - directory=options['directory'], - seperator=options['sep'] - ) - + def handle(self, **options): + try: + self.domain_object = ExtraTransitionDomain( + agency_adhoc_filename=options['agency_adhoc_filename'], + domain_additional_filename=options['domain_additional_filename'], + domain_adhoc_filename=options['domain_adhoc_filename'], + organization_adhoc_filename=options['organization_adhoc_filename'], + directory=options['directory'], + seperator=options['sep'] + ) + self.domain_object.parse_all_files() + except Exception as err: + logger.error(f"Could not load additional data. Error: {err}") + else: + diff --git a/src/registrar/management/commands/utility/extra_transition_domain.py b/src/registrar/management/commands/utility/extra_transition_domain.py index 9b9c27ac6..39dde48b2 100644 --- a/src/registrar/management/commands/utility/extra_transition_domain.py +++ b/src/registrar/management/commands/utility/extra_transition_domain.py @@ -13,104 +13,98 @@ from epp_data_containers import AgencyAdhoc, DomainAdditionalData, DomainTypeAdh logger = logging.getLogger(__name__) class EnumFilenames(Enum): - AGENCY_ADHOC = "agency.adhoc.dotgov.txt" - DOMAIN_ADDITIONAL = "domainadditionaldatalink.adhoc.dotgov.txt" - DOMAIN_ADHOC = "domaintypes.adhoc.dotgov.txt" - ORGANIZATION_ADHOC = "organization.adhoc.dotgov.txt" + """Returns a tuple mapping for (filetype, default_file_name). + + For instance, AGENCY_ADHOC = ("agency_adhoc", "agency.adhoc.dotgov.txt") + """ + AGENCY_ADHOC = ("agency_adhoc", "agency.adhoc.dotgov.txt") + DOMAIN_ADDITIONAL = ("domain_additional", "domainadditionaldatalink.adhoc.dotgov.txt") + DOMAIN_ADHOC = ("domain_adhoc", "domaintypes.adhoc.dotgov.txt") + ORGANIZATION_ADHOC = ("organization_adhoc", "organization.adhoc.dotgov.txt") @dataclass class PatternMap(): - def __init__(self, filename, regex, datatype): - self.filename = filename - self.regex = regex - self.datatype = datatype + def __init__(self, filename: str, regex, data_type, data=[]): + self.regex = regex + self.data_type = data_type + self.data = data + + # returns (filename, inferred_successfully) + _infer = self._infer_filename(self.regex, filename) + self.filename = _infer[0] + self.could_infer = _infer[1] + + + def _infer_filename(self, regex, default_file_name): + if not isinstance(regex, re.Pattern): + return (self.filename, False) + + match = regex.match(self.filename) + if not match: + return (self.filename, False) + + date = match.group(1) + filename_without_date = match.group(2) + + can_infer = filename_without_date == default_file_name + if not can_infer: + return (self.filename, False) + + full_filename = date + filename_without_date + return (full_filename, can_infer) class ExtraTransitionDomain(): filenames = EnumFilenames strip_date_regex = re.compile(r'\d+\.(.+)') - filename_pattern_mapping = { - # filename - regex to use when encountered - filenames.AGENCY_ADHOC: strip_date_regex, - filenames.DOMAIN_ADDITIONAL: strip_date_regex, - filenames.DOMAIN_ADHOC: strip_date_regex, - filenames.ORGANIZATION_ADHOC: strip_date_regex - } def __init__(self, - agency_adhoc_filename=filenames.AGENCY_ADHOC, - domain_additional_filename=filenames.DOMAIN_ADDITIONAL, - domain_adhoc_filename=filenames.DOMAIN_ADHOC, - organization_adhoc_filename=filenames.ORGANIZATION_ADHOC, + agency_adhoc_filename=filenames.AGENCY_ADHOC[1], + domain_additional_filename=filenames.DOMAIN_ADDITIONAL[1], + domain_adhoc_filename=filenames.DOMAIN_ADHOC[1], + organization_adhoc_filename=filenames.ORGANIZATION_ADHOC[1], directory="migrationdata", seperator="|" ): self.directory = directory self.seperator = seperator self.all_files = glob.glob(f"{directory}/*") - self.filename_dicts = [] + # Create a set with filenames as keys for quick lookup + self.all_files_set = {os.path.basename(file) for file in self.all_files} - self.agency_adhoc: List[AgencyAdhoc] = [] - self.domain_additional: List[DomainAdditionalData] = [] - self.domain_adhoc: List[DomainTypeAdhoc] = [] - self.organization_adhoc: List[OrganizationAdhoc] = [] + self.csv_data = { + self.filenames.AGENCY_ADHOC: PatternMap(agency_adhoc_filename, self.strip_date_regex, AgencyAdhoc), + self.filenames.DOMAIN_ADDITIONAL: PatternMap(domain_additional_filename, self.strip_date_regex, DomainAdditionalData), + self.filenames.DOMAIN_ADHOC: PatternMap(domain_adhoc_filename, self.strip_date_regex, DomainTypeAdhoc), + self.filenames.ORGANIZATION_ADHOC: PatternMap(organization_adhoc_filename, self.strip_date_regex, OrganizationAdhoc) + } - # Generate filename dictionaries - for filename, enum_pair in [ - (agency_adhoc_filename, self.filenames.AGENCY_ADHOC), - (domain_additional_filename, self.filenames.DOMAIN_ADDITIONAL), - (domain_adhoc_filename, self.filenames.DOMAIN_ADHOC), - (organization_adhoc_filename, self.filenames.ORGANIZATION_ADHOC) - ]: - # Generates a dictionary that associates the enum type to - # the requested filename, and checks if its the default type. - self.filename_dicts.append(self._create_filename_dict(filename, enum_pair)) - def parse_all_files(self, seperator): - for file in self.all_files: - filename = os.path.basename(file) - for item in self.filename_dicts: - if filename == item.get("filename"): - match item.get("default_filename"): - case self.filenames.AGENCY_ADHOC: - self.agency_adhoc = self._read_csv_file(filename, seperator, AgencyAdhoc) - case self.filenames.DOMAIN_ADDITIONAL: - self.domain_additional = self._read_csv_file(filename, seperator, DomainAdditionalData) - case self.filenames.DOMAIN_ADHOC: - self.domain_adhoc = self._read_csv_file(filename, seperator, DomainTypeAdhoc) - case self.filenames.ORGANIZATION_ADHOC: - self.organization_adhoc = self._read_csv_file(filename, seperator, OrganizationAdhoc) - case _: - logger.warning("Could not find default mapping") - break + def parse_all_files(self): + """Clears all preexisting data then parses each related CSV file""" + self.clear_csv_data() + for item in self.csv_data: + file_type: PatternMap = item.value + filename = file_type.filename + + if filename in self.all_files_set: + file_type.data = self._read_csv_file( + self.all_files_set[filename], + self.seperator, + file_type.data_type + ) + else: + # Log if we can't find the desired file + logger.warning(f"Could not find file: {filename}") + + + def clear_csv_data(self): + for item in self.csv_data: + file_type: PatternMap = item.value + file_type.data = [] def _read_csv_file(self, file, seperator, dataclass_type): with open(file, "r", encoding="utf-8") as requested_file: reader = csv.DictReader(requested_file, delimiter=seperator) return [dataclass_type(**row) for row in reader] - - def _create_filename_dict(self, filename, default_filename): - regex = self.filename_pattern_mapping.get(filename) - - # returns (filename, inferred_successfully) - infer = self._infer_filename(regex, filename) - filename_dict = { - "filename": infer[0], - "default_filename": default_filename, - "is_default": filename == default_filename, - "could_infer": infer[1] - } - return filename_dict - - def _infer_filename(self, regex, current_file_name): - if regex is None: - return (current_file_name, False) - - match = regex.match(current_file_name) - - if match is None: - return (None, False) - - filename_without_date = match.group(1) - return (match, filename_without_date == current_file_name)