mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-07-23 11:16:07 +02:00
Finish parsing for most fields minus expiration
This commit is contained in:
parent
cb4db4f71a
commit
c9537ed7f5
3 changed files with 32 additions and 21 deletions
|
@ -89,8 +89,7 @@ class FileTransitionLog:
|
|||
for log in self.logs.get(file_type):
|
||||
match log.code:
|
||||
case LogCode.ERROR:
|
||||
if log.domain_name is None:
|
||||
logger.error(log.message)
|
||||
logger.error(log.message)
|
||||
case LogCode.WARNING:
|
||||
logger.warning(log.message)
|
||||
case LogCode.INFO:
|
||||
|
@ -149,11 +148,9 @@ class Command(BaseCommand):
|
|||
all_transition_domains = TransitionDomain.objects.all()
|
||||
if not all_transition_domains.exists():
|
||||
raise Exception("No TransitionDomain objects exist.")
|
||||
|
||||
for transition_domain in all_transition_domains:
|
||||
domain_name = transition_domain.domain_name
|
||||
domain_name = transition_domain.domain_name.upper()
|
||||
updated_transition_domain = transition_domain
|
||||
|
||||
# STEP 1: Parse organization data
|
||||
updated_transition_domain = self.parse_org_data(
|
||||
domain_name, transition_domain
|
||||
|
@ -166,7 +163,7 @@ class Command(BaseCommand):
|
|||
)
|
||||
self.parse_logs.display_logs(EnumFilenames.DOMAIN_ADHOC)
|
||||
|
||||
# STEP 3: Parse agency data - TODO
|
||||
# STEP 3: Parse agency data
|
||||
updated_transition_domain = self.parse_agency_data(
|
||||
domain_name, transition_domain
|
||||
)
|
||||
|
@ -257,8 +254,9 @@ class Command(BaseCommand):
|
|||
# For all other records, it is stored as so: Interstate
|
||||
# We can infer if it is federal or not based on this fact.
|
||||
domain_type = info.domaintype.split("-")
|
||||
if domain_type.count != 1 or domain_type.count != 2:
|
||||
raise ValueError("Found invalid data in DOMAIN_ADHOC")
|
||||
domain_type_length = len(domain_type)
|
||||
if domain_type_length < 1 or domain_type_length > 2:
|
||||
raise ValueError("Found invalid data on DOMAIN_ADHOC")
|
||||
|
||||
# Then, just grab the organization type.
|
||||
new_organization_type = domain_type[0].strip()
|
||||
|
@ -276,9 +274,9 @@ class Command(BaseCommand):
|
|||
|
||||
# Are we updating data that already exists,
|
||||
# or are we adding new data in its place?
|
||||
federal_agency_exists = (
|
||||
organization_type_exists = (
|
||||
transition_domain.organization_type is not None
|
||||
and transition_domain.federal_agency.strip() != ""
|
||||
and transition_domain.organization_type.strip() != ""
|
||||
)
|
||||
federal_type_exists = (
|
||||
transition_domain.federal_type is not None
|
||||
|
@ -287,7 +285,7 @@ class Command(BaseCommand):
|
|||
|
||||
# If we get two records, then we know it is federal.
|
||||
# needs to be lowercase for federal type
|
||||
is_federal = domain_type.count() == 2
|
||||
is_federal = domain_type_length == 2
|
||||
if is_federal:
|
||||
new_federal_type = domain_type[1].strip()
|
||||
transition_domain.organization_type = new_organization_type
|
||||
|
@ -300,10 +298,10 @@ class Command(BaseCommand):
|
|||
# or modified it.
|
||||
self._add_or_change_message(
|
||||
EnumFilenames.DOMAIN_ADHOC,
|
||||
"federal_agency",
|
||||
transition_domain.federal_agency,
|
||||
"organization_type",
|
||||
transition_domain.organization_type,
|
||||
domain_name,
|
||||
federal_agency_exists,
|
||||
organization_type_exists,
|
||||
)
|
||||
|
||||
self._add_or_change_message(
|
||||
|
@ -358,7 +356,7 @@ class Command(BaseCommand):
|
|||
self.parse_logs.create_log_item(
|
||||
file_type,
|
||||
LogCode.DEBUG,
|
||||
f"Added {file_type} as '{var_name}' on {domain_name}",
|
||||
f"Added {var_name} as '{changed_value}' on {domain_name}",
|
||||
domain_name
|
||||
)
|
||||
else:
|
||||
|
|
|
@ -7,7 +7,7 @@ By keeping it as a dataclass instead of a dictionary, we can maintain data consi
|
|||
"""
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -26,7 +26,7 @@ class DomainAdditionalData:
|
|||
domaintypeid: Optional[int] = None
|
||||
authorityid: Optional[int] = None
|
||||
orgid: Optional[int] = None
|
||||
securitycontact_email: Optional[str] = None
|
||||
securitycontactemail: Optional[str] = None
|
||||
dnsseckeymonitor: Optional[str] = None
|
||||
domainpurpose: Optional[str] = None
|
||||
|
||||
|
@ -62,7 +62,7 @@ class AuthorityAdhoc:
|
|||
email: Optional[str] = None
|
||||
phonenumber: Optional[str] = None
|
||||
agencyid: Optional[int] = None
|
||||
addlinfo: Optional[str] = None
|
||||
addlinfo: Optional[List[str]] = None
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -146,8 +146,8 @@ class ExtraTransitionDomain:
|
|||
"""
|
||||
self.clear_file_data()
|
||||
for name, value in self.file_data.items():
|
||||
filename = f"{value.filename}"
|
||||
|
||||
filename = f"{value.filename}"
|
||||
if filename in self.all_files_set:
|
||||
_file = f"{self.directory}{value.filename}"
|
||||
value.data = self._read_csv_file(
|
||||
|
@ -194,6 +194,19 @@ class ExtraTransitionDomain:
|
|||
def _read_csv_file(self, file, seperator, dataclass_type, id_field):
|
||||
with open(file, "r", encoding="utf-8-sig") as requested_file:
|
||||
reader = csv.DictReader(requested_file, delimiter=seperator)
|
||||
dict_data = {row[id_field]: dataclass_type(**row) for row in reader}
|
||||
logger.debug(f"it is finally here {dict_data}")
|
||||
"""
|
||||
for row in reader:
|
||||
print({key: type(key) for key in row.keys()}) # print out the keys and their types
|
||||
test = {row[id_field]: dataclass_type(**row)}
|
||||
"""
|
||||
dict_data = {}
|
||||
for row in reader:
|
||||
if None in row:
|
||||
print("Skipping row with None key")
|
||||
#for key, value in row.items():
|
||||
#print(f"key: {key} value: {value}")
|
||||
continue
|
||||
row_id = row[id_field]
|
||||
dict_data[row_id] = dataclass_type(**row)
|
||||
#dict_data = {row[id_field]: dataclass_type(**row) for row in reader}
|
||||
return dict_data
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue