diff --git a/src/registrar/management/commands/load_extra_transition_domain.py b/src/registrar/management/commands/load_extra_transition_domain.py deleted file mode 100644 index a3fdca7e3..000000000 --- a/src/registrar/management/commands/load_extra_transition_domain.py +++ /dev/null @@ -1,465 +0,0 @@ -"""""" -import csv -import glob -import re -import logging - -import os -from typing import List -from enum import Enum -from django.core.management import BaseCommand - -from registrar.models.transition_domain import TransitionDomain -from .utility.extra_transition_domain import ExtraTransitionDomain -from .utility.epp_data_containers import ( - AgencyAdhoc, - AuthorityAdhoc, - DomainAdditionalData, - DomainTypeAdhoc, - OrganizationAdhoc, - EnumFilenames, -) - -logger = logging.getLogger(__name__) - - -class LogCode(Enum): - ERROR = 1 - WARNING = 2 - INFO = 3 - DEBUG = 4 - - -class FileTransitionLog: - """Container for storing event logs. Used to lessen - the complexity of storing multiple logs across multiple - variables. - - self.logs: dict -> { - EnumFilenames.DOMAIN_ADHOC: List[LogItem], - EnumFilenames.AGENCY_ADHOC: List[LogItem], - EnumFilenames.ORGANIZATION_ADHOC: List[LogItem], - EnumFilenames.DOMAIN_ADDITIONAL: List[LogItem], - } - """ - def __init__(self): - self.logs = { - EnumFilenames.DOMAIN_ADHOC: [], - EnumFilenames.AGENCY_ADHOC: [], - EnumFilenames.ORGANIZATION_ADHOC: [], - EnumFilenames.DOMAIN_ADDITIONAL: [], - } - - class LogItem: - """Used for storing data about logger information.""" - def __init__(self, file_type, code, message, domain_name): - self.file_type = file_type - self.code = code - self.message = message - self.domain_name = domain_name - - def add_log(self, file_type, code, message, domain_name): - """Adds a log item to self.logs - - file_type -> Which array to add to, - ex. EnumFilenames.DOMAIN_ADHOC - - code -> Log severity or other metadata, ex. LogCode.ERROR - - message -> Message to display - """ - self.logs[file_type].append(self.LogItem(file_type, code, message, domain_name)) - - def create_log_item(self, file_type, code, message, domain_name=None, add_to_list=True): - """Creates and returns an LogItem object. - - add_to_list: bool -> If enabled, add it to the logs array. - """ - log = self.LogItem(file_type, code, message, domain_name) - if not add_to_list: - return log - else: - self.logs[file_type].append(log) - return log - - def display_logs(self, file_type): - """Displays all logs in the given file_type in EnumFilenames. - Will log with the correct severity depending on code. - """ - for log in self.logs.get(file_type): - match log.code: - case LogCode.ERROR: - logger.error(log.message) - case LogCode.WARNING: - logger.warning(log.message) - case LogCode.INFO: - logger.info(log.message) - case LogCode.DEBUG: - logger.debug(log.message) - - -class Command(BaseCommand): - help = "" - filenames = EnumFilenames - parse_logs = FileTransitionLog() - - def add_arguments(self, parser): - """Add filename arguments.""" - parser.add_argument( - "--directory", default="migrationdata", help="Desired directory" - ) - parser.add_argument( - "--agency_adhoc_filename", - default=EnumFilenames.AGENCY_ADHOC.value[1], - help="Defines the filename for agency adhocs", - ) - parser.add_argument( - "--domain_additional_filename", - default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], - help="Defines the filename for additional domain data", - ) - parser.add_argument( - "--domain_adhoc_filename", - default=EnumFilenames.DOMAIN_ADHOC.value[1], - help="Defines the filename for domain type adhocs", - ) - parser.add_argument( - "--organization_adhoc_filename", - default=EnumFilenames.ORGANIZATION_ADHOC.value[1], - help="Defines the filename for domain type adhocs", - ) - parser.add_argument("--sep", default="|", help="Delimiter character") - - def handle(self, **options): - try: - self.domain_object = ExtraTransitionDomain( - agency_adhoc_filename=options["agency_adhoc_filename"], - domain_additional_filename=options["domain_additional_filename"], - domain_adhoc_filename=options["domain_adhoc_filename"], - organization_adhoc_filename=options["organization_adhoc_filename"], - directory=options["directory"], - seperator=options["sep"], - ) - self.domain_object.parse_all_files() - except Exception as err: - logger.error(f"Could not load additional data. Error: {err}") - raise err - else: - all_transition_domains = TransitionDomain.objects.all() - if not all_transition_domains.exists(): - raise Exception("No TransitionDomain objects exist.") - for transition_domain in all_transition_domains: - domain_name = transition_domain.domain_name.upper() - updated_transition_domain = transition_domain - # STEP 1: Parse organization data - updated_transition_domain = self.parse_org_data( - domain_name, transition_domain - ) - self.parse_logs.display_logs(EnumFilenames.ORGANIZATION_ADHOC) - - # STEP 2: Parse domain type data - updated_transition_domain = self.parse_domain_type_data( - domain_name, transition_domain - ) - self.parse_logs.display_logs(EnumFilenames.DOMAIN_ADHOC) - - # STEP 3: Parse agency data - updated_transition_domain = self.parse_agency_data( - domain_name, transition_domain - ) - self.parse_logs.display_logs(EnumFilenames.AGENCY_ADHOC) - - # STEP 4: Parse expiration data - TODO - updated_transition_domain = self.parse_expiration_data( - domain_name, transition_domain - ) - # self.parse_logs(EnumFilenames.EXPIRATION_DATA) - - updated_transition_domain.save() - - # TODO - Implement once Niki gets her ticket in - def parse_expiration_data(self, domain_name, transition_domain): - return transition_domain - - def parse_agency_data(self, domain_name, transition_domain) -> TransitionDomain: - if not isinstance(transition_domain, TransitionDomain): - raise ValueError("Not a valid object, must be TransitionDomain") - - info = self.get_agency_info(domain_name) - if info is None: - self.parse_logs.create_log_item( - EnumFilenames.AGENCY_ADHOC, - LogCode.ERROR, - f"Could not add federal_agency on {domain_name}, no data exists.", - domain_name - ) - return transition_domain - - agency_exists = ( - transition_domain.federal_agency is not None - and transition_domain.federal_agency.strip() != "" - ) - - if not info.active.lower() == "y": - self.parse_logs.create_log_item( - EnumFilenames.DOMAIN_ADHOC, - LogCode.ERROR, - f"Could not add inactive agency {info.agencyname} on {domain_name}", - domain_name - ) - return transition_domain - - if not info.isfederal.lower() == "y": - self.parse_logs.create_log_item( - EnumFilenames.DOMAIN_ADHOC, - LogCode.ERROR, - f"Could not add non-federal agency {info.agencyname} on {domain_name}", - domain_name - ) - return transition_domain - - transition_domain.federal_agency = info.agencyname - - # Logs if we either added to this property, - # or modified it. - self._add_or_change_message( - EnumFilenames.AGENCY_ADHOC, - "federal_agency", - transition_domain.federal_agency, - domain_name, - agency_exists - ) - - return transition_domain - - def parse_domain_type_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain: - """Parses the DomainType file. - This file has definitions for organization_type and federal_agency. - Logs if - """ - if not isinstance(transition_domain, TransitionDomain): - raise ValueError("Not a valid object, must be TransitionDomain") - - info = self.get_domain_type_info(domain_name) - if info is None: - self.parse_logs.create_log_item( - EnumFilenames.DOMAIN_ADHOC, - LogCode.ERROR, - f"Could not add domain_type on {domain_name}, no data exists.", - domain_name - ) - return transition_domain - - # This data is stored as follows: FEDERAL - Judicial - # For all other records, it is stored as so: Interstate - # We can infer if it is federal or not based on this fact. - domain_type = info.domaintype.split("-") - domain_type_length = len(domain_type) - if domain_type_length < 1 or domain_type_length > 2: - raise ValueError("Found invalid data on DOMAIN_ADHOC") - - # Then, just grab the organization type. - new_organization_type = domain_type[0].strip() - - # Check if this domain_type is active or not. - # If not, we don't want to add this. - if not info.active.lower() == "y": - self.parse_logs.create_log_item( - EnumFilenames.DOMAIN_ADHOC, - LogCode.ERROR, - f"Could not add inactive domain_type {domain_type[0]} on {domain_name}", - domain_name - ) - return transition_domain - - # Are we updating data that already exists, - # or are we adding new data in its place? - organization_type_exists = ( - transition_domain.organization_type is not None - and transition_domain.organization_type.strip() != "" - ) - federal_type_exists = ( - transition_domain.federal_type is not None - and transition_domain.federal_type.strip() != "" - ) - - # If we get two records, then we know it is federal. - # needs to be lowercase for federal type - is_federal = domain_type_length == 2 - if is_federal: - new_federal_type = domain_type[1].strip() - transition_domain.organization_type = new_organization_type - transition_domain.federal_type = new_federal_type - else: - transition_domain.organization_type = new_organization_type - transition_domain.federal_type = None - - # Logs if we either added to this property, - # or modified it. - self._add_or_change_message( - EnumFilenames.DOMAIN_ADHOC, - "organization_type", - transition_domain.organization_type, - domain_name, - organization_type_exists, - ) - - self._add_or_change_message( - EnumFilenames.DOMAIN_ADHOC, - "federal_type", - transition_domain.federal_type, - domain_name, - federal_type_exists, - ) - - return transition_domain - - def parse_org_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain: - if not isinstance(transition_domain, TransitionDomain): - raise ValueError("Not a valid object, must be TransitionDomain") - - org_info = self.get_org_info(domain_name) - if org_info is None: - self.parse_logs.create_log_item( - EnumFilenames.ORGANIZATION_ADHOC, - LogCode.ERROR, - f"Could not add organization_name on {domain_name}, no data exists.", - domain_name - ) - return transition_domain - - desired_property_exists = ( - transition_domain.organization_name is not None - and transition_domain.organization_name.strip() != "" - ) - - transition_domain.organization_name = org_info.orgname - - # Logs if we either added to this property, - # or modified it. - self._add_or_change_message( - EnumFilenames.ORGANIZATION_ADHOC, - "organization_name", - transition_domain.organization_name, - domain_name, - desired_property_exists, - ) - - return transition_domain - - def _add_or_change_message( - self, file_type, var_name, changed_value, domain_name, is_update=False - ): - """Creates a log instance when a property - is successfully changed on a given TransitionDomain.""" - if not is_update: - self.parse_logs.create_log_item( - file_type, - LogCode.DEBUG, - f"Added {var_name} as '{changed_value}' on {domain_name}", - domain_name - ) - else: - self.parse_logs.create_log_item( - file_type, - LogCode.INFO, - f"Updated existing {var_name} to '{changed_value}' on {domain_name}", - domain_name - ) - - # Property getters, i.e. orgid or domaintypeid - def get_org_info(self, domain_name) -> OrganizationAdhoc: - domain_info = self.get_domain_data(domain_name) - if domain_info is None: - return None - org_id = domain_info.orgid - return self.get_organization_adhoc(org_id) - - def get_domain_type_info(self, domain_name) -> DomainTypeAdhoc: - domain_info = self.get_domain_data(domain_name) - if domain_info is None: - return None - type_id = domain_info.domaintypeid - return self.get_domain_adhoc(type_id) - - def get_agency_info(self, domain_name) -> AgencyAdhoc: - domain_info = self.get_domain_data(domain_name) - if domain_info is None: - return None - type_id = domain_info.orgid - return self.get_domain_adhoc(type_id) - - def get_authority_info(self, domain_name): - domain_info = self.get_domain_data(domain_name) - if domain_info is None: - return None - type_id = domain_info.authorityid - return self.get_authority_adhoc(type_id) - - # Object getters, i.e. DomainAdditionalData or OrganizationAdhoc - def get_domain_data(self, desired_id) -> DomainAdditionalData: - return self.get_object_by_id(EnumFilenames.DOMAIN_ADDITIONAL, desired_id) - - def get_organization_adhoc(self, desired_id) -> OrganizationAdhoc: - """Grabs adhoc information for organizations. Returns an organization - adhoc object. - """ - return self.get_object_by_id(EnumFilenames.ORGANIZATION_ADHOC, desired_id) - - def get_domain_adhoc(self, desired_id) -> DomainTypeAdhoc: - """""" - return self.get_object_by_id(EnumFilenames.DOMAIN_ADHOC, desired_id) - - def get_agency_adhoc(self, desired_id) -> AgencyAdhoc: - """""" - return self.get_object_by_id(EnumFilenames.AGENCY_ADHOC, desired_id) - - def get_authority_adhoc(self, desired_id) -> AuthorityAdhoc: - """""" - return self.get_object_by_id(EnumFilenames.AUTHORITY_ADHOC, desired_id) - - def get_object_by_id(self, file_type: EnumFilenames, desired_id): - """Returns a field in a dictionary based off the type and id. - - vars: - file_type: (constant) EnumFilenames -> Which data file to target. - An example would be `EnumFilenames.DOMAIN_ADHOC`. - - desired_id: str -> Which id you want to search on. - An example would be `"12"` or `"igorville.gov"` - - Explanation: - Each data file has an associated type (file_type) for tracking purposes. - - Each file_type is a dictionary which - contains a dictionary of row[id_field]: object. - - In practice, this would look like: - - EnumFilenames.AUTHORITY_ADHOC: { - "1": AuthorityAdhoc(...), - "2": AuthorityAdhoc(...), - ... - } - - desired_id will then specify which id to grab. If we wanted "1", - then this function will return the value of id "1". - So, `AuthorityAdhoc(...)` - """ - # Grabs a dict associated with the file_type. - # For example, EnumFilenames.DOMAIN_ADDITIONAL. - desired_type = self.domain_object.file_data.get(file_type) - if desired_type is None: - self.parse_logs.create_log_item( - file_type, LogCode.ERROR, f"Type {file_type} does not exist" - ) - return None - - # Grab the value given an Id within that file_type dict. - # For example, "igorville.gov". - obj = desired_type.data.get(desired_id) - if obj is None: - self.parse_logs.create_log_item( - file_type, LogCode.ERROR, f"Id {desired_id} does not exist" - ) - return obj diff --git a/src/registrar/management/commands/load_transition_domain.py b/src/registrar/management/commands/load_transition_domain.py index 3a08d70dd..496ef9c4f 100644 --- a/src/registrar/management/commands/load_transition_domain.py +++ b/src/registrar/management/commands/load_transition_domain.py @@ -16,7 +16,7 @@ from registrar.management.commands.utility.terminal_helper import ( ) from .utility.transition_domain_arguments import TransitionDomainArguments -from .utility.extra_transition_domain import LoadExtraTransitionDomain +from .utility.extra_transition_domain_helper import LoadExtraTransitionDomain logger = logging.getLogger(__name__) @@ -503,7 +503,7 @@ class Command(BaseCommand): # Prompt the user if they want to load additional data on the domains # TODO - add this logic into the core of this file arguments = TransitionDomainArguments(**options) - + do_parse_extra = TerminalHelper.prompt_for_execution( True, "./manage.py test", @@ -512,4 +512,3 @@ class Command(BaseCommand): if do_parse_extra: extra = LoadExtraTransitionDomain(arguments) extra_logs = extra.parse_logs.logs - diff --git a/src/registrar/management/commands/utility/epp_data_containers.py b/src/registrar/management/commands/utility/epp_data_containers.py index 42dbdebd5..36ec9e19a 100644 --- a/src/registrar/management/commands/utility/epp_data_containers.py +++ b/src/registrar/management/commands/utility/epp_data_containers.py @@ -14,6 +14,7 @@ from typing import List, Optional @dataclass class AgencyAdhoc: """Defines the structure given in the AGENCY_ADHOC file""" + agencyid: Optional[int] = None agencyname: Optional[str] = None active: Optional[str] = None @@ -23,6 +24,7 @@ class AgencyAdhoc: @dataclass class DomainAdditionalData: """Defines the structure given in the DOMAIN_ADDITIONAL file""" + domainname: Optional[str] = None domaintypeid: Optional[int] = None authorityid: Optional[int] = None @@ -35,6 +37,7 @@ class DomainAdditionalData: @dataclass class DomainTypeAdhoc: """Defines the structure given in the DOMAIN_ADHOC file""" + domaintypeid: Optional[int] = None domaintype: Optional[str] = None code: Optional[str] = None @@ -44,6 +47,7 @@ class DomainTypeAdhoc: @dataclass class OrganizationAdhoc: """Defines the structure given in the ORGANIZATION_ADHOC file""" + orgid: Optional[int] = None orgname: Optional[str] = None orgstreet: Optional[str] = None @@ -56,6 +60,7 @@ class OrganizationAdhoc: @dataclass class AuthorityAdhoc: """Defines the structure given in the AUTHORITY_ADHOC file""" + authorityid: Optional[int] = None firstname: Optional[str] = None middlename: Optional[str] = None @@ -65,9 +70,11 @@ class AuthorityAdhoc: agencyid: Optional[int] = None addlinfo: Optional[List[str]] = None + @dataclass class DomainEscrow: """Defines the structure given in the DOMAIN_ESCROW file""" + domainname: Optional[str] = None creationdate: Optional[date] = None expirationdate: Optional[date] = None diff --git a/src/registrar/management/commands/utility/extra_transition_domain.py b/src/registrar/management/commands/utility/extra_transition_domain_helper.py similarity index 79% rename from src/registrar/management/commands/utility/extra_transition_domain.py rename to src/registrar/management/commands/utility/extra_transition_domain_helper.py index 3489e55c1..7bf5dab10 100644 --- a/src/registrar/management/commands/utility/extra_transition_domain.py +++ b/src/registrar/management/commands/utility/extra_transition_domain_helper.py @@ -26,8 +26,10 @@ from .transition_domain_arguments import TransitionDomainArguments logger = logging.getLogger(__name__) + class LogCode(Enum): """Stores the desired log severity""" + ERROR = 1 WARNING = 2 INFO = 3 @@ -36,9 +38,9 @@ class LogCode(Enum): class FileTransitionLog: """Container for storing event logs. Used to lessen - the complexity of storing multiple logs across multiple - variables. - + the complexity of storing multiple logs across multiple + variables. + self.logs: dict -> { EnumFilenames.DOMAIN_ADHOC: List[LogItem], EnumFilenames.AGENCY_ADHOC: List[LogItem], @@ -46,16 +48,19 @@ class FileTransitionLog: EnumFilenames.DOMAIN_ADDITIONAL: List[LogItem], } """ + def __init__(self): self.logs = { EnumFilenames.DOMAIN_ADHOC: [], EnumFilenames.AGENCY_ADHOC: [], EnumFilenames.ORGANIZATION_ADHOC: [], EnumFilenames.DOMAIN_ADDITIONAL: [], + EnumFilenames.DOMAIN_ESCROW: [], } class LogItem: """Used for storing data about logger information.""" + def __init__(self, file_type, code, message, domain_name): self.file_type = file_type self.code = code @@ -65,7 +70,7 @@ class FileTransitionLog: def add_log(self, file_type, code, message, domain_name): """Adds a log item to self.logs - file_type -> Which array to add to, + file_type -> Which array to add to, ex. EnumFilenames.DOMAIN_ADHOC code -> Log severity or other metadata, ex. LogCode.ERROR @@ -74,7 +79,9 @@ class FileTransitionLog: """ self.logs[file_type].append(self.LogItem(file_type, code, message, domain_name)) - def create_log_item(self, file_type, code, message, domain_name=None, add_to_list=True): + def create_log_item( + self, file_type, code, message, domain_name=None, add_to_list=True + ): """Creates and returns an LogItem object. add_to_list: bool -> If enabled, add it to the logs array. @@ -118,7 +125,6 @@ class LoadExtraTransitionDomain: # transition domain object with that data. self.update_transition_domain_models() - def update_transition_domain_models(self): """Updates TransitionDomain objects based off the file content given in self.domain_object""" @@ -148,22 +154,62 @@ class LoadExtraTransitionDomain: ) self.parse_logs.display_logs(EnumFilenames.AGENCY_ADHOC) - # STEP 4: Parse expiration data - TODO - updated_transition_domain = self.parse_expiration_data( + # STEP 4: Parse creation and expiration data + updated_transition_domain = self.parse_creation_expiration_data( domain_name, transition_domain ) - # self.parse_logs(EnumFilenames.EXPIRATION_DATA) + self.parse_logs.display_logs(EnumFilenames.DOMAIN_ESCROW) updated_transition_domain.save() - # TODO - Implement once Niki gets her ticket in - def parse_expiration_data(self, domain_name, transition_domain): - """Grabs expiration_date from the parsed files and associates it + def parse_creation_expiration_data(self, domain_name, transition_domain): + """Grabs expiration_date from the parsed files and associates it with a transition_domain object, then returns that object.""" + if not isinstance(transition_domain, TransitionDomain): + raise ValueError("Not a valid object, must be TransitionDomain") + + info = self.get_domain_escrow_info(domain_name) + if info is None: + self.parse_logs.create_log_item( + EnumFilenames.DOMAIN_ESCROW, + LogCode.ERROR, + "Could not add epp_creation_date and epp_expiration_date " + f"on {domain_name}, no data exists.", + domain_name, + ) + return transition_domain + + creation_exists = ( + transition_domain.epp_creation_date is not None + ) + expiration_exists = ( + transition_domain.epp_expiration_date is not None + ) + + transition_domain.epp_creation_date = info.creationdate + transition_domain.epp_expiration_date = info.expirationdate + + # Logs if we either added to this property, + # or modified it. + self._add_or_change_message( + EnumFilenames.DOMAIN_ESCROW, + "epp_creation_date", + transition_domain.epp_creation_date, + domain_name, + creation_exists, + ) + self._add_or_change_message( + EnumFilenames.DOMAIN_ESCROW, + "epp_expiration_date", + transition_domain.epp_expiration_date, + domain_name, + expiration_exists, + ) + return transition_domain def parse_agency_data(self, domain_name, transition_domain) -> TransitionDomain: - """Grabs federal_agency from the parsed files and associates it + """Grabs federal_agency from the parsed files and associates it with a transition_domain object, then returns that object.""" if not isinstance(transition_domain, TransitionDomain): raise ValueError("Not a valid object, must be TransitionDomain") @@ -174,7 +220,7 @@ class LoadExtraTransitionDomain: EnumFilenames.AGENCY_ADHOC, LogCode.ERROR, f"Could not add federal_agency on {domain_name}, no data exists.", - domain_name + domain_name, ) return transition_domain @@ -188,19 +234,19 @@ class LoadExtraTransitionDomain: EnumFilenames.DOMAIN_ADHOC, LogCode.ERROR, f"Could not add inactive agency {info.agencyname} on {domain_name}", - domain_name + domain_name, ) return transition_domain - + if not info.isfederal.lower() == "y": self.parse_logs.create_log_item( EnumFilenames.DOMAIN_ADHOC, LogCode.ERROR, f"Could not add non-federal agency {info.agencyname} on {domain_name}", - domain_name + domain_name, ) return transition_domain - + transition_domain.federal_agency = info.agencyname # Logs if we either added to this property, @@ -210,13 +256,15 @@ class LoadExtraTransitionDomain: "federal_agency", transition_domain.federal_agency, domain_name, - agency_exists + agency_exists, ) return transition_domain - def parse_domain_type_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain: - """Grabs organization_type and federal_type from the parsed files + def parse_domain_type_data( + self, domain_name, transition_domain: TransitionDomain + ) -> TransitionDomain: + """Grabs organization_type and federal_type from the parsed files and associates it with a transition_domain object, then returns that object.""" if not isinstance(transition_domain, TransitionDomain): raise ValueError("Not a valid object, must be TransitionDomain") @@ -227,7 +275,7 @@ class LoadExtraTransitionDomain: EnumFilenames.DOMAIN_ADHOC, LogCode.ERROR, f"Could not add domain_type on {domain_name}, no data exists.", - domain_name + domain_name, ) return transition_domain @@ -249,7 +297,7 @@ class LoadExtraTransitionDomain: EnumFilenames.DOMAIN_ADHOC, LogCode.ERROR, f"Could not add inactive domain_type {domain_type[0]} on {domain_name}", - domain_name + domain_name, ) return transition_domain @@ -295,8 +343,10 @@ class LoadExtraTransitionDomain: return transition_domain - def parse_org_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain: - """Grabs organization_name from the parsed files and associates it + def parse_org_data( + self, domain_name, transition_domain: TransitionDomain + ) -> TransitionDomain: + """Grabs organization_name from the parsed files and associates it with a transition_domain object, then returns that object.""" if not isinstance(transition_domain, TransitionDomain): raise ValueError("Not a valid object, must be TransitionDomain") @@ -307,7 +357,7 @@ class LoadExtraTransitionDomain: EnumFilenames.ORGANIZATION_ADHOC, LogCode.ERROR, f"Could not add organization_name on {domain_name}, no data exists.", - domain_name + domain_name, ) return transition_domain @@ -340,19 +390,19 @@ class LoadExtraTransitionDomain: file_type, LogCode.DEBUG, f"Added {var_name} as '{changed_value}' on {domain_name}", - domain_name + domain_name, ) else: self.parse_logs.create_log_item( file_type, LogCode.INFO, f"Updated existing {var_name} to '{changed_value}' on {domain_name}", - domain_name + domain_name, ) # Property getters, i.e. orgid or domaintypeid def get_org_info(self, domain_name) -> OrganizationAdhoc: - """Maps an id given in get_domain_data to a organization_adhoc + """Maps an id given in get_domain_data to a organization_adhoc record which has its corresponding definition""" domain_info = self.get_domain_data(domain_name) if domain_info is None: @@ -361,7 +411,7 @@ class LoadExtraTransitionDomain: return self.get_organization_adhoc(org_id) def get_domain_type_info(self, domain_name) -> DomainTypeAdhoc: - """Maps an id given in get_domain_data to a domain_type_adhoc + """Maps an id given in get_domain_data to a domain_type_adhoc record which has its corresponding definition""" domain_info = self.get_domain_data(domain_name) if domain_info is None: @@ -370,23 +420,30 @@ class LoadExtraTransitionDomain: return self.get_domain_adhoc(type_id) def get_agency_info(self, domain_name) -> AgencyAdhoc: - """Maps an id given in get_domain_data to a agency_adhoc + """Maps an id given in get_domain_data to a agency_adhoc record which has its corresponding definition""" domain_info = self.get_domain_data(domain_name) if domain_info is None: return None type_id = domain_info.orgid return self.get_domain_adhoc(type_id) - + def get_authority_info(self, domain_name): - """Maps an id given in get_domain_data to a authority_adhoc + """Maps an id given in get_domain_data to a authority_adhoc record which has its corresponding definition""" domain_info = self.get_domain_data(domain_name) if domain_info is None: return None type_id = domain_info.authorityid return self.get_authority_adhoc(type_id) - + + def get_domain_escrow_info(self, domain_name): + domain_info = self.get_domain_data(domain_name) + if domain_info is None: + return None + type_id = domain_info.domainname + return self.get_domain_escrow(type_id) + # Object getters, i.e. DomainAdditionalData or OrganizationAdhoc def get_domain_data(self, desired_id) -> DomainAdditionalData: """Grabs a corresponding row within the DOMAIN_ADDITIONAL file, @@ -407,11 +464,16 @@ class LoadExtraTransitionDomain: """Grabs a corresponding row within the AGENCY_ADHOC file, based off a desired_id""" return self.get_object_by_id(EnumFilenames.AGENCY_ADHOC, desired_id) - + def get_authority_adhoc(self, desired_id) -> AuthorityAdhoc: """Grabs a corresponding row within the AUTHORITY_ADHOC file, based off a desired_id""" return self.get_object_by_id(EnumFilenames.AUTHORITY_ADHOC, desired_id) + + def get_domain_escrow(self, desired_id) -> DomainEscrow: + """Grabs a corresponding row within the DOMAIN_ESCROW file, + based off a desired_id""" + return self.get_object_by_id(EnumFilenames.DOMAIN_ESCROW, desired_id) def get_object_by_id(self, file_type: EnumFilenames, desired_id): """Returns a field in a dictionary based off the type and id. @@ -420,23 +482,23 @@ class LoadExtraTransitionDomain: file_type: (constant) EnumFilenames -> Which data file to target. An example would be `EnumFilenames.DOMAIN_ADHOC`. - desired_id: str -> Which id you want to search on. + desired_id: str -> Which id you want to search on. An example would be `"12"` or `"igorville.gov"` - + Explanation: Each data file has an associated type (file_type) for tracking purposes. - Each file_type is a dictionary which + Each file_type is a dictionary which contains a dictionary of row[id_field]: object. In practice, this would look like: - EnumFilenames.AUTHORITY_ADHOC: { + EnumFilenames.AUTHORITY_ADHOC: { "1": AuthorityAdhoc(...), "2": AuthorityAdhoc(...), ... } - + desired_id will then specify which id to grab. If we wanted "1", then this function will return the value of id "1". So, `AuthorityAdhoc(...)` @@ -450,7 +512,7 @@ class LoadExtraTransitionDomain: ) return None - # Grab the value given an Id within that file_type dict. + # Grab the value given an Id within that file_type dict. # For example, "igorville.gov". obj = desired_type.data.get(desired_id) if obj is None: @@ -474,9 +536,12 @@ class PatternMap: data_type: type -> Metadata about the desired type for data. id_field: str -> Defines which field should act as the id in data. - - data: dict -> The returned data. Intended to be used with data_type - to cross-reference. + This is necessary as we store lists of "data_type" in ExtraTransitionDomain as follows: + { + id_field: data_type(...), + id_field: data_type(...), + ... + } """ @@ -487,15 +552,27 @@ class PatternMap: data_type: type, id_field: str, ): + # Metadata # + ## Filename inference metadata ## self.regex = regex - self.data_type = data_type - self.id_field = id_field - self.data = {} - self.filename = filename self.could_infer = False + ## "data" object metadata ## + ### Where the data is sourced from ### + self.filename = filename + + ### What type the data is ### + self.data_type = data_type + + ### What the id should be in the holding dict ### + self.id_field = id_field + + # Object data # + self.data = {} + self.logs = [] + def try_infer_filename(self, current_file_name, default_file_name): - """Tries to match a given filename to a regex, + """Tries to match a given filename to a regex, then uses that match to generate the filename.""" # returns (filename, inferred_successfully) return self._infer_filename(self.regex, current_file_name, default_file_name) @@ -503,21 +580,36 @@ class PatternMap: def _infer_filename(self, regex: re.Pattern, matched_file_name, default_file_name): if not isinstance(regex, re.Pattern): return (self.filename, False) - + match = regex.match(matched_file_name) - + if not match: return (self.filename, False) + total_groups = len(match.groups()) + + # If no matches exist or if we have too many + # matches, then we shouldn't infer + if total_groups == 0 or total_groups > 2: + return (self.filename, False) + + # If only one match is returned, + # it means that our default matches our request + if total_groups == 1: + return (self.filename, True) + + # Otherwise, if two are returned, then + # its likely the pattern we want date = match.group(1) filename_without_date = match.group(2) - # Can the supplied self.regex do a match on the filename? + # After stripping out the date, + # do the two filenames match? can_infer = filename_without_date == default_file_name if not can_infer: return (self.filename, False) - # If so, note that and return the inferred name + # If they do, recreate the filename and return it full_filename = date + "." + filename_without_date return (full_filename, can_infer) @@ -525,6 +617,7 @@ class PatternMap: class ExtraTransitionDomain: """Helper class to aid in storing TransitionDomain data spread across multiple files.""" + strip_date_regex = re.compile(r"(?:.*\/)?(\d+)\.(.+)") def __init__( @@ -545,27 +638,57 @@ class ExtraTransitionDomain: self.seperator = sep self.all_files = glob.glob(f"{directory}*") + # Create a set with filenames as keys for quick lookup self.all_files_set = {os.path.basename(file) for file in self.all_files} - # Used for a container of values at each filename. + # Used for a container of values at each filename. # Instead of tracking each in a seperate variable, we can declare # metadata about each file and associate it with an enum. # That way if we want the data located at the agency_adhoc file, # we can just call EnumFilenames.AGENCY_ADHOC. pattern_map_params = [ - (EnumFilenames.AGENCY_ADHOC, agency_adhoc_filename, AgencyAdhoc, "agencyid"), - (EnumFilenames.DOMAIN_ADDITIONAL, domain_additional_filename, DomainAdditionalData, "domainname"), - (EnumFilenames.DOMAIN_ESCROW, domain_escrow_filename, DomainEscrow, "domainname"), - (EnumFilenames.DOMAIN_ADHOC, domain_adhoc_filename, DomainTypeAdhoc, "domaintypeid"), - (EnumFilenames.ORGANIZATION_ADHOC, organization_adhoc_filename, OrganizationAdhoc, "orgid"), - (EnumFilenames.AUTHORITY_ADHOC, authority_adhoc_filename, AuthorityAdhoc, "authorityid"), + ( + EnumFilenames.AGENCY_ADHOC, + agency_adhoc_filename, + AgencyAdhoc, + "agencyid", + ), + ( + EnumFilenames.DOMAIN_ADDITIONAL, + domain_additional_filename, + DomainAdditionalData, + "domainname", + ), + ( + EnumFilenames.DOMAIN_ESCROW, + domain_escrow_filename, + DomainEscrow, + "domainname", + ), + ( + EnumFilenames.DOMAIN_ADHOC, + domain_adhoc_filename, + DomainTypeAdhoc, + "domaintypeid", + ), + ( + EnumFilenames.ORGANIZATION_ADHOC, + organization_adhoc_filename, + OrganizationAdhoc, + "orgid", + ), + ( + EnumFilenames.AUTHORITY_ADHOC, + authority_adhoc_filename, + AuthorityAdhoc, + "authorityid", + ), ] self.file_data = self.populate_file_data(pattern_map_params) - + def populate_file_data( - self, - pattern_map_params: List[Tuple[EnumFilenames, str, type, str]] + self, pattern_map_params: List[Tuple[EnumFilenames, str, type, str]] ): """Populates the self.file_data field given a set of tuple params. @@ -574,10 +697,10 @@ class ExtraTransitionDomain: [ (field_type, filename, data_type, id_field), ] - + vars: file_type (EnumFilenames) -> The name of the dictionary. - Defined as a value on EnumFilenames, such as + Defined as a value on EnumFilenames, such as EnumFilenames.AGENCY_ADHOC filename (str) -> The filepath of the given @@ -635,10 +758,14 @@ class ExtraTransitionDomain: logger.error(f"Could not find file: {filename}") continue - logger.warning( - "Attempting to infer filename" - f" for file: {filename}." - ) + # Infer filename logic # + # This mode is used for development and testing only. Rather than having + # to manually define the filename each time, we can infer what the filename + # actually is. + + # Not intended for use outside of that, as it is better to assume + # the end-user wants to be specific. + logger.warning("Attempting to infer filename" f" for file: {filename}.") for filename in self.all_files: default_name = name.value[1] match = value.try_infer_filename(filename, default_name) @@ -666,7 +793,9 @@ class ExtraTransitionDomain: file_type: PatternMap = item file_type.data = {} - def parse_csv_file(self, file, seperator, dataclass_type, id_field, is_domain_escrow=False): + def parse_csv_file( + self, file, seperator, dataclass_type, id_field, is_domain_escrow=False + ): # Domain escrow is an edge case if is_domain_escrow: return self._read_domain_escrow(file, seperator) @@ -684,11 +813,9 @@ class ExtraTransitionDomain: # TODO - add error handling creation_date = datetime.strptime(row[8], date_format) expiration_date = datetime.strptime(row[10], date_format) - + dict_data[domain_name] = DomainEscrow( - domain_name, - creation_date, - expiration_date + domain_name, creation_date, expiration_date ) return dict_data @@ -704,10 +831,10 @@ class ExtraTransitionDomain: for row in reader: if None in row: print("Skipping row with None key") - #for key, value in row.items(): - #print(f"key: {key} value: {value}") + # for key, value in row.items(): + # print(f"key: {key} value: {value}") continue row_id = row[id_field] dict_data[row_id] = dataclass_type(**row) - #dict_data = {row[id_field]: dataclass_type(**row) for row in reader} - return dict_data \ No newline at end of file + # dict_data = {row[id_field]: dataclass_type(**row) for row in reader} + return dict_data diff --git a/src/registrar/management/commands/utility/transition_domain_arguments.py b/src/registrar/management/commands/utility/transition_domain_arguments.py index b699ee5d6..459eab6d3 100644 --- a/src/registrar/management/commands/utility/transition_domain_arguments.py +++ b/src/registrar/management/commands/utility/transition_domain_arguments.py @@ -1,29 +1,27 @@ - - class TransitionDomainArguments: """Stores arguments for load_transition_domain""" def __init__(self, **options): # Settings # - self.directory = options.get('directory') - self.sep = options.get('sep') - self.limitParse = options.get('limitParse') - + self.directory = options.get("directory") + self.sep = options.get("sep") + self.limitParse = options.get("limitParse") + # Filenames # ## Adhocs ## - self.agency_adhoc_filename = options.get('agency_adhoc_filename') - self.domain_adhoc_filename = options.get('domain_adhoc_filename') - self.organization_adhoc_filename = options.get('organization_adhoc_filename') + self.agency_adhoc_filename = options.get("agency_adhoc_filename") + self.domain_adhoc_filename = options.get("domain_adhoc_filename") + self.organization_adhoc_filename = options.get("organization_adhoc_filename") ## Data files ## - self.domain_additional_filename = options.get('domain_additional_filename') - self.domain_contacts_filename = options.get('domain_contacts_filename') - self.domain_statuses_filename = options.get('domain_statuses_filename') + self.domain_additional_filename = options.get("domain_additional_filename") + self.domain_contacts_filename = options.get("domain_contacts_filename") + self.domain_statuses_filename = options.get("domain_statuses_filename") # Flags # - self.debug = options.get('debug') - self.resetTable = options.get('resetTable') - + self.debug = options.get("debug") + self.resetTable = options.get("resetTable") + def args_extra_transition_domain(self): return { "agency_adhoc_filename": self.agency_adhoc_filename, @@ -32,4 +30,4 @@ class TransitionDomainArguments: "domain_additional_filename": self.domain_additional_filename, "directory": self.directory, "sep": self.sep, - } \ No newline at end of file + } diff --git a/src/registrar/migrations/0044_transitiondomain_federal_agency_and_more.py b/src/registrar/migrations/0044_transitiondomain_federal_agency_and_more.py index 4de69695b..838a26cd4 100644 --- a/src/registrar/migrations/0044_transitiondomain_federal_agency_and_more.py +++ b/src/registrar/migrations/0044_transitiondomain_federal_agency_and_more.py @@ -38,4 +38,20 @@ class Migration(migrations.Migration): blank=True, db_index=True, help_text="Organization name", null=True ), ), + migrations.AddField( + model_name="transitiondomain", + name="epp_creation_date", + field=models.DateField( + help_text="Duplication of registry's creation date saved for ease of reporting", + null=True, + ), + ), + migrations.AddField( + model_name="transitiondomain", + name="epp_expiration_date", + field=models.DateField( + help_text="Duplication of registry's expiration date saved for ease of reporting", + null=True, + ), + ), ] diff --git a/src/registrar/models/transition_domain.py b/src/registrar/models/transition_domain.py index aca80881c..a719defe1 100644 --- a/src/registrar/models/transition_domain.py +++ b/src/registrar/models/transition_domain.py @@ -65,6 +65,18 @@ class TransitionDomain(TimeStampedModel): blank=True, help_text="Federal agency", ) + epp_creation_date = models.DateField( + null=True, + help_text=( + "Duplication of registry's creation " "date saved for ease of reporting" + ), + ) + epp_expiration_date = models.DateField( + null=True, + help_text=( + "Duplication of registry's expiration " "date saved for ease of reporting" + ), + ) def __str__(self): return (