diff --git a/src/registrar/management/commands/load_extra_transition_domain.py b/src/registrar/management/commands/load_extra_transition_domain.py
new file mode 100644
index 000000000..77ca55291
--- /dev/null
+++ b/src/registrar/management/commands/load_extra_transition_domain.py
@@ -0,0 +1,78 @@
+"""Management command for the ad hoc agency, domain and organization files."""
+import csv
+import glob
+import re
+import logging
+
+import os
+from typing import List
+from enum import Enum
+from django.core.management import BaseCommand
+from .utility.extra_transition_domain import EnumFilenames, ExtraTransitionDomain
+
+
+logger = logging.getLogger(__name__)
+
+
+class Command(BaseCommand):
+    """Wires the command line options into an ExtraTransitionDomain."""
+
+    help = ""
+
+    # Shared with ExtraTransitionDomain so both sides use the same enum
+    # members (members of two distinct Enum classes never compare equal).
+    filenames = EnumFilenames
+
+    strip_date_regex = re.compile(r'\d+\.(.+)')
+    # While the prefix of these files typically includes the date,
+    # the rest of the name follows a predefined pattern. Define this here,
+    # and search for that to infer what is wanted.
+    filename_pattern_mapping = {
+        # filename - regex to use when encountered
+        filenames.AGENCY_ADHOC: strip_date_regex,
+        filenames.DOMAIN_ADDITIONAL: strip_date_regex,
+        filenames.DOMAIN_ADHOC: strip_date_regex,
+        filenames.ORGANIZATION_ADHOC: strip_date_regex,
+    }
+
+    def add_arguments(self, parser):
+        """Add directory, filename and delimiter arguments."""
+        parser.add_argument(
+            "--directory", default="migrationdata", help="Desired directory"
+        )
+        # Defaults use the enum *value* so that `options` always holds plain
+        # strings, whether or not the flag was given on the command line.
+        parser.add_argument(
+            "--agency_adhoc_filename",
+            default=self.filenames.AGENCY_ADHOC.value,
+            help="Defines the filename for agency adhocs",
+        )
+        parser.add_argument(
+            "--domain_additional_filename",
+            default=self.filenames.DOMAIN_ADDITIONAL.value,
+            help="Defines the filename for additional domain data",
+        )
+        parser.add_argument(
+            "--domain_adhoc_filename",
+            default=self.filenames.DOMAIN_ADHOC.value,
+            help="Defines the filename for domain type adhocs",
+        )
+        parser.add_argument(
+            "--organization_adhoc_filename",
+            default=self.filenames.ORGANIZATION_ADHOC.value,
+            help="Defines the filename for organization adhocs",
+        )
+        parser.add_argument("--sep", default="|", help="Delimiter character")
+
+    def handle(self, *args, **options):
+        """Build the ExtraTransitionDomain container from the CLI options."""
+        # NOTE(review): this only constructs the container; nothing here calls
+        # parse_all_files() yet - confirm whether parsing should happen here.
+        self.data = ExtraTransitionDomain(
+            agency_adhoc_filename=options["agency_adhoc_filename"],
+            domain_additional_filename=options["domain_additional_filename"],
+            domain_adhoc_filename=options["domain_adhoc_filename"],
+            organization_adhoc_filename=options["organization_adhoc_filename"],
+            directory=options["directory"],
+            seperator=options["sep"],
+        )
diff --git a/src/registrar/management/commands/load_transition_domain.py b/src/registrar/management/commands/load_transition_domain.py
index 206589c33..624418fe9 100644
--- a/src/registrar/management/commands/load_transition_domain.py
+++ b/src/registrar/management/commands/load_transition_domain.py
@@ -88,4 +88,12 @@ class Command(BaseCommand):
         parser.add_argument(
             "domain_statuses_filename", help="Data file with domain status information"
         )
+        parser.add_argument(
+            "--loadExtraData",
+            default=True,
+            # Without a converter any supplied value, "False" included,
+            # arrives as a non-empty (truthy) string.
+            type=lambda value: str(value).lower() not in ("false", "0", "no"),
+            help="Determines if additional metadata should be applied",
+        )
         parser.add_argument("--sep", default="|", help="Delimiter character")
@@ -306,6 +314,7 @@ class Command(BaseCommand):
     ):
         """Parse the data files and create TransitionDomains."""
         sep = options.get("sep")
+        load_extra_data = options.get("loadExtraData")
 
         # If --resetTable was used, prompt user to confirm
         # deletion of table data
@@ -322,6 +331,11 @@ class Command(BaseCommand):
 
         # print message to terminal about which args are in use
         self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse)
