Parse expiration

This commit is contained in:
zandercymatics 2023-11-01 09:07:22 -06:00
parent 13172870fb
commit dca5bdef72
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
7 changed files with 256 additions and 562 deletions

View file

@ -1,465 +0,0 @@
""""""
import csv
import glob
import re
import logging
import os
from typing import List
from enum import Enum
from django.core.management import BaseCommand
from registrar.models.transition_domain import TransitionDomain
from .utility.extra_transition_domain import ExtraTransitionDomain
from .utility.epp_data_containers import (
AgencyAdhoc,
AuthorityAdhoc,
DomainAdditionalData,
DomainTypeAdhoc,
OrganizationAdhoc,
EnumFilenames,
)
logger = logging.getLogger(__name__)
class LogCode(Enum):
ERROR = 1
WARNING = 2
INFO = 3
DEBUG = 4
class FileTransitionLog:
"""Container for storing event logs. Used to lessen
the complexity of storing multiple logs across multiple
variables.
self.logs: dict -> {
EnumFilenames.DOMAIN_ADHOC: List[LogItem],
EnumFilenames.AGENCY_ADHOC: List[LogItem],
EnumFilenames.ORGANIZATION_ADHOC: List[LogItem],
EnumFilenames.DOMAIN_ADDITIONAL: List[LogItem],
}
"""
def __init__(self):
self.logs = {
EnumFilenames.DOMAIN_ADHOC: [],
EnumFilenames.AGENCY_ADHOC: [],
EnumFilenames.ORGANIZATION_ADHOC: [],
EnumFilenames.DOMAIN_ADDITIONAL: [],
}
class LogItem:
"""Used for storing data about logger information."""
def __init__(self, file_type, code, message, domain_name):
self.file_type = file_type
self.code = code
self.message = message
self.domain_name = domain_name
def add_log(self, file_type, code, message, domain_name):
"""Adds a log item to self.logs
file_type -> Which array to add to,
ex. EnumFilenames.DOMAIN_ADHOC
code -> Log severity or other metadata, ex. LogCode.ERROR
message -> Message to display
"""
self.logs[file_type].append(self.LogItem(file_type, code, message, domain_name))
def create_log_item(self, file_type, code, message, domain_name=None, add_to_list=True):
"""Creates and returns an LogItem object.
add_to_list: bool -> If enabled, add it to the logs array.
"""
log = self.LogItem(file_type, code, message, domain_name)
if not add_to_list:
return log
else:
self.logs[file_type].append(log)
return log
def display_logs(self, file_type):
"""Displays all logs in the given file_type in EnumFilenames.
Will log with the correct severity depending on code.
"""
for log in self.logs.get(file_type):
match log.code:
case LogCode.ERROR:
logger.error(log.message)
case LogCode.WARNING:
logger.warning(log.message)
case LogCode.INFO:
logger.info(log.message)
case LogCode.DEBUG:
logger.debug(log.message)
class Command(BaseCommand):
help = ""
filenames = EnumFilenames
parse_logs = FileTransitionLog()
def add_arguments(self, parser):
"""Add filename arguments."""
parser.add_argument(
"--directory", default="migrationdata", help="Desired directory"
)
parser.add_argument(
"--agency_adhoc_filename",
default=EnumFilenames.AGENCY_ADHOC.value[1],
help="Defines the filename for agency adhocs",
)
parser.add_argument(
"--domain_additional_filename",
default=EnumFilenames.DOMAIN_ADDITIONAL.value[1],
help="Defines the filename for additional domain data",
)
parser.add_argument(
"--domain_adhoc_filename",
default=EnumFilenames.DOMAIN_ADHOC.value[1],
help="Defines the filename for domain type adhocs",
)
parser.add_argument(
"--organization_adhoc_filename",
default=EnumFilenames.ORGANIZATION_ADHOC.value[1],
help="Defines the filename for domain type adhocs",
)
parser.add_argument("--sep", default="|", help="Delimiter character")
def handle(self, **options):
try:
self.domain_object = ExtraTransitionDomain(
agency_adhoc_filename=options["agency_adhoc_filename"],
domain_additional_filename=options["domain_additional_filename"],
domain_adhoc_filename=options["domain_adhoc_filename"],
organization_adhoc_filename=options["organization_adhoc_filename"],
directory=options["directory"],
seperator=options["sep"],
)
self.domain_object.parse_all_files()
except Exception as err:
logger.error(f"Could not load additional data. Error: {err}")
raise err
else:
all_transition_domains = TransitionDomain.objects.all()
if not all_transition_domains.exists():
raise Exception("No TransitionDomain objects exist.")
for transition_domain in all_transition_domains:
domain_name = transition_domain.domain_name.upper()
updated_transition_domain = transition_domain
# STEP 1: Parse organization data
updated_transition_domain = self.parse_org_data(
domain_name, transition_domain
)
self.parse_logs.display_logs(EnumFilenames.ORGANIZATION_ADHOC)
# STEP 2: Parse domain type data
updated_transition_domain = self.parse_domain_type_data(
domain_name, transition_domain
)
self.parse_logs.display_logs(EnumFilenames.DOMAIN_ADHOC)
# STEP 3: Parse agency data
updated_transition_domain = self.parse_agency_data(
domain_name, transition_domain
)
self.parse_logs.display_logs(EnumFilenames.AGENCY_ADHOC)
# STEP 4: Parse expiration data - TODO
updated_transition_domain = self.parse_expiration_data(
domain_name, transition_domain
)
# self.parse_logs(EnumFilenames.EXPIRATION_DATA)
updated_transition_domain.save()
# TODO - Implement once Niki gets her ticket in
def parse_expiration_data(self, domain_name, transition_domain):
return transition_domain
def parse_agency_data(self, domain_name, transition_domain) -> TransitionDomain:
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
info = self.get_agency_info(domain_name)
if info is None:
self.parse_logs.create_log_item(
EnumFilenames.AGENCY_ADHOC,
LogCode.ERROR,
f"Could not add federal_agency on {domain_name}, no data exists.",
domain_name
)
return transition_domain
agency_exists = (
transition_domain.federal_agency is not None
and transition_domain.federal_agency.strip() != ""
)
if not info.active.lower() == "y":
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR,
f"Could not add inactive agency {info.agencyname} on {domain_name}",
domain_name
)
return transition_domain
if not info.isfederal.lower() == "y":
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR,
f"Could not add non-federal agency {info.agencyname} on {domain_name}",
domain_name
)
return transition_domain
transition_domain.federal_agency = info.agencyname
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
EnumFilenames.AGENCY_ADHOC,
"federal_agency",
transition_domain.federal_agency,
domain_name,
agency_exists
)
return transition_domain
def parse_domain_type_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain:
"""Parses the DomainType file.
This file has definitions for organization_type and federal_agency.
Logs if
"""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
info = self.get_domain_type_info(domain_name)
if info is None:
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR,
f"Could not add domain_type on {domain_name}, no data exists.",
domain_name
)
return transition_domain
# This data is stored as follows: FEDERAL - Judicial
# For all other records, it is stored as so: Interstate
# We can infer if it is federal or not based on this fact.
domain_type = info.domaintype.split("-")
domain_type_length = len(domain_type)
if domain_type_length < 1 or domain_type_length > 2:
raise ValueError("Found invalid data on DOMAIN_ADHOC")
# Then, just grab the organization type.
new_organization_type = domain_type[0].strip()
# Check if this domain_type is active or not.
# If not, we don't want to add this.
if not info.active.lower() == "y":
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR,
f"Could not add inactive domain_type {domain_type[0]} on {domain_name}",
domain_name
)
return transition_domain
# Are we updating data that already exists,
# or are we adding new data in its place?
organization_type_exists = (
transition_domain.organization_type is not None
and transition_domain.organization_type.strip() != ""
)
federal_type_exists = (
transition_domain.federal_type is not None
and transition_domain.federal_type.strip() != ""
)
# If we get two records, then we know it is federal.
# needs to be lowercase for federal type
is_federal = domain_type_length == 2
if is_federal:
new_federal_type = domain_type[1].strip()
transition_domain.organization_type = new_organization_type
transition_domain.federal_type = new_federal_type
else:
transition_domain.organization_type = new_organization_type
transition_domain.federal_type = None
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
EnumFilenames.DOMAIN_ADHOC,
"organization_type",
transition_domain.organization_type,
domain_name,
organization_type_exists,
)
self._add_or_change_message(
EnumFilenames.DOMAIN_ADHOC,
"federal_type",
transition_domain.federal_type,
domain_name,
federal_type_exists,
)
return transition_domain
def parse_org_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain:
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
org_info = self.get_org_info(domain_name)
if org_info is None:
self.parse_logs.create_log_item(
EnumFilenames.ORGANIZATION_ADHOC,
LogCode.ERROR,
f"Could not add organization_name on {domain_name}, no data exists.",
domain_name
)
return transition_domain
desired_property_exists = (
transition_domain.organization_name is not None
and transition_domain.organization_name.strip() != ""
)
transition_domain.organization_name = org_info.orgname
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
EnumFilenames.ORGANIZATION_ADHOC,
"organization_name",
transition_domain.organization_name,
domain_name,
desired_property_exists,
)
return transition_domain
def _add_or_change_message(
self, file_type, var_name, changed_value, domain_name, is_update=False
):
"""Creates a log instance when a property
is successfully changed on a given TransitionDomain."""
if not is_update:
self.parse_logs.create_log_item(
file_type,
LogCode.DEBUG,
f"Added {var_name} as '{changed_value}' on {domain_name}",
domain_name
)
else:
self.parse_logs.create_log_item(
file_type,
LogCode.INFO,
f"Updated existing {var_name} to '{changed_value}' on {domain_name}",
domain_name
)
# Property getters, i.e. orgid or domaintypeid
def get_org_info(self, domain_name) -> OrganizationAdhoc:
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
org_id = domain_info.orgid
return self.get_organization_adhoc(org_id)
def get_domain_type_info(self, domain_name) -> DomainTypeAdhoc:
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
type_id = domain_info.domaintypeid
return self.get_domain_adhoc(type_id)
def get_agency_info(self, domain_name) -> AgencyAdhoc:
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
type_id = domain_info.orgid
return self.get_domain_adhoc(type_id)
def get_authority_info(self, domain_name):
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
type_id = domain_info.authorityid
return self.get_authority_adhoc(type_id)
# Object getters, i.e. DomainAdditionalData or OrganizationAdhoc
def get_domain_data(self, desired_id) -> DomainAdditionalData:
return self.get_object_by_id(EnumFilenames.DOMAIN_ADDITIONAL, desired_id)
def get_organization_adhoc(self, desired_id) -> OrganizationAdhoc:
"""Grabs adhoc information for organizations. Returns an organization
adhoc object.
"""
return self.get_object_by_id(EnumFilenames.ORGANIZATION_ADHOC, desired_id)
def get_domain_adhoc(self, desired_id) -> DomainTypeAdhoc:
""""""
return self.get_object_by_id(EnumFilenames.DOMAIN_ADHOC, desired_id)
def get_agency_adhoc(self, desired_id) -> AgencyAdhoc:
""""""
return self.get_object_by_id(EnumFilenames.AGENCY_ADHOC, desired_id)
def get_authority_adhoc(self, desired_id) -> AuthorityAdhoc:
""""""
return self.get_object_by_id(EnumFilenames.AUTHORITY_ADHOC, desired_id)
def get_object_by_id(self, file_type: EnumFilenames, desired_id):
"""Returns a field in a dictionary based off the type and id.
vars:
file_type: (constant) EnumFilenames -> Which data file to target.
An example would be `EnumFilenames.DOMAIN_ADHOC`.
desired_id: str -> Which id you want to search on.
An example would be `"12"` or `"igorville.gov"`
Explanation:
Each data file has an associated type (file_type) for tracking purposes.
Each file_type is a dictionary which
contains a dictionary of row[id_field]: object.
In practice, this would look like:
EnumFilenames.AUTHORITY_ADHOC: {
"1": AuthorityAdhoc(...),
"2": AuthorityAdhoc(...),
...
}
desired_id will then specify which id to grab. If we wanted "1",
then this function will return the value of id "1".
So, `AuthorityAdhoc(...)`
"""
# Grabs a dict associated with the file_type.
# For example, EnumFilenames.DOMAIN_ADDITIONAL.
desired_type = self.domain_object.file_data.get(file_type)
if desired_type is None:
self.parse_logs.create_log_item(
file_type, LogCode.ERROR, f"Type {file_type} does not exist"
)
return None
# Grab the value given an Id within that file_type dict.
# For example, "igorville.gov".
obj = desired_type.data.get(desired_id)
if obj is None:
self.parse_logs.create_log_item(
file_type, LogCode.ERROR, f"Id {desired_id} does not exist"
)
return obj

