Prepare for parsing expiration date

This commit is contained in:
zandercymatics 2023-10-31 15:09:19 -06:00
parent 353079e7dd
commit 13172870fb
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
5 changed files with 189 additions and 71 deletions

View file

@ -6,6 +6,7 @@ import argparse
from collections import defaultdict
from django.core.management import BaseCommand
from registrar.management.commands.utility.epp_data_containers import EnumFilenames
from registrar.models import TransitionDomain
@ -14,6 +15,9 @@ from registrar.management.commands.utility.terminal_helper import (
TerminalHelper,
)
from .utility.transition_domain_arguments import TransitionDomainArguments
from .utility.extra_transition_domain import LoadExtraTransitionDomain
logger = logging.getLogger(__name__)
@ -61,6 +65,31 @@ class Command(BaseCommand):
action=argparse.BooleanOptionalAction,
)
# TODO - Narrow this down
parser.add_argument(
"--directory", default="migrationdata", help="Desired directory"
)
parser.add_argument(
"--agency_adhoc_filename",
default=EnumFilenames.AGENCY_ADHOC.value[1],
help="Defines the filename for agency adhocs",
)
parser.add_argument(
"--domain_additional_filename",
default=EnumFilenames.DOMAIN_ADDITIONAL.value[1],
help="Defines the filename for additional domain data",
)
parser.add_argument(
"--domain_adhoc_filename",
default=EnumFilenames.DOMAIN_ADHOC.value[1],
help="Defines the filename for domain type adhocs",
)
parser.add_argument(
"--organization_adhoc_filename",
default=EnumFilenames.ORGANIZATION_ADHOC.value[1],
help="Defines the filename for domain type adhocs",
)
def print_debug_mode_statements(
self, debug_on: bool, debug_max_entries_to_parse: int
):
@ -255,7 +284,6 @@ class Command(BaseCommand):
):
"""Parse the data files and create TransitionDomains."""
sep = options.get("sep")
load_extra_data = options.get("loadExtraData")
# If --resetTable was used, prompt user to confirm
# deletion of table data
@ -286,7 +314,6 @@ class Command(BaseCommand):
# STEP 3:
# Parse the domain_contacts file and create TransitionDomain objects,
# using the dictionaries from steps 1 & 2 to lookup needed information.
to_create = []
# keep track of statuses that don't match our available
@ -472,3 +499,17 @@ class Command(BaseCommand):
duplicate_domain_user_combos, duplicate_domains, users_without_email
)
self.print_summary_status_findings(domains_without_status, outlier_statuses)
# Prompt the user if they want to load additional data on the domains
# TODO - add this logic into the core of this file
arguments = TransitionDomainArguments(**options)
do_parse_extra = TerminalHelper.prompt_for_execution(
True,
"./manage.py test",
"Running load_extra_transition_domains script",
)
if do_parse_extra:
extra = LoadExtraTransitionDomain(arguments)
extra_logs = extra.parse_logs.logs

View file

@ -6,6 +6,7 @@ Not intended to be used as models but rather as an alternative to storing as a d
By keeping it as a dataclass instead of a dictionary, we can maintain data consistency.
"""
from dataclasses import dataclass
from datetime import date
from enum import Enum
from typing import List, Optional
@ -64,6 +65,13 @@ class AuthorityAdhoc:
agencyid: Optional[int] = None
addlinfo: Optional[List[str]] = None
@dataclass
class DomainEscrow:
"""Defines the structure given in the DOMAIN_ESCROW file"""
domainname: Optional[str] = None
creationdate: Optional[date] = None
expirationdate: Optional[date] = None
class EnumFilenames(Enum):
"""Returns a tuple mapping for (filetype, default_file_name).
@ -79,6 +87,7 @@ class EnumFilenames(Enum):
"domain_additional",
"domainadditionaldatalink.adhoc.dotgov.txt",
)
DOMAIN_ESCROW = ("domain_escrow", "escrow_domains.daily.dotgov.GOV.txt")
DOMAIN_ADHOC = ("domain_adhoc", "domaintypes.adhoc.dotgov.txt")
ORGANIZATION_ADHOC = ("organization_adhoc", "organization.adhoc.dotgov.txt")
AUTHORITY_ADHOC = ("authority_adhoc", "authority.adhoc.dotgov.txt")

View file