+
+        if load_extra_data:
+            # TODO: apply the additional metadata; `pass` keeps the empty
+            # branch syntactically valid in the meantime.
+            pass
 
         # STEP 1:
         # Create mapping of domain name -> status
diff --git a/src/registrar/management/commands/utility/epp_data_containers.py b/src/registrar/management/commands/utility/epp_data_containers.py
new file mode 100644
index 000000000..8bc7a9b4d
--- /dev/null
+++ b/src/registrar/management/commands/utility/epp_data_containers.py
@@ -0,0 +1,53 @@
+"""Dataclasses mirroring the rows of the ad hoc EPP export files.
+
+Dataclasses do not coerce values: rows read through csv.DictReader populate
+the fields with strings, whatever the annotation says.
+"""
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class AgencyAdhoc:
+    """Defines the structure of a row in the AGENCY_ADHOC file"""
+
+    agencyid: Optional[int] = None
+    agencyname: Optional[str] = None
+    active: Optional[bool] = None
+    isfederal: Optional[bool] = None
+
+
+@dataclass
+class DomainAdditionalData:
+    """Defines the structure of a row in the DOMAIN_ADDITIONAL file"""
+
+    domainname: Optional[str] = None
+    domaintypeid: Optional[int] = None
+    authorityid: Optional[int] = None
+    orgid: Optional[int] = None
+    securitycontact_email: Optional[str] = None
+    dnsseckeymonitor: Optional[str] = None
+    domainpurpose: Optional[str] = None
+
+
+@dataclass
+class DomainTypeAdhoc:
+    """Defines the structure of a row in the DOMAIN_ADHOC file"""
+
+    domaintypeid: Optional[int] = None
+    domaintype: Optional[str] = None
+    code: Optional[str] = None
+    active: Optional[bool] = None
+
+
+@dataclass
+class OrganizationAdhoc:
+    """Defines the structure of a row in the ORGANIZATION_ADHOC file"""
+
+    orgid: Optional[int] = None
+    orgname: Optional[str] = None
+    orgstreet: Optional[str] = None
+    orgcity: Optional[str] = None
+    orgstate: Optional[str] = None
+    orgzip: Optional[str] = None
+    orgcountrycode: Optional[str] = None
diff --git a/src/registrar/management/commands/utility/extra_transition_domain.py b/src/registrar/management/commands/utility/extra_transition_domain.py
new file mode 100644
index 000000000..9b9c27ac6
--- /dev/null
+++ b/src/registrar/management/commands/utility/extra_transition_domain.py
@@ -0,0 +1,166 @@
+"""Locate the ad hoc data files in a directory and parse them."""
+import csv
+from dataclasses import dataclass
+import glob
+import re
+import logging
+
+import os
+from typing import List
+from enum import Enum
+from .epp_data_containers import (
+    AgencyAdhoc,
+    DomainAdditionalData,
+    DomainTypeAdhoc,
+    OrganizationAdhoc,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class EnumFilenames(Enum):
+    """Default (undated) names of the ad hoc data files."""
+
+    AGENCY_ADHOC = "agency.adhoc.dotgov.txt"
+    DOMAIN_ADDITIONAL = "domainadditionaldatalink.adhoc.dotgov.txt"
+    DOMAIN_ADHOC = "domaintypes.adhoc.dotgov.txt"
+    ORGANIZATION_ADHOC = "organization.adhoc.dotgov.txt"
+
+
+@dataclass
+class PatternMap:
+    """Associates a filename with its regex and the dataclass of its rows."""
+
+    # Declared as fields (not assigned in a hand-written __init__) so the
+    # generated __eq__ and __repr__ actually take them into account.
+    filename: str
+    regex: re.Pattern
+    datatype: type
+
+
+class ExtraTransitionDomain:
+    """Finds the ad hoc files in a directory and parses them into dataclasses."""
+
+    filenames = EnumFilenames
+    strip_date_regex = re.compile(r'\d+\.(.+)')
+    filename_pattern_mapping = {
+        # filename - regex to use when encountered
+        filenames.AGENCY_ADHOC: strip_date_regex,
+        filenames.DOMAIN_ADDITIONAL: strip_date_regex,
+        filenames.DOMAIN_ADHOC: strip_date_regex,
+        filenames.ORGANIZATION_ADHOC: strip_date_regex,
+    }
+    # default filename -> (attribute that receives the rows, row dataclass)
+    _parse_targets = {
+        filenames.AGENCY_ADHOC: ("agency_adhoc", AgencyAdhoc),
+        filenames.DOMAIN_ADDITIONAL: ("domain_additional", DomainAdditionalData),
+        filenames.DOMAIN_ADHOC: ("domain_adhoc", DomainTypeAdhoc),
+        filenames.ORGANIZATION_ADHOC: ("organization_adhoc", OrganizationAdhoc),
+    }
+
+    def __init__(
+        self,
+        agency_adhoc_filename=filenames.AGENCY_ADHOC,
+        domain_additional_filename=filenames.DOMAIN_ADDITIONAL,
+        domain_adhoc_filename=filenames.DOMAIN_ADHOC,
+        organization_adhoc_filename=filenames.ORGANIZATION_ADHOC,
+        directory="migrationdata",
+        seperator="|",
+    ):
+        """Record where to look and which filename stands for each file type.
+
+        Each filename may be a plain string or an EnumFilenames member.
+        """
+        self.directory = directory
+        self.seperator = seperator
+        # Sorted so that, when several files share an undated name, which
+        # one is parsed last does not depend on filesystem ordering.
+        self.all_files = sorted(glob.glob(os.path.join(directory, "*")))
+        self.filename_dicts = []
+
+        self.agency_adhoc: List[AgencyAdhoc] = []
+        self.domain_additional: List[DomainAdditionalData] = []
+        self.domain_adhoc: List[DomainTypeAdhoc] = []
+        self.organization_adhoc: List[OrganizationAdhoc] = []
+
+        # Generate filename dictionaries
+        for filename, enum_pair in [
+            (agency_adhoc_filename, self.filenames.AGENCY_ADHOC),
+            (domain_additional_filename, self.filenames.DOMAIN_ADDITIONAL),
+            (domain_adhoc_filename, self.filenames.DOMAIN_ADHOC),
+            (organization_adhoc_filename, self.filenames.ORGANIZATION_ADHOC),
+        ]:
+            # Generates a dictionary that associates the enum type to
+            # the requested filename, and checks if it's the default type.
+            self.filename_dicts.append(self._create_filename_dict(filename, enum_pair))
+
+    def parse_all_files(self, seperator=None):
+        """Parse every recognised file in the directory into its dataclass list.
+
+        `seperator` defaults to the delimiter given to the constructor.
+        """
+        if seperator is None:
+            seperator = self.seperator
+
+        for file in self.all_files:
+            # Compare on the undated name so dated copies are recognised too.
+            undated_name, _ = self._infer_filename(
+                self.strip_date_regex, os.path.basename(file)
+            )
+            for item in self.filename_dicts:
+                if undated_name != item.get("filename"):
+                    continue
+                target = self._parse_targets.get(item.get("default_filename"))
+                if target is None:
+                    logger.warning("Could not find default mapping")
+                else:
+                    attribute, dataclass_type = target
+                    # Open via the full path; the basename alone only
+                    # resolves when the directory is the working directory.
+                    rows = self._read_csv_file(file, seperator, dataclass_type)
+                    setattr(self, attribute, rows)
+                break
+
+    def _read_csv_file(self, file, seperator, dataclass_type):
+        """Return one `dataclass_type` instance per row of the delimited file."""
+        # newline="" is what the csv module asks for when opening files.
+        with open(file, "r", encoding="utf-8", newline="") as requested_file:
+            reader = csv.DictReader(requested_file, delimiter=seperator)
+            return [dataclass_type(**row) for row in reader]
+
+    def _create_filename_dict(self, filename, default_filename):
+        """Describe how a requested filename relates to its default."""
+        requested = self._as_filename(filename)
+        # The mapping is keyed by the default enum member, not by the
+        # requested name, which is an arbitrary string when user-supplied.
+        regex = self.filename_pattern_mapping.get(default_filename)
+
+        # returns (filename, inferred_successfully)
+        inferred_name, could_infer = self._infer_filename(regex, requested)
+        return {
+            "filename": inferred_name,
+            "default_filename": default_filename,
+            "is_default": requested == self._as_filename(default_filename),
+            "could_infer": could_infer,
+        }
+
+    def _infer_filename(self, regex, current_file_name):
+        """Strip the leading date from a filename.
+
+        Returns (filename, stripped): the name without its date prefix and
+        True, or the name unchanged and False when there is nothing to strip.
+        """
+        current_file_name = self._as_filename(current_file_name)
+        if regex is None:
+            return (current_file_name, False)
+
+        match = regex.match(current_file_name)
+        if match is None:
+            return (current_file_name, False)
+
+        return (match.group(1), True)
+
+    @staticmethod
+    def _as_filename(name):
+        """Return the plain string for a filename given as str or Enum."""
+        return name.value if isinstance(name, Enum) else name