mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-05-21 11:59:24 +02:00
Changes
This commit is contained in:
parent
225437cdb4
commit
afcb0ec15a
4 changed files with 242 additions and 0 deletions
|
@ -0,0 +1,78 @@
|
||||||
|
""""""
|
||||||
|
import csv
|
||||||
|
import glob
|
||||||
|
import re
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import os
|
||||||
|
from typing import List
|
||||||
|
from enum import Enum
|
||||||
|
from django.core.management import BaseCommand
|
||||||
|
from .utility.extra_transition_domain import ExtraTransitionDomain
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class EnumFilenames(Enum):
    """Default filenames of the adhoc data files, one member per file."""

    # Each value is the bare filename, without any leading prefix.
    AGENCY_ADHOC = "agency.adhoc.dotgov.txt"
    DOMAIN_ADDITIONAL = "domainadditionaldatalink.adhoc.dotgov.txt"
    DOMAIN_ADHOC = "domaintypes.adhoc.dotgov.txt"
    ORGANIZATION_ADHOC = "organization.adhoc.dotgov.txt"
|
||||||
|
|
||||||
|
class Command(BaseCommand):
    """Management command that gathers the extra transition-domain file options.

    It accepts a data directory, one filename option per adhoc data file and
    a delimiter, and passes them to ``ExtraTransitionDomain``.
    """

    help = ""

    filenames = EnumFilenames

    # Captures everything that follows a leading "<digits>." prefix.
    strip_date_regex = re.compile(r"\d+\.(.+)")
    # While the prefix of these files typically includes the date, the rest
    # of the name follows a predefined pattern. Define that here and search
    # for it to infer which file is wanted.
    filename_pattern_mapping = {
        # default filename -> regex to use when encountered
        filenames.AGENCY_ADHOC: strip_date_regex,
        filenames.DOMAIN_ADDITIONAL: strip_date_regex,
        filenames.DOMAIN_ADHOC: strip_date_regex,
        filenames.ORGANIZATION_ADHOC: strip_date_regex,
    }

    def add_arguments(self, parser):
        """Register the directory, per-file filename and delimiter options.

        Args:
            parser: The argument parser supplied by Django.
        """
        parser.add_argument(
            "--directory",
            default="migrationdata",
            help="Desired directory",
        )
        # The defaults below are the enum members' string values, so an
        # omitted option has the same type (str) as one given on the
        # command line.
        parser.add_argument(
            "--agency_adhoc_filename",
            default=self.filenames.AGENCY_ADHOC.value,
            help="Defines the filename for agency adhocs",
        )
        parser.add_argument(
            "--domain_additional_filename",
            default=self.filenames.DOMAIN_ADDITIONAL.value,
            help="Defines the filename for additional domain data",
        )
        parser.add_argument(
            "--domain_adhoc_filename",
            default=self.filenames.DOMAIN_ADHOC.value,
            help="Defines the filename for domain type adhocs",
        )
        parser.add_argument(
            "--organization_adhoc_filename",
            default=self.filenames.ORGANIZATION_ADHOC.value,
            help="Defines the filename for organization adhocs",
        )
        parser.add_argument("--sep", default="|", help="Delimiter character")

    def handle(self, *args, **options):
        """Build an ``ExtraTransitionDomain`` from the parsed options.

        The resulting object is stored on ``self.data``.

        Args:
            *args: Unused positional arguments.
            **options: Parsed command-line options registered in
                ``add_arguments``.
        """
        self.data = ExtraTransitionDomain(
            agency_adhoc_filename=options["agency_adhoc_filename"],
            domain_additional_filename=options["domain_additional_filename"],
            domain_adhoc_filename=options["domain_adhoc_filename"],
            organization_adhoc_filename=options["organization_adhoc_filename"],
            directory=options["directory"],
            # "seperator" (sic) is the keyword name ExtraTransitionDomain uses.
            seperator=options["sep"],
        )
|
||||||
|
|
||||||
|
|
|
@ -88,6 +88,9 @@ class Command(BaseCommand):
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"domain_statuses_filename", help="Data file with domain status information"
|
"domain_statuses_filename", help="Data file with domain status information"
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--loadExtraData", default=True, help="Determines if additional metadata should be applied"
|
||||||
|
)
|
||||||
|
|
||||||
parser.add_argument("--sep", default="|", help="Delimiter character")
|
parser.add_argument("--sep", default="|", help="Delimiter character")
|
||||||
|
|
||||||
|
@ -306,6 +309,7 @@ class Command(BaseCommand):
|
||||||
):
|
):
|
||||||
"""Parse the data files and create TransitionDomains."""
|
"""Parse the data files and create TransitionDomains."""
|
||||||
sep = options.get("sep")
|
sep = options.get("sep")
|
||||||
|
load_extra_data = options.get("loadExtraData")
|
||||||
|
|
||||||
# If --resetTable was used, prompt user to confirm
|
# If --resetTable was used, prompt user to confirm
|
||||||
# deletion of table data
|
# deletion of table data
|
||||||
|
@ -322,6 +326,9 @@ class Command(BaseCommand):
|
||||||
|
|
||||||
# print message to terminal about which args are in use
|
# print message to terminal about which args are in use
|
||||||
self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse)
|
self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse)
|
||||||
|
|
||||||
|
if load_extra_data:
|
||||||
|
|
||||||
|
|
||||||
# STEP 1:
|
# STEP 1:
|
||||||
# Create mapping of domain name -> status
|
# Create mapping of domain name -> status
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
@dataclass
class AgencyAdhoc:
    """One record of the AGENCY_ADHOC data file.

    Every field is optional and defaults to ``None``.
    """

    agencyid: Optional[int] = None
    agencyname: Optional[str] = None
    active: Optional[bool] = None
    isfederal: Optional[bool] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class DomainAdditionalData:
    """One record of the DOMAIN_ADDITIONAL data file.

    Every field is optional and defaults to ``None``.
    """

    domainname: Optional[str] = None
    domaintypeid: Optional[int] = None
    authorityid: Optional[int] = None
    orgid: Optional[int] = None
    securitycontact_email: Optional[str] = None
    dnsseckeymonitor: Optional[str] = None
    domainpurpose: Optional[str] = None
|
||||||
|
|
||||||
|
@dataclass
class DomainTypeAdhoc:
    """One record of the DOMAIN_ADHOC data file.

    Every field is optional and defaults to ``None``.
    """

    domaintypeid: Optional[int] = None
    domaintype: Optional[str] = None
    code: Optional[str] = None
    active: Optional[bool] = None
|
||||||
|
|
||||||
|
@dataclass
class OrganizationAdhoc:
    """One record of the ORGANIZATION_ADHOC data file.

    Every field is optional and defaults to ``None``.
    """

    orgid: Optional[int] = None
    orgname: Optional[str] = None
    orgstreet: Optional[str] = None
    orgcity: Optional[str] = None
    orgstate: Optional[str] = None
    orgzip: Optional[str] = None
    orgcountrycode: Optional[str] = None
|
|
@ -0,0 +1,116 @@
|
||||||
|
""""""
|
||||||
|
import csv
|
||||||
|
from dataclasses import dataclass
|
||||||
|
import glob
|
||||||
|
import re
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import os
|
||||||
|
from typing import List
|
||||||
|
from enum import Enum
|
||||||
|
from epp_data_containers import AgencyAdhoc, DomainAdditionalData, DomainTypeAdhoc, OrganizationAdhoc
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class EnumFilenames(Enum):
    """Default filenames of the adhoc data files, one member per file."""

    # Each value is the bare filename, without any leading prefix.
    AGENCY_ADHOC = "agency.adhoc.dotgov.txt"
    DOMAIN_ADDITIONAL = "domainadditionaldatalink.adhoc.dotgov.txt"
    DOMAIN_ADHOC = "domaintypes.adhoc.dotgov.txt"
    ORGANIZATION_ADHOC = "organization.adhoc.dotgov.txt"
|
||||||
|
|
||||||
|
@dataclass
class PatternMap:
    """Groups a filename with the regex and record type associated with it.

    The three values are declared as dataclass fields so that the generated
    ``__init__``, ``__repr__`` and ``__eq__`` all take them into account.
    The constructor signature is ``PatternMap(filename, regex, datatype)``.
    """

    # NOTE(review): the annotations are deliberately loose; no caller is
    # visible here to confirm the concrete types.
    filename: object  # presumably a filename string or EnumFilenames member
    regex: object  # presumably a compiled pattern or None
    datatype: object  # presumably the record class for the file's rows
|
||||||
|
|
||||||
|
|
||||||
|
class ExtraTransitionDomain:
    """Locate and load the adhoc data files found in a directory.

    On construction the directory is globbed and one "filename dict" is built
    per known file. ``parse_all_files`` then reads every file whose basename
    matches a requested filename into the corresponding list attribute
    (``agency_adhoc``, ``domain_additional``, ``domain_adhoc``,
    ``organization_adhoc``).
    """

    filenames = EnumFilenames
    # Captures everything that follows a leading "<digits>." prefix.
    strip_date_regex = re.compile(r"\d+\.(.+)")
    filename_pattern_mapping = {
        # default filename -> regex to use when encountered
        filenames.AGENCY_ADHOC: strip_date_regex,
        filenames.DOMAIN_ADDITIONAL: strip_date_regex,
        filenames.DOMAIN_ADHOC: strip_date_regex,
        filenames.ORGANIZATION_ADHOC: strip_date_regex,
    }

    def __init__(
        self,
        agency_adhoc_filename=filenames.AGENCY_ADHOC,
        domain_additional_filename=filenames.DOMAIN_ADDITIONAL,
        domain_adhoc_filename=filenames.DOMAIN_ADHOC,
        organization_adhoc_filename=filenames.ORGANIZATION_ADHOC,
        directory="migrationdata",
        seperator="|",
    ):
        """Record the requested filenames and list the files in ``directory``.

        Args:
            agency_adhoc_filename: Filename (str or ``EnumFilenames`` member)
                of the agency file.
            domain_additional_filename: Same, for the additional domain data.
            domain_adhoc_filename: Same, for the domain type file.
            organization_adhoc_filename: Same, for the organization file.
            directory: Directory that is globbed for data files.
            seperator: Delimiter used when ``parse_all_files`` is called
                without an explicit one.
        """
        self.directory = directory
        self.seperator = seperator
        self.all_files = glob.glob(f"{directory}/*")

        self.agency_adhoc: List[AgencyAdhoc] = []
        self.domain_additional: List[DomainAdditionalData] = []
        self.domain_adhoc: List[DomainTypeAdhoc] = []
        self.organization_adhoc: List[OrganizationAdhoc] = []

        # One dictionary per known file, associating the enum member with
        # the requested filename and noting whether it is the default.
        self.filename_dicts = [
            self._create_filename_dict(filename, default_filename)
            for filename, default_filename in (
                (agency_adhoc_filename, self.filenames.AGENCY_ADHOC),
                (domain_additional_filename, self.filenames.DOMAIN_ADDITIONAL),
                (domain_adhoc_filename, self.filenames.DOMAIN_ADHOC),
                (organization_adhoc_filename, self.filenames.ORGANIZATION_ADHOC),
            )
        ]

    def parse_all_files(self, seperator=None):
        """Read every requested file present in the directory.

        Args:
            seperator: Column delimiter; defaults to the one given to
                ``__init__`` when omitted.
        """
        if seperator is None:
            seperator = self.seperator

        # enum member -> (attribute receiving the rows, row dataclass)
        targets = {
            self.filenames.AGENCY_ADHOC: ("agency_adhoc", AgencyAdhoc),
            self.filenames.DOMAIN_ADDITIONAL: ("domain_additional", DomainAdditionalData),
            self.filenames.DOMAIN_ADHOC: ("domain_adhoc", DomainTypeAdhoc),
            self.filenames.ORGANIZATION_ADHOC: ("organization_adhoc", OrganizationAdhoc),
        }

        for file in self.all_files:
            filename = os.path.basename(file)
            for item in self.filename_dicts:
                if filename != item.get("filename"):
                    continue
                target = targets.get(item.get("default_filename"))
                if target is None:
                    logger.warning("Could not find default mapping")
                else:
                    attribute, dataclass_type = target
                    # Open the globbed path, not the basename, so the
                    # directory component is kept.
                    rows = self._read_csv_file(file, seperator, dataclass_type)
                    setattr(self, attribute, rows)
                # A file corresponds to at most one entry; stop searching.
                break

    def _read_csv_file(self, file, seperator, dataclass_type):
        """Read a delimited file into a list of ``dataclass_type`` records.

        The first row supplies the column names. Each following row is passed
        to ``dataclass_type`` as keyword arguments, so the column names must
        match its field names; values are passed through as read.

        Args:
            file: Path of the file to open.
            seperator: Column delimiter.
            dataclass_type: Class instantiated once per data row.

        Returns:
            A list with one ``dataclass_type`` instance per data row.
        """
        # newline="" is what the csv module asks for on files it reads.
        with open(file, "r", encoding="utf-8", newline="") as requested_file:
            reader = csv.DictReader(requested_file, delimiter=seperator)
            return [dataclass_type(**row) for row in reader]

    def _create_filename_dict(self, filename, default_filename):
        """Describe how a requested filename relates to its default.

        Args:
            filename: Requested filename (str or ``EnumFilenames`` member).
            default_filename: ``EnumFilenames`` member identifying the file.

        Returns:
            A dict with the keys ``filename`` (the name compared against
            files in the directory), ``default_filename`` (unchanged),
            ``is_default`` and ``could_infer``.
        """
        requested = self._as_filename(filename)
        default = self._as_filename(default_filename)

        # The mapping is keyed by the enum member, not by the requested name.
        regex = self.filename_pattern_mapping.get(default_filename)

        inferred_name, could_infer = self._infer_filename(regex, requested, default)
        return {
            "filename": inferred_name,
            "default_filename": default_filename,
            "is_default": requested == default,
            "could_infer": could_infer,
        }

    def _infer_filename(self, regex, current_file_name, default_file_name=None):
        """Check whether a prefixed filename corresponds to a default one.

        Args:
            regex: Compiled pattern whose first group is the filename
                without its prefix, or ``None``.
            current_file_name: The filename as requested.
            default_file_name: The default filename to compare the stripped
                name against.

        Returns:
            A ``(filename, inferred_successfully)`` tuple. ``filename`` is
            always ``current_file_name``; the flag is true only when the
            regex matches and the stripped name equals ``default_file_name``.
        """
        if regex is None or not isinstance(current_file_name, str):
            return (current_file_name, False)

        match = regex.match(current_file_name)
        if match is None:
            # No prefix to strip: keep the name as requested.
            return (current_file_name, False)

        filename_without_date = match.group(1)
        could_infer = (
            default_file_name is not None
            and filename_without_date == default_file_name
        )
        return (current_file_name, could_infer)

    @staticmethod
    def _as_filename(name):
        """Return the string value of an Enum member; other values unchanged."""
        return name.value if isinstance(name, Enum) else name
|
Loading…
Add table
Add a link
Reference in a new issue