@ -1,25 +1,29 @@
""""""
import csv
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import glob
import re
import logging
import os
from typing import List
from typing import List, Tuple
from registrar.models.transition_domain import TransitionDomain
from transition_domain_arguments import TransitionDomainArguments
from epp_data_containers import (
from .epp_data_containers import (
AgencyAdhoc,
DomainAdditionalData,
DomainEscrow,
DomainTypeAdhoc,
OrganizationAdhoc,
AuthorityAdhoc,
EnumFilenames,
)
from .transition_domain_arguments import TransitionDomainArguments
logger = logging.getLogger(__name__)
class LogCode(Enum):
@ -105,15 +109,9 @@ class LoadExtraTransitionDomain:
# Stores event logs and organizes them
self.parse_logs = FileTransitionLog()
arguments = options.args_extra_transition_domain()
# Reads and parses migration files
self.domain_object = ExtraTransitionDomain(
agency_adhoc_filename=options.agency_adhoc_filename,
domain_additional_filename=options.domain_additional_filename,
domain_adhoc_filename=options.domain_adhoc_filename,
organization_adhoc_filename=options.organization_adhoc_filename,
directory=options.directory,
seperator=options.seperator,
)
self.domain_object = ExtraTransitionDomain(**arguments)
self.domain_object.parse_all_files()
# Given the data we just parsed, update each
@ -131,6 +129,7 @@ class LoadExtraTransitionDomain:
for transition_domain in all_transition_domains:
domain_name = transition_domain.domain_name.upper()
updated_transition_domain = transition_domain
# STEP 1: Parse organization data
updated_transition_domain = self.parse_org_data(
domain_name, transition_domain
@ -526,59 +525,91 @@ class PatternMap:
class ExtraTransitionDomain:
"""Helper class to aid in storing TransitionDomain data spread across
multiple files."""
filenames = EnumFilenames
#strip_date_regex = re.compile(r"\d+\.(.+)")
strip_date_regex = re.compile(r"(?:.*\/)?(\d+)\.(.+)")
def __init__(
self,
agency_adhoc_filename=filenames.AGENCY_ADHOC.value[1],
domain_additional_filename=filenames.DOMAIN_ADDITIONAL.value[1],
domain_adhoc_filename=filenames.DOMAIN_ADHOC.value[1],
organization_adhoc_filename=filenames.ORGANIZATION_ADHOC.value[1],
authority_adhoc_filename=filenames.AUTHORITY_ADHOC.value[1],
agency_adhoc_filename=EnumFilenames.AGENCY_ADHOC.value[1],
domain_additional_filename=EnumFilenames.DOMAIN_ADDITIONAL.value[1],
domain_escrow_filename=EnumFilenames.DOMAIN_ESCROW.value[1],
domain_adhoc_filename=EnumFilenames.DOMAIN_ADHOC.value[1],
organization_adhoc_filename=EnumFilenames.ORGANIZATION_ADHOC.value[1],
authority_adhoc_filename=EnumFilenames.AUTHORITY_ADHOC.value[1],
directory="migrationdata",
seperator="|",
sep="|",
):
# Add a slash if the last character isn't one
if directory and directory[-1] != "/":
directory += "/"
self.directory = directory
self.seperator = seperator
self.seperator = sep
self.all_files = glob.glob(f"{directory}*")
# Create a set with filenames as keys for quick lookup
self.all_files_set = {os.path.basename(file) for file in self.all_files}
self.file_data = {
# (filename, default_url): metadata about the desired file
self.filenames.AGENCY_ADHOC: PatternMap(
agency_adhoc_filename, self.strip_date_regex, AgencyAdhoc, "agencyid"
),
self.filenames.DOMAIN_ADDITIONAL: PatternMap(
domain_additional_filename,
self.strip_date_regex,
DomainAdditionalData,
"domainname",
),
self.filenames.DOMAIN_ADHOC: PatternMap(
domain_adhoc_filename,
self.strip_date_regex,
DomainTypeAdhoc,
"domaintypeid",
),
self.filenames.ORGANIZATION_ADHOC: PatternMap(
organization_adhoc_filename,
self.strip_date_regex,
OrganizationAdhoc,
"orgid",
),
self.filenames.AUTHORITY_ADHOC: PatternMap(
# Used for a container of values at each filename.
# Instead of tracking each in a seperate variable, we can declare
# metadata about each file and associate it with an enum.
# That way if we want the data located at the agency_adhoc file,
# we can just call EnumFilenames.AGENCY_ADHOC.
pattern_map_params = [
(EnumFilenames.AGENCY_ADHOC, agency_adhoc_filename, AgencyAdhoc, "agencyid"),
(EnumFilenames.DOMAIN_ADDITIONAL, domain_additional_filename, DomainAdditionalData, "domainname"),
(EnumFilenames.DOMAIN_ESCROW, domain_escrow_filename, DomainEscrow, "domainname"),
(EnumFilenames.DOMAIN_ADHOC, domain_adhoc_filename, DomainTypeAdhoc, "domaintypeid"),
(EnumFilenames.ORGANIZATION_ADHOC, organization_adhoc_filename, OrganizationAdhoc, "orgid"),
(EnumFilenames.AUTHORITY_ADHOC, authority_adhoc_filename, AuthorityAdhoc, "authorityid"),
]
self.file_data = self.populate_file_data(pattern_map_params)
def populate_file_data(
self,
pattern_map_params: List[Tuple[EnumFilenames, str, type, str]]
):
"""Populates the self.file_data field given a set
of tuple params.
pattern_map_params must adhere to this format:
[
(field_type, filename, data_type, id_field),
]
vars:
file_type (EnumFilenames) -> The name of the dictionary.
Defined as a value on EnumFilenames, such as
EnumFilenames.AGENCY_ADHOC
filename (str) -> The filepath of the given
"file_type", such as migrationdata/test123.txt
data_type (type) -> The type of data to be read
at the location of the filename. For instance,
each row of test123.txt may return data of type AgencyAdhoc
id_field (str) -> Given the "data_type" of each row,
this specifies what the "id" of that row is.
For example, "agencyid". This is used so we can
store each record in a dictionary rather than
a list of values.
return example:
EnumFilenames.AUTHORITY_ADHOC: PatternMap(
authority_adhoc_filename,
self.strip_date_regex,
AuthorityAdhoc,
"authorityid",
),
}
"""
file_data = {}
for file_type, filename, data_type, id_field in pattern_map_params:
file_data[file_type] = PatternMap(
filename,
self.strip_date_regex,
data_type,
id_field,
)
return file_data
def parse_all_files(self, infer_filenames=True):
"""Clears all preexisting data then parses each related CSV file.
@ -588,15 +619,16 @@ class ExtraTransitionDomain:
"""
self.clear_file_data()
for name, value in self.file_data.items():
is_domain_escrow = name == EnumFilenames.DOMAIN_ESCROW
filename = f"{value.filename}"
if filename in self.all_files_set:
_file = f"{self.directory}{value.filename}"
value.data = self._read_csv_file(
value.data = self.parse_csv_file(
_file,
self.seperator,
value.data_type,
value.id_field,
is_domain_escrow,
)
else:
if not infer_filenames:
@ -618,11 +650,12 @@ class ExtraTransitionDomain:
if filename in self.all_files_set:
logger.info(f"Infer success. Found file {filename}")
_file = f"{self.directory}{filename}"
value.data = self._read_csv_file(
value.data = self.parse_csv_file(
_file,
self.seperator,
value.data_type,
value.id_field,
is_domain_escrow,
)
continue
# Log if we can't find the desired file
@ -633,6 +666,32 @@ class ExtraTransitionDomain:
file_type: PatternMap = item
file_type.data = {}
def parse_csv_file(self, file, seperator, dataclass_type, id_field, is_domain_escrow=False):
# Domain escrow is an edge case
if is_domain_escrow:
return self._read_domain_escrow(file, seperator)
else:
return self._read_csv_file(file, seperator, dataclass_type, id_field)
# Domain escrow is an edgecase given that its structured differently data-wise.
def _read_domain_escrow(self, file, seperator):
dict_data = {}
with open(file, "r", encoding="utf-8-sig") as requested_file:
reader = csv.reader(requested_file, delimiter=seperator)
for row in reader:
domain_name = row[0]
date_format = "%Y-%m-%dT%H:%M:%SZ"
# TODO - add error handling
creation_date = datetime.strptime(row[8], date_format)
expiration_date = datetime.strptime(row[10], date_format)
dict_data[domain_name] = DomainEscrow(
domain_name,
creation_date,
expiration_date
)
return dict_data
def _read_csv_file(self, file, seperator, dataclass_type, id_field):
with open(file, "r", encoding="utf-8-sig") as requested_file:
reader = csv.DictReader(requested_file, delimiter=seperator)

View file

@ -1,26 +1,35 @@
from dataclasses import dataclass
@dataclass
class TransitionDomainArguments:
"""Stores arguments for load_transition_domain"""
def __init__(self, **options):
# Settings #
directory: str
seperator: str
limit_parse: int
self.directory = options.get('directory')
self.sep = options.get('sep')
self.limitParse = options.get('limitParse')
# Filenames #
## Adhocs ##
agency_adhoc_filename: str
domain_adhoc_filename: str
organization_adhoc_filename: str
self.agency_adhoc_filename = options.get('agency_adhoc_filename')
self.domain_adhoc_filename = options.get('domain_adhoc_filename')
self.organization_adhoc_filename = options.get('organization_adhoc_filename')
## Data files ##
domain_additional_filename: str
domain_contacts_filename: str
domain_statuses_filename: str
self.domain_additional_filename = options.get('domain_additional_filename')
self.domain_contacts_filename = options.get('domain_contacts_filename')
self.domain_statuses_filename = options.get('domain_statuses_filename')
# Flags #
debug: bool
reset_table: bool
load_extra: bool
self.debug = options.get('debug')
self.resetTable = options.get('resetTable')
def args_extra_transition_domain(self):
return {
"agency_adhoc_filename": self.agency_adhoc_filename,
"domain_adhoc_filename": self.domain_adhoc_filename,
"organization_adhoc_filename": self.organization_adhoc_filename,
"domain_additional_filename": self.domain_additional_filename,
"directory": self.directory,
"sep": self.sep,
}