View file

@ -16,7 +16,7 @@ from registrar.management.commands.utility.terminal_helper import (
) )
from .utility.transition_domain_arguments import TransitionDomainArguments from .utility.transition_domain_arguments import TransitionDomainArguments
from .utility.extra_transition_domain import LoadExtraTransitionDomain from .utility.extra_transition_domain_helper import LoadExtraTransitionDomain
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -503,7 +503,7 @@ class Command(BaseCommand):
# Prompt the user if they want to load additional data on the domains # Prompt the user if they want to load additional data on the domains
# TODO - add this logic into the core of this file # TODO - add this logic into the core of this file
arguments = TransitionDomainArguments(**options) arguments = TransitionDomainArguments(**options)
do_parse_extra = TerminalHelper.prompt_for_execution( do_parse_extra = TerminalHelper.prompt_for_execution(
True, True,
"./manage.py test", "./manage.py test",
@ -512,4 +512,3 @@ class Command(BaseCommand):
if do_parse_extra: if do_parse_extra:
extra = LoadExtraTransitionDomain(arguments) extra = LoadExtraTransitionDomain(arguments)
extra_logs = extra.parse_logs.logs extra_logs = extra.parse_logs.logs

View file

@ -14,6 +14,7 @@ from typing import List, Optional
@dataclass @dataclass
class AgencyAdhoc: class AgencyAdhoc:
"""Defines the structure given in the AGENCY_ADHOC file""" """Defines the structure given in the AGENCY_ADHOC file"""
agencyid: Optional[int] = None agencyid: Optional[int] = None
agencyname: Optional[str] = None agencyname: Optional[str] = None
active: Optional[str] = None active: Optional[str] = None
@ -23,6 +24,7 @@ class AgencyAdhoc:
@dataclass @dataclass
class DomainAdditionalData: class DomainAdditionalData:
"""Defines the structure given in the DOMAIN_ADDITIONAL file""" """Defines the structure given in the DOMAIN_ADDITIONAL file"""
domainname: Optional[str] = None domainname: Optional[str] = None
domaintypeid: Optional[int] = None domaintypeid: Optional[int] = None
authorityid: Optional[int] = None authorityid: Optional[int] = None
@ -35,6 +37,7 @@ class DomainAdditionalData:
@dataclass @dataclass
class DomainTypeAdhoc: class DomainTypeAdhoc:
"""Defines the structure given in the DOMAIN_ADHOC file""" """Defines the structure given in the DOMAIN_ADHOC file"""
domaintypeid: Optional[int] = None domaintypeid: Optional[int] = None
domaintype: Optional[str] = None domaintype: Optional[str] = None
code: Optional[str] = None code: Optional[str] = None
@ -44,6 +47,7 @@ class DomainTypeAdhoc:
@dataclass @dataclass
class OrganizationAdhoc: class OrganizationAdhoc:
"""Defines the structure given in the ORGANIZATION_ADHOC file""" """Defines the structure given in the ORGANIZATION_ADHOC file"""
orgid: Optional[int] = None orgid: Optional[int] = None
orgname: Optional[str] = None orgname: Optional[str] = None
orgstreet: Optional[str] = None orgstreet: Optional[str] = None
@ -56,6 +60,7 @@ class OrganizationAdhoc:
@dataclass @dataclass
class AuthorityAdhoc: class AuthorityAdhoc:
"""Defines the structure given in the AUTHORITY_ADHOC file""" """Defines the structure given in the AUTHORITY_ADHOC file"""
authorityid: Optional[int] = None authorityid: Optional[int] = None
firstname: Optional[str] = None firstname: Optional[str] = None
middlename: Optional[str] = None middlename: Optional[str] = None
@ -65,9 +70,11 @@ class AuthorityAdhoc:
agencyid: Optional[int] = None agencyid: Optional[int] = None
addlinfo: Optional[List[str]] = None addlinfo: Optional[List[str]] = None
@dataclass @dataclass
class DomainEscrow: class DomainEscrow:
"""Defines the structure given in the DOMAIN_ESCROW file""" """Defines the structure given in the DOMAIN_ESCROW file"""
domainname: Optional[str] = None domainname: Optional[str] = None
creationdate: Optional[date] = None creationdate: Optional[date] = None
expirationdate: Optional[date] = None expirationdate: Optional[date] = None

View file

@ -26,8 +26,10 @@ from .transition_domain_arguments import TransitionDomainArguments
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class LogCode(Enum): class LogCode(Enum):
"""Stores the desired log severity""" """Stores the desired log severity"""
ERROR = 1 ERROR = 1
WARNING = 2 WARNING = 2
INFO = 3 INFO = 3
@ -36,9 +38,9 @@ class LogCode(Enum):
class FileTransitionLog: class FileTransitionLog:
"""Container for storing event logs. Used to lessen """Container for storing event logs. Used to lessen
the complexity of storing multiple logs across multiple the complexity of storing multiple logs across multiple
variables. variables.
self.logs: dict -> { self.logs: dict -> {
EnumFilenames.DOMAIN_ADHOC: List[LogItem], EnumFilenames.DOMAIN_ADHOC: List[LogItem],
EnumFilenames.AGENCY_ADHOC: List[LogItem], EnumFilenames.AGENCY_ADHOC: List[LogItem],
@ -46,16 +48,19 @@ class FileTransitionLog:
EnumFilenames.DOMAIN_ADDITIONAL: List[LogItem], EnumFilenames.DOMAIN_ADDITIONAL: List[LogItem],
} }
""" """
def __init__(self): def __init__(self):
self.logs = { self.logs = {
EnumFilenames.DOMAIN_ADHOC: [], EnumFilenames.DOMAIN_ADHOC: [],
EnumFilenames.AGENCY_ADHOC: [], EnumFilenames.AGENCY_ADHOC: [],
EnumFilenames.ORGANIZATION_ADHOC: [], EnumFilenames.ORGANIZATION_ADHOC: [],
EnumFilenames.DOMAIN_ADDITIONAL: [], EnumFilenames.DOMAIN_ADDITIONAL: [],
EnumFilenames.DOMAIN_ESCROW: [],
} }
class LogItem: class LogItem:
"""Used for storing data about logger information.""" """Used for storing data about logger information."""
def __init__(self, file_type, code, message, domain_name): def __init__(self, file_type, code, message, domain_name):
self.file_type = file_type self.file_type = file_type
self.code = code self.code = code
@ -65,7 +70,7 @@ class FileTransitionLog:
def add_log(self, file_type, code, message, domain_name): def add_log(self, file_type, code, message, domain_name):
"""Adds a log item to self.logs """Adds a log item to self.logs
file_type -> Which array to add to, file_type -> Which array to add to,
ex. EnumFilenames.DOMAIN_ADHOC ex. EnumFilenames.DOMAIN_ADHOC
code -> Log severity or other metadata, ex. LogCode.ERROR code -> Log severity or other metadata, ex. LogCode.ERROR
@ -74,7 +79,9 @@ class FileTransitionLog:
""" """
self.logs[file_type].append(self.LogItem(file_type, code, message, domain_name)) self.logs[file_type].append(self.LogItem(file_type, code, message, domain_name))
def create_log_item(self, file_type, code, message, domain_name=None, add_to_list=True): def create_log_item(
self, file_type, code, message, domain_name=None, add_to_list=True
):
"""Creates and returns an LogItem object. """Creates and returns an LogItem object.
add_to_list: bool -> If enabled, add it to the logs array. add_to_list: bool -> If enabled, add it to the logs array.
@ -118,7 +125,6 @@ class LoadExtraTransitionDomain:
# transition domain object with that data. # transition domain object with that data.
self.update_transition_domain_models() self.update_transition_domain_models()
def update_transition_domain_models(self): def update_transition_domain_models(self):
"""Updates TransitionDomain objects based off the file content """Updates TransitionDomain objects based off the file content
given in self.domain_object""" given in self.domain_object"""
@ -148,22 +154,62 @@ class LoadExtraTransitionDomain:
) )
self.parse_logs.display_logs(EnumFilenames.AGENCY_ADHOC) self.parse_logs.display_logs(EnumFilenames.AGENCY_ADHOC)
# STEP 4: Parse expiration data - TODO # STEP 4: Parse creation and expiration data
updated_transition_domain = self.parse_expiration_data( updated_transition_domain = self.parse_creation_expiration_data(
domain_name, transition_domain domain_name, transition_domain
) )
# self.parse_logs(EnumFilenames.EXPIRATION_DATA) self.parse_logs.display_logs(EnumFilenames.DOMAIN_ESCROW)
updated_transition_domain.save() updated_transition_domain.save()
# TODO - Implement once Niki gets her ticket in def parse_creation_expiration_data(self, domain_name, transition_domain):
def parse_expiration_data(self, domain_name, transition_domain): """Grabs expiration_date from the parsed files and associates it
"""Grabs expiration_date from the parsed files and associates it
with a transition_domain object, then returns that object.""" with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
info = self.get_domain_escrow_info(domain_name)
if info is None:
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ESCROW,
LogCode.ERROR,
"Could not add epp_creation_date and epp_expiration_date "
f"on {domain_name}, no data exists.",
domain_name,
)
return transition_domain
creation_exists = (
transition_domain.epp_creation_date is not None
)
expiration_exists = (
transition_domain.epp_expiration_date is not None
)
transition_domain.epp_creation_date = info.creationdate
transition_domain.epp_expiration_date = info.expirationdate
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
EnumFilenames.DOMAIN_ESCROW,
"epp_creation_date",
transition_domain.epp_creation_date,
domain_name,
creation_exists,
)
self._add_or_change_message(
EnumFilenames.DOMAIN_ESCROW,
"epp_expiration_date",
transition_domain.epp_expiration_date,
domain_name,
expiration_exists,
)
return transition_domain return transition_domain
def parse_agency_data(self, domain_name, transition_domain) -> TransitionDomain: def parse_agency_data(self, domain_name, transition_domain) -> TransitionDomain:
"""Grabs federal_agency from the parsed files and associates it """Grabs federal_agency from the parsed files and associates it
with a transition_domain object, then returns that object.""" with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain): if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain") raise ValueError("Not a valid object, must be TransitionDomain")
@ -174,7 +220,7 @@ class LoadExtraTransitionDomain:
EnumFilenames.AGENCY_ADHOC, EnumFilenames.AGENCY_ADHOC,
LogCode.ERROR, LogCode.ERROR,
f"Could not add federal_agency on {domain_name}, no data exists.", f"Could not add federal_agency on {domain_name}, no data exists.",
domain_name domain_name,
) )
return transition_domain return transition_domain
@ -188,19 +234,19 @@ class LoadExtraTransitionDomain:
EnumFilenames.DOMAIN_ADHOC, EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR, LogCode.ERROR,
f"Could not add inactive agency {info.agencyname} on {domain_name}", f"Could not add inactive agency {info.agencyname} on {domain_name}",
domain_name domain_name,
) )
return transition_domain return transition_domain
if not info.isfederal.lower() == "y": if not info.isfederal.lower() == "y":
self.parse_logs.create_log_item( self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC, EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR, LogCode.ERROR,
f"Could not add non-federal agency {info.agencyname} on {domain_name}", f"Could not add non-federal agency {info.agencyname} on {domain_name}",
domain_name domain_name,
) )
return transition_domain return transition_domain
transition_domain.federal_agency = info.agencyname transition_domain.federal_agency = info.agencyname
# Logs if we either added to this property, # Logs if we either added to this property,
@ -210,13 +256,15 @@ class LoadExtraTransitionDomain:
"federal_agency", "federal_agency",
transition_domain.federal_agency, transition_domain.federal_agency,
domain_name, domain_name,
agency_exists agency_exists,
) )
return transition_domain return transition_domain
def parse_domain_type_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain: def parse_domain_type_data(
"""Grabs organization_type and federal_type from the parsed files self, domain_name, transition_domain: TransitionDomain
) -> TransitionDomain:
"""Grabs organization_type and federal_type from the parsed files
and associates it with a transition_domain object, then returns that object.""" and associates it with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain): if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain") raise ValueError("Not a valid object, must be TransitionDomain")
@ -227,7 +275,7 @@ class LoadExtraTransitionDomain:
EnumFilenames.DOMAIN_ADHOC, EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR, LogCode.ERROR,
f"Could not add domain_type on {domain_name}, no data exists.", f"Could not add domain_type on {domain_name}, no data exists.",
domain_name domain_name,
) )
return transition_domain return transition_domain
@ -249,7 +297,7 @@ class LoadExtraTransitionDomain:
EnumFilenames.DOMAIN_ADHOC, EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR, LogCode.ERROR,
f"Could not add inactive domain_type {domain_type[0]} on {domain_name}", f"Could not add inactive domain_type {domain_type[0]} on {domain_name}",
domain_name domain_name,
) )
return transition_domain return transition_domain
@ -295,8 +343,10 @@ class LoadExtraTransitionDomain:
return transition_domain return transition_domain
def parse_org_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain: def parse_org_data(
"""Grabs organization_name from the parsed files and associates it self, domain_name, transition_domain: TransitionDomain
) -> TransitionDomain:
"""Grabs organization_name from the parsed files and associates it
with a transition_domain object, then returns that object.""" with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain): if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain") raise ValueError("Not a valid object, must be TransitionDomain")
@ -307,7 +357,7 @@ class LoadExtraTransitionDomain:
EnumFilenames.ORGANIZATION_ADHOC, EnumFilenames.ORGANIZATION_ADHOC,
LogCode.ERROR, LogCode.ERROR,
f"Could not add organization_name on {domain_name}, no data exists.", f"Could not add organization_name on {domain_name}, no data exists.",
domain_name domain_name,
) )
return transition_domain return transition_domain
@ -340,19 +390,19 @@ class LoadExtraTransitionDomain:
file_type, file_type,
LogCode.DEBUG, LogCode.DEBUG,
f"Added {var_name} as '{changed_value}' on {domain_name}", f"Added {var_name} as '{changed_value}' on {domain_name}",
domain_name domain_name,
) )
else: else:
self.parse_logs.create_log_item( self.parse_logs.create_log_item(
file_type, file_type,
LogCode.INFO, LogCode.INFO,
f"Updated existing {var_name} to '{changed_value}' on {domain_name}", f"Updated existing {var_name} to '{changed_value}' on {domain_name}",
domain_name domain_name,
) )
# Property getters, i.e. orgid or domaintypeid # Property getters, i.e. orgid or domaintypeid
def get_org_info(self, domain_name) -> OrganizationAdhoc: def get_org_info(self, domain_name) -> OrganizationAdhoc:
"""Maps an id given in get_domain_data to a organization_adhoc """Maps an id given in get_domain_data to a organization_adhoc
record which has its corresponding definition""" record which has its corresponding definition"""
domain_info = self.get_domain_data(domain_name) domain_info = self.get_domain_data(domain_name)
if domain_info is None: if domain_info is None:
@ -361,7 +411,7 @@ class LoadExtraTransitionDomain:
return self.get_organization_adhoc(org_id) return self.get_organization_adhoc(org_id)
def get_domain_type_info(self, domain_name) -> DomainTypeAdhoc: def get_domain_type_info(self, domain_name) -> DomainTypeAdhoc:
"""Maps an id given in get_domain_data to a domain_type_adhoc """Maps an id given in get_domain_data to a domain_type_adhoc
record which has its corresponding definition""" record which has its corresponding definition"""
domain_info = self.get_domain_data(domain_name) domain_info = self.get_domain_data(domain_name)
if domain_info is None: if domain_info is None:
@ -370,23 +420,30 @@ class LoadExtraTransitionDomain:
return self.get_domain_adhoc(type_id) return self.get_domain_adhoc(type_id)
def get_agency_info(self, domain_name) -> AgencyAdhoc: def get_agency_info(self, domain_name) -> AgencyAdhoc:
"""Maps an id given in get_domain_data to a agency_adhoc """Maps an id given in get_domain_data to a agency_adhoc
record which has its corresponding definition""" record which has its corresponding definition"""
domain_info = self.get_domain_data(domain_name) domain_info = self.get_domain_data(domain_name)
if domain_info is None: if domain_info is None:
return None return None
type_id = domain_info.orgid type_id = domain_info.orgid
return self.get_domain_adhoc(type_id) return self.get_domain_adhoc(type_id)
def get_authority_info(self, domain_name): def get_authority_info(self, domain_name):
"""Maps an id given in get_domain_data to a authority_adhoc """Maps an id given in get_domain_data to a authority_adhoc
record which has its corresponding definition""" record which has its corresponding definition"""
domain_info = self.get_domain_data(domain_name) domain_info = self.get_domain_data(domain_name)
if domain_info is None: if domain_info is None:
return None return None
type_id = domain_info.authorityid type_id = domain_info.authorityid
return self.get_authority_adhoc(type_id) return self.get_authority_adhoc(type_id)
def get_domain_escrow_info(self, domain_name):
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
type_id = domain_info.domainname
return self.get_domain_escrow(type_id)
# Object getters, i.e. DomainAdditionalData or OrganizationAdhoc # Object getters, i.e. DomainAdditionalData or OrganizationAdhoc
def get_domain_data(self, desired_id) -> DomainAdditionalData: def get_domain_data(self, desired_id) -> DomainAdditionalData:
"""Grabs a corresponding row within the DOMAIN_ADDITIONAL file, """Grabs a corresponding row within the DOMAIN_ADDITIONAL file,
@ -407,11 +464,16 @@ class LoadExtraTransitionDomain:
"""Grabs a corresponding row within the AGENCY_ADHOC file, """Grabs a corresponding row within the AGENCY_ADHOC file,
based off a desired_id""" based off a desired_id"""
return self.get_object_by_id(EnumFilenames.AGENCY_ADHOC, desired_id) return self.get_object_by_id(EnumFilenames.AGENCY_ADHOC, desired_id)
def get_authority_adhoc(self, desired_id) -> AuthorityAdhoc: def get_authority_adhoc(self, desired_id) -> AuthorityAdhoc:
"""Grabs a corresponding row within the AUTHORITY_ADHOC file, """Grabs a corresponding row within the AUTHORITY_ADHOC file,
based off a desired_id""" based off a desired_id"""
return self.get_object_by_id(EnumFilenames.AUTHORITY_ADHOC, desired_id) return self.get_object_by_id(EnumFilenames.AUTHORITY_ADHOC, desired_id)
def get_domain_escrow(self, desired_id) -> DomainEscrow:
"""Grabs a corresponding row within the DOMAIN_ESCROW file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.DOMAIN_ESCROW, desired_id)
def get_object_by_id(self, file_type: EnumFilenames, desired_id): def get_object_by_id(self, file_type: EnumFilenames, desired_id):
"""Returns a field in a dictionary based off the type and id. """Returns a field in a dictionary based off the type and id.
@ -420,23 +482,23 @@ class LoadExtraTransitionDomain:
file_type: (constant) EnumFilenames -> Which data file to target. file_type: (constant) EnumFilenames -> Which data file to target.
An example would be `EnumFilenames.DOMAIN_ADHOC`. An example would be `EnumFilenames.DOMAIN_ADHOC`.
desired_id: str -> Which id you want to search on. desired_id: str -> Which id you want to search on.
An example would be `"12"` or `"igorville.gov"` An example would be `"12"` or `"igorville.gov"`
Explanation: Explanation:
Each data file has an associated type (file_type) for tracking purposes. Each data file has an associated type (file_type) for tracking purposes.
Each file_type is a dictionary which Each file_type is a dictionary which
contains a dictionary of row[id_field]: object. contains a dictionary of row[id_field]: object.
In practice, this would look like: In practice, this would look like:
EnumFilenames.AUTHORITY_ADHOC: { EnumFilenames.AUTHORITY_ADHOC: {
"1": AuthorityAdhoc(...), "1": AuthorityAdhoc(...),
"2": AuthorityAdhoc(...), "2": AuthorityAdhoc(...),
... ...
} }
desired_id will then specify which id to grab. If we wanted "1", desired_id will then specify which id to grab. If we wanted "1",
then this function will return the value of id "1". then this function will return the value of id "1".
So, `AuthorityAdhoc(...)` So, `AuthorityAdhoc(...)`
@ -450,7 +512,7 @@ class LoadExtraTransitionDomain:
) )
return None return None
# Grab the value given an Id within that file_type dict. # Grab the value given an Id within that file_type dict.
# For example, "igorville.gov". # For example, "igorville.gov".
obj = desired_type.data.get(desired_id) obj = desired_type.data.get(desired_id)
if obj is None: if obj is None:
@ -474,9 +536,12 @@ class PatternMap:
data_type: type -> Metadata about the desired type for data. data_type: type -> Metadata about the desired type for data.
id_field: str -> Defines which field should act as the id in data. id_field: str -> Defines which field should act as the id in data.
This is necessary as we store lists of "data_type" in ExtraTransitionDomain as follows:
data: dict -> The returned data. Intended to be used with data_type {
to cross-reference. id_field: data_type(...),
id_field: data_type(...),
...
}
""" """
@ -487,15 +552,27 @@ class PatternMap:
data_type: type, data_type: type,
id_field: str, id_field: str,
): ):
# Metadata #
## Filename inference metadata ##
self.regex = regex self.regex = regex
self.data_type = data_type
self.id_field = id_field
self.data = {}
self.filename = filename
self.could_infer = False self.could_infer = False
## "data" object metadata ##
### Where the data is sourced from ###
self.filename = filename
### What type the data is ###
self.data_type = data_type
### What the id should be in the holding dict ###
self.id_field = id_field
# Object data #
self.data = {}
self.logs = []
def try_infer_filename(self, current_file_name, default_file_name): def try_infer_filename(self, current_file_name, default_file_name):
"""Tries to match a given filename to a regex, """Tries to match a given filename to a regex,
then uses that match to generate the filename.""" then uses that match to generate the filename."""
# returns (filename, inferred_successfully) # returns (filename, inferred_successfully)
return self._infer_filename(self.regex, current_file_name, default_file_name) return self._infer_filename(self.regex, current_file_name, default_file_name)
@ -503,21 +580,36 @@ class PatternMap:
def _infer_filename(self, regex: re.Pattern, matched_file_name, default_file_name): def _infer_filename(self, regex: re.Pattern, matched_file_name, default_file_name):
if not isinstance(regex, re.Pattern): if not isinstance(regex, re.Pattern):
return (self.filename, False) return (self.filename, False)
match = regex.match(matched_file_name) match = regex.match(matched_file_name)
if not match: if not match:
return (self.filename, False) return (self.filename, False)
total_groups = len(match.groups())
# If no matches exist or if we have too many
# matches, then we shouldn't infer
if total_groups == 0 or total_groups > 2:
return (self.filename, False)
# If only one match is returned,
# it means that our default matches our request
if total_groups == 1:
return (self.filename, True)
# Otherwise, if two are returned, then
# its likely the pattern we want
date = match.group(1) date = match.group(1)
filename_without_date = match.group(2) filename_without_date = match.group(2)
# Can the supplied self.regex do a match on the filename? # After stripping out the date,
# do the two filenames match?
can_infer = filename_without_date == default_file_name can_infer = filename_without_date == default_file_name
if not can_infer: if not can_infer:
return (self.filename, False) return (self.filename, False)
# If so, note that and return the inferred name # If they do, recreate the filename and return it
full_filename = date + "." + filename_without_date full_filename = date + "." + filename_without_date
return (full_filename, can_infer) return (full_filename, can_infer)
@ -525,6 +617,7 @@ class PatternMap:
class ExtraTransitionDomain: class ExtraTransitionDomain:
"""Helper class to aid in storing TransitionDomain data spread across """Helper class to aid in storing TransitionDomain data spread across
multiple files.""" multiple files."""
strip_date_regex = re.compile(r"(?:.*\/)?(\d+)\.(.+)") strip_date_regex = re.compile(r"(?:.*\/)?(\d+)\.(.+)")
def __init__( def __init__(
@ -545,27 +638,57 @@ class ExtraTransitionDomain:
self.seperator = sep self.seperator = sep
self.all_files = glob.glob(f"{directory}*") self.all_files = glob.glob(f"{directory}*")
# Create a set with filenames as keys for quick lookup # Create a set with filenames as keys for quick lookup
self.all_files_set = {os.path.basename(file) for file in self.all_files} self.all_files_set = {os.path.basename(file) for file in self.all_files}
# Used for a container of values at each filename. # Used for a container of values at each filename.
# Instead of tracking each in a seperate variable, we can declare # Instead of tracking each in a seperate variable, we can declare
# metadata about each file and associate it with an enum. # metadata about each file and associate it with an enum.
# That way if we want the data located at the agency_adhoc file, # That way if we want the data located at the agency_adhoc file,
# we can just call EnumFilenames.AGENCY_ADHOC. # we can just call EnumFilenames.AGENCY_ADHOC.
pattern_map_params = [ pattern_map_params = [
(EnumFilenames.AGENCY_ADHOC, agency_adhoc_filename, AgencyAdhoc, "agencyid"), (
(EnumFilenames.DOMAIN_ADDITIONAL, domain_additional_filename, DomainAdditionalData, "domainname"), EnumFilenames.AGENCY_ADHOC,
(EnumFilenames.DOMAIN_ESCROW, domain_escrow_filename, DomainEscrow, "domainname"), agency_adhoc_filename,
(EnumFilenames.DOMAIN_ADHOC, domain_adhoc_filename, DomainTypeAdhoc, "domaintypeid"), AgencyAdhoc,
(EnumFilenames.ORGANIZATION_ADHOC, organization_adhoc_filename, OrganizationAdhoc, "orgid"), "agencyid",
(EnumFilenames.AUTHORITY_ADHOC, authority_adhoc_filename, AuthorityAdhoc, "authorityid"), ),
(
EnumFilenames.DOMAIN_ADDITIONAL,
domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.DOMAIN_ESCROW,
domain_escrow_filename,
DomainEscrow,
"domainname",
),
(
EnumFilenames.DOMAIN_ADHOC,
domain_adhoc_filename,
DomainTypeAdhoc,
"domaintypeid",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
] ]
self.file_data = self.populate_file_data(pattern_map_params) self.file_data = self.populate_file_data(pattern_map_params)
def populate_file_data( def populate_file_data(
self, self, pattern_map_params: List[Tuple[EnumFilenames, str, type, str]]
pattern_map_params: List[Tuple[EnumFilenames, str, type, str]]
): ):
"""Populates the self.file_data field given a set """Populates the self.file_data field given a set
of tuple params. of tuple params.
@ -574,10 +697,10 @@ class ExtraTransitionDomain:
[ [
(field_type, filename, data_type, id_field), (field_type, filename, data_type, id_field),
] ]
vars: vars:
file_type (EnumFilenames) -> The name of the dictionary. file_type (EnumFilenames) -> The name of the dictionary.
Defined as a value on EnumFilenames, such as Defined as a value on EnumFilenames, such as
EnumFilenames.AGENCY_ADHOC EnumFilenames.AGENCY_ADHOC
filename (str) -> The filepath of the given filename (str) -> The filepath of the given
@ -635,10 +758,14 @@ class ExtraTransitionDomain:
logger.error(f"Could not find file: {filename}") logger.error(f"Could not find file: {filename}")
continue continue
logger.warning( # Infer filename logic #
"Attempting to infer filename" # This mode is used for development and testing only. Rather than having
f" for file: {filename}." # to manually define the filename each time, we can infer what the filename
) # actually is.
# Not intended for use outside of that, as it is better to assume
# the end-user wants to be specific.
logger.warning("Attempting to infer filename" f" for file: {filename}.")
for filename in self.all_files: for filename in self.all_files:
default_name = name.value[1] default_name = name.value[1]
match = value.try_infer_filename(filename, default_name) match = value.try_infer_filename(filename, default_name)
@ -666,7 +793,9 @@ class ExtraTransitionDomain:
file_type: PatternMap = item file_type: PatternMap = item
file_type.data = {} file_type.data = {}
def parse_csv_file(self, file, seperator, dataclass_type, id_field, is_domain_escrow=False): def parse_csv_file(
self, file, seperator, dataclass_type, id_field, is_domain_escrow=False
):
# Domain escrow is an edge case # Domain escrow is an edge case
if is_domain_escrow: if is_domain_escrow:
return self._read_domain_escrow(file, seperator) return self._read_domain_escrow(file, seperator)
@ -684,11 +813,9 @@ class ExtraTransitionDomain:
# TODO - add error handling # TODO - add error handling
creation_date = datetime.strptime(row[8], date_format) creation_date = datetime.strptime(row[8], date_format)
expiration_date = datetime.strptime(row[10], date_format) expiration_date = datetime.strptime(row[10], date_format)
dict_data[domain_name] = DomainEscrow( dict_data[domain_name] = DomainEscrow(
domain_name, domain_name, creation_date, expiration_date
creation_date,
expiration_date
) )
return dict_data return dict_data
@ -704,10 +831,10 @@ class ExtraTransitionDomain:
for row in reader: for row in reader:
if None in row: if None in row:
print("Skipping row with None key") print("Skipping row with None key")
#for key, value in row.items(): # for key, value in row.items():
#print(f"key: {key} value: {value}") # print(f"key: {key} value: {value}")
continue continue
row_id = row[id_field] row_id = row[id_field]
dict_data[row_id] = dataclass_type(**row) dict_data[row_id] = dataclass_type(**row)
#dict_data = {row[id_field]: dataclass_type(**row) for row in reader} # dict_data = {row[id_field]: dataclass_type(**row) for row in reader}
return dict_data return dict_data

View file

@ -1,29 +1,27 @@
class TransitionDomainArguments: class TransitionDomainArguments:
"""Stores arguments for load_transition_domain""" """Stores arguments for load_transition_domain"""
def __init__(self, **options): def __init__(self, **options):
# Settings # # Settings #
self.directory = options.get('directory') self.directory = options.get("directory")
self.sep = options.get('sep') self.sep = options.get("sep")
self.limitParse = options.get('limitParse') self.limitParse = options.get("limitParse")
# Filenames # # Filenames #
## Adhocs ## ## Adhocs ##
self.agency_adhoc_filename = options.get('agency_adhoc_filename') self.agency_adhoc_filename = options.get("agency_adhoc_filename")
self.domain_adhoc_filename = options.get('domain_adhoc_filename') self.domain_adhoc_filename = options.get("domain_adhoc_filename")
self.organization_adhoc_filename = options.get('organization_adhoc_filename') self.organization_adhoc_filename = options.get("organization_adhoc_filename")
## Data files ## ## Data files ##
self.domain_additional_filename = options.get('domain_additional_filename') self.domain_additional_filename = options.get("domain_additional_filename")
self.domain_contacts_filename = options.get('domain_contacts_filename') self.domain_contacts_filename = options.get("domain_contacts_filename")
self.domain_statuses_filename = options.get('domain_statuses_filename') self.domain_statuses_filename = options.get("domain_statuses_filename")
# Flags # # Flags #
self.debug = options.get('debug') self.debug = options.get("debug")
self.resetTable = options.get('resetTable') self.resetTable = options.get("resetTable")
def args_extra_transition_domain(self): def args_extra_transition_domain(self):
return { return {
"agency_adhoc_filename": self.agency_adhoc_filename, "agency_adhoc_filename": self.agency_adhoc_filename,
@ -32,4 +30,4 @@ class TransitionDomainArguments:
"domain_additional_filename": self.domain_additional_filename, "domain_additional_filename": self.domain_additional_filename,
"directory": self.directory, "directory": self.directory,
"sep": self.sep, "sep": self.sep,
} }

View file

@ -38,4 +38,20 @@ class Migration(migrations.Migration):
blank=True, db_index=True, help_text="Organization name", null=True blank=True, db_index=True, help_text="Organization name", null=True
), ),
), ),
migrations.AddField(
model_name="transitiondomain",
name="epp_creation_date",
field=models.DateField(
help_text="Duplication of registry's creation date saved for ease of reporting",
null=True,
),
),
migrations.AddField(
model_name="transitiondomain",
name="epp_expiration_date",
field=models.DateField(
help_text="Duplication of registry's expiration date saved for ease of reporting",
null=True,
),
),
] ]

View file

@ -65,6 +65,18 @@ class TransitionDomain(TimeStampedModel):
blank=True, blank=True,
help_text="Federal agency", help_text="Federal agency",
) )
epp_creation_date = models.DateField(
null=True,
help_text=(
"Duplication of registry's creation " "date saved for ease of reporting"
),
)
epp_expiration_date = models.DateField(
null=True,
help_text=(
"Duplication of registry's expiration " "date saved for ease of reporting"
),
)
def __str__(self): def __str__(self):
return ( return (