mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-05-19 02:49:21 +02:00
Black linting
This commit is contained in:
parent
cfaafb8ef3
commit
8aa840f427
10 changed files with 73 additions and 234 deletions
|
@ -45,10 +45,7 @@ class Command(BaseCommand):
|
|||
"""
|
||||
parser.add_argument(
|
||||
"migration_json_filename",
|
||||
help=(
|
||||
"A JSON file that holds the location and filenames"
|
||||
"of all the data files used for migrations"
|
||||
),
|
||||
help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"),
|
||||
)
|
||||
|
||||
parser.add_argument("--sep", default="|", help="Delimiter character")
|
||||
|
@ -73,9 +70,7 @@ class Command(BaseCommand):
|
|||
"Recommended to be enabled only in a development or testing setting.",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--directory", default="migrationdata", help="Desired directory"
|
||||
)
|
||||
parser.add_argument("--directory", default="migrationdata", help="Desired directory")
|
||||
parser.add_argument(
|
||||
"--domain_contacts_filename",
|
||||
help="Data file with domain contact information",
|
||||
|
@ -119,9 +114,7 @@ class Command(BaseCommand):
|
|||
help="Defines the filename for domain type adhocs",
|
||||
)
|
||||
|
||||
def print_debug_mode_statements(
|
||||
self, debug_on: bool, debug_max_entries_to_parse: int
|
||||
):
|
||||
def print_debug_mode_statements(self, debug_on: bool, debug_max_entries_to_parse: int):
|
||||
"""Prints additional terminal statements to indicate if --debug
|
||||
or --limitParse are in use"""
|
||||
if debug_on:
|
||||
|
@ -356,42 +349,31 @@ class Command(BaseCommand):
|
|||
debug_on = args.debug
|
||||
|
||||
# Get --LimitParse argument
|
||||
debug_max_entries_to_parse = int(
|
||||
args.limitParse
|
||||
) # set to 0 to parse all entries
|
||||
debug_max_entries_to_parse = int(args.limitParse) # set to 0 to parse all entries
|
||||
|
||||
# Variables for Additional TransitionDomain Information #
|
||||
|
||||
# Main script filenames - these do not have defaults
|
||||
domain_contacts_filename = None
|
||||
try:
|
||||
domain_contacts_filename = directory + options.get(
|
||||
"domain_contacts_filename"
|
||||
)
|
||||
domain_contacts_filename = directory + options.get("domain_contacts_filename")
|
||||
except TypeError:
|
||||
logger.error(
|
||||
f"Invalid filename of '{args.domain_contacts_filename}'"
|
||||
" was provided for domain_contacts_filename"
|
||||
f"Invalid filename of '{args.domain_contacts_filename}'" " was provided for domain_contacts_filename"
|
||||
)
|
||||
|
||||
contacts_filename = None
|
||||
try:
|
||||
contacts_filename = directory + options.get("contacts_filename")
|
||||
except TypeError:
|
||||
logger.error(
|
||||
f"Invalid filename of '{args.contacts_filename}'"
|
||||
" was provided for contacts_filename"
|
||||
)
|
||||
logger.error(f"Invalid filename of '{args.contacts_filename}'" " was provided for contacts_filename")
|
||||
|
||||
domain_statuses_filename = None
|
||||
try:
|
||||
domain_statuses_filename = directory + options.get(
|
||||
"domain_statuses_filename"
|
||||
)
|
||||
domain_statuses_filename = directory + options.get("domain_statuses_filename")
|
||||
except TypeError:
|
||||
logger.error(
|
||||
f"Invalid filename of '{args.domain_statuses_filename}'"
|
||||
" was provided for domain_statuses_filename"
|
||||
f"Invalid filename of '{args.domain_statuses_filename}'" " was provided for domain_statuses_filename"
|
||||
)
|
||||
|
||||
# Agency information
|
||||
|
@ -630,9 +612,7 @@ class Command(BaseCommand):
|
|||
|
||||
# Print a summary of findings (duplicate entries,
|
||||
# missing data..etc.)
|
||||
self.print_summary_duplications(
|
||||
duplicate_domain_user_combos, duplicate_domains, users_without_email
|
||||
)
|
||||
self.print_summary_duplications(duplicate_domain_user_combos, duplicate_domains, users_without_email)
|
||||
self.print_summary_status_findings(domains_without_status, outlier_statuses)
|
||||
|
||||
logger.info(
|
||||
|
|
|
@ -94,10 +94,7 @@ class Command(BaseCommand):
|
|||
parser.add_argument(
|
||||
"--migrationJSON",
|
||||
default="migrationFilepaths.json",
|
||||
help=(
|
||||
"A JSON file that holds the location and filenames"
|
||||
"of all the data files used for migrations"
|
||||
),
|
||||
help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"),
|
||||
)
|
||||
|
||||
# TODO: deprecate this once JSON module is done? (or keep as an override)
|
||||
|
|
|
@ -42,9 +42,7 @@ class Command(BaseCommand):
|
|||
# ======================================================
|
||||
# ===================== PRINTING ======================
|
||||
# ======================================================
|
||||
def print_debug_mode_statements(
|
||||
self, debug_on: bool, debug_max_entries_to_parse: int
|
||||
):
|
||||
def print_debug_mode_statements(self, debug_on: bool, debug_max_entries_to_parse: int):
|
||||
"""Prints additional terminal statements to indicate if --debug
|
||||
or --limitParse are in use"""
|
||||
TerminalHelper.print_conditional(
|
||||
|
@ -66,13 +64,8 @@ class Command(BaseCommand):
|
|||
""",
|
||||
)
|
||||
|
||||
def parse_limit_reached(
|
||||
self, debug_max_entries_to_parse: bool, total_rows_parsed: int
|
||||
) -> bool:
|
||||
if (
|
||||
debug_max_entries_to_parse > 0
|
||||
and total_rows_parsed >= debug_max_entries_to_parse
|
||||
):
|
||||
def parse_limit_reached(self, debug_max_entries_to_parse: bool, total_rows_parsed: int) -> bool:
|
||||
if debug_max_entries_to_parse > 0 and total_rows_parsed >= debug_max_entries_to_parse:
|
||||
logger.info(
|
||||
f"""{TerminalColors.YELLOW}
|
||||
----PARSE LIMIT REACHED. HALTING PARSER.----
|
||||
|
@ -159,9 +152,7 @@ class Command(BaseCommand):
|
|||
# ======================================================
|
||||
# =================== DOMAIN =====================
|
||||
# ======================================================
|
||||
def update_or_create_domain(
|
||||
self, transition_domain: TransitionDomain, debug_on: bool
|
||||
):
|
||||
def update_or_create_domain(self, transition_domain: TransitionDomain, debug_on: bool):
|
||||
"""Given a transition domain, either finds & updates an existing
|
||||
corresponding domain, or creates a new corresponding domain in
|
||||
the Domain table.
|
||||
|
@ -260,9 +251,7 @@ class Command(BaseCommand):
|
|||
)
|
||||
return (target_domain, True)
|
||||
|
||||
def update_domain_status(
|
||||
self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool
|
||||
) -> bool:
|
||||
def update_domain_status(self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool) -> bool:
|
||||
"""Given a transition domain that matches an existing domain,
|
||||
updates the existing domain object with that status of
|
||||
the transition domain.
|
||||
|
@ -293,9 +282,7 @@ class Command(BaseCommand):
|
|||
# ======================================================
|
||||
# ================ DOMAIN INVITATION ==================
|
||||
# ======================================================
|
||||
def try_add_domain_invitation(
|
||||
self, domain_email: str, associated_domain: Domain
|
||||
) -> DomainInvitation | None:
|
||||
def try_add_domain_invitation(self, domain_email: str, associated_domain: Domain) -> DomainInvitation | None:
|
||||
"""If no domain invitation exists for the given domain and
|
||||
e-mail, create and return a new domain invitation object.
|
||||
If one already exists, or if the email is invalid, return NONE"""
|
||||
|
@ -334,17 +321,11 @@ class Command(BaseCommand):
|
|||
# ======================================================
|
||||
# ================ DOMAIN INFORMATION =================
|
||||
# ======================================================
|
||||
def update_domain_information(
|
||||
self, current: DomainInformation, target: DomainInformation, debug_on: bool
|
||||
) -> bool:
|
||||
def update_domain_information(self, current: DomainInformation, target: DomainInformation, debug_on: bool) -> bool:
|
||||
# DEBUG:
|
||||
TerminalHelper.print_conditional(
|
||||
debug_on,
|
||||
(
|
||||
f"{TerminalColors.OKCYAN}"
|
||||
f"Updating: {current}"
|
||||
f"{TerminalColors.ENDC}"
|
||||
), # noqa
|
||||
(f"{TerminalColors.OKCYAN}" f"Updating: {current}" f"{TerminalColors.ENDC}"), # noqa
|
||||
)
|
||||
|
||||
updated = False
|
||||
|
@ -466,15 +447,11 @@ class Command(BaseCommand):
|
|||
debug_on,
|
||||
)
|
||||
target_domain_information = None
|
||||
domain_information_exists = DomainInformation.objects.filter(
|
||||
domain__name=transition_domain_name
|
||||
).exists()
|
||||
domain_information_exists = DomainInformation.objects.filter(domain__name=transition_domain_name).exists()
|
||||
if domain_information_exists:
|
||||
try:
|
||||
# get the existing domain information object
|
||||
target_domain_information = DomainInformation.objects.get(
|
||||
domain__name=transition_domain_name
|
||||
)
|
||||
target_domain_information = DomainInformation.objects.get(domain__name=transition_domain_name)
|
||||
# DEBUG:
|
||||
TerminalHelper.print_conditional(
|
||||
debug_on,
|
||||
|
@ -488,9 +465,7 @@ class Command(BaseCommand):
|
|||
|
||||
# for existing entry, update the status to
|
||||
# the transition domain status
|
||||
self.update_domain_information(
|
||||
target_domain_information, template_domain_information, debug_on
|
||||
)
|
||||
self.update_domain_information(target_domain_information, template_domain_information, debug_on)
|
||||
# TODO: not all domains need to be updated
|
||||
# (the information is the same).
|
||||
# Need to bubble this up to the final report.
|
||||
|
@ -560,9 +535,7 @@ class Command(BaseCommand):
|
|||
if target_domain_information is None:
|
||||
# ---------------- SKIPPED ----------------
|
||||
skipped_domain_information_entries.append(target_domain_information)
|
||||
debug_string = (
|
||||
f"skipped domain information: {target_domain_information}"
|
||||
)
|
||||
debug_string = f"skipped domain information: {target_domain_information}"
|
||||
elif was_created:
|
||||
# DEBUG:
|
||||
TerminalHelper.print_conditional(
|
||||
|
@ -577,11 +550,7 @@ class Command(BaseCommand):
|
|||
# The unique key constraint does not allow multiple domain
|
||||
# information objects to share the same domain
|
||||
existing_domain_information_in_to_create = next(
|
||||
(
|
||||
x
|
||||
for x in domain_information_to_create
|
||||
if x.domain.name == target_domain_information.domain.name
|
||||
),
|
||||
(x for x in domain_information_to_create if x.domain.name == target_domain_information.domain.name),
|
||||
None,
|
||||
)
|
||||
# TODO: this is redundant.
|
||||
|
@ -590,10 +559,7 @@ class Command(BaseCommand):
|
|||
existing_domain_info = DomainInformation.objects.filter(
|
||||
domain__name=target_domain_information.domain.name
|
||||
).exists()
|
||||
if (
|
||||
existing_domain_information_in_to_create is not None
|
||||
or existing_domain_info
|
||||
):
|
||||
if existing_domain_information_in_to_create is not None or existing_domain_info:
|
||||
debug_string = f"""{TerminalColors.YELLOW}
|
||||
Duplicate Detected: {existing_domain_information_in_to_create}.
|
||||
Cannot add duplicate Domain Information object
|
||||
|
@ -601,15 +567,11 @@ class Command(BaseCommand):
|
|||
else:
|
||||
# ---------------- CREATED ----------------
|
||||
domain_information_to_create.append(target_domain_information)
|
||||
debug_string = (
|
||||
f"created domain information: {target_domain_information}"
|
||||
)
|
||||
debug_string = f"created domain information: {target_domain_information}"
|
||||
elif not was_created:
|
||||
# ---------------- UPDATED ----------------
|
||||
updated_domain_information.append(target_domain_information)
|
||||
debug_string = (
|
||||
f"updated domain information: {target_domain_information}"
|
||||
)
|
||||
debug_string = f"updated domain information: {target_domain_information}"
|
||||
else:
|
||||
debug_string = "domain information already exists and "
|
||||
f"matches incoming data (NO CHANGES MADE): {target_domain_information}"
|
||||
|
@ -664,9 +626,7 @@ class Command(BaseCommand):
|
|||
|
||||
# ======================================================
|
||||
# ====================== DOMAIN =======================
|
||||
target_domain, was_created = self.update_or_create_domain(
|
||||
transition_domain, debug_on
|
||||
)
|
||||
target_domain, was_created = self.update_or_create_domain(transition_domain, debug_on)
|
||||
|
||||
debug_string = ""
|
||||
if target_domain is None:
|
||||
|
@ -704,9 +664,7 @@ class Command(BaseCommand):
|
|||
|
||||
# ======================================================
|
||||
# ================ DOMAIN INVITATIONS ==================
|
||||
new_domain_invitation = self.try_add_domain_invitation(
|
||||
transition_domain_email, target_domain
|
||||
)
|
||||
new_domain_invitation = self.try_add_domain_invitation(transition_domain_email, target_domain)
|
||||
if new_domain_invitation is None:
|
||||
logger.info(
|
||||
f"{TerminalColors.YELLOW} ! No new e-mail detected !" # noqa
|
||||
|
@ -812,19 +770,11 @@ class Command(BaseCommand):
|
|||
invitation.domain = existing_domain.get()
|
||||
else:
|
||||
# Raise an err for now
|
||||
raise Exception(
|
||||
f"Domain {existing_domain} wants to be added"
|
||||
"but doesn't exist in the DB"
|
||||
)
|
||||
raise Exception(f"Domain {existing_domain} wants to be added" "but doesn't exist in the DB")
|
||||
invitation.save()
|
||||
|
||||
valid_org_choices = [
|
||||
(name, value)
|
||||
for name, value in DomainApplication.OrganizationChoices.choices
|
||||
]
|
||||
valid_fed_choices = [
|
||||
value for name, value in DomainApplication.BranchChoices.choices
|
||||
]
|
||||
valid_org_choices = [(name, value) for name, value in DomainApplication.OrganizationChoices.choices]
|
||||
valid_fed_choices = [value for name, value in DomainApplication.BranchChoices.choices]
|
||||
valid_agency_choices = DomainApplication.AGENCIES
|
||||
# ======================================================
|
||||
# ================= DOMAIN INFORMATION =================
|
||||
|
@ -852,11 +802,7 @@ class Command(BaseCommand):
|
|||
|
||||
TerminalHelper.print_conditional(
|
||||
debug_on,
|
||||
(
|
||||
f"{TerminalColors.YELLOW}"
|
||||
f"Trying to add: {domain_information_to_create}"
|
||||
f"{TerminalColors.ENDC}"
|
||||
),
|
||||
(f"{TerminalColors.YELLOW}" f"Trying to add: {domain_information_to_create}" f"{TerminalColors.ENDC}"),
|
||||
)
|
||||
DomainInformation.objects.bulk_create(domain_information_to_create)
|
||||
|
||||
|
|
|
@ -111,9 +111,7 @@ class FileTransitionLog:
|
|||
"""Logs every LogItem contained in this object"""
|
||||
for parent_log in self.logs:
|
||||
for child_log in parent_log:
|
||||
TerminalHelper.print_conditional(
|
||||
True, child_log.message, child_log.severity
|
||||
)
|
||||
TerminalHelper.print_conditional(True, child_log.message, child_log.severity)
|
||||
|
||||
def display_logs_by_domain_name(self, domain_name, restrict_type=LogCode.DEFAULT):
|
||||
"""Displays all logs of a given domain_name.
|
||||
|
@ -130,9 +128,7 @@ class FileTransitionLog:
|
|||
return None
|
||||
|
||||
for log in domain_logs:
|
||||
TerminalHelper.print_conditional(
|
||||
restrict_type != log.code, log.message, log.code
|
||||
)
|
||||
TerminalHelper.print_conditional(restrict_type != log.code, log.message, log.code)
|
||||
|
||||
def get_logs(self, file_type, domain_name):
|
||||
"""Grabs the logs associated with
|
||||
|
@ -166,33 +162,21 @@ class LoadExtraTransitionDomain:
|
|||
updated_transition_domain = transition_domain
|
||||
try:
|
||||
# STEP 1: Parse organization data
|
||||
updated_transition_domain = self.parse_org_data(
|
||||
domain_name, transition_domain
|
||||
)
|
||||
updated_transition_domain = self.parse_org_data(domain_name, transition_domain)
|
||||
|
||||
# STEP 2: Parse domain type data
|
||||
updated_transition_domain = self.parse_domain_type_data(
|
||||
domain_name, transition_domain
|
||||
)
|
||||
updated_transition_domain = self.parse_domain_type_data(domain_name, transition_domain)
|
||||
|
||||
# STEP 3: Parse agency data
|
||||
updated_transition_domain = self.parse_agency_data(
|
||||
domain_name, transition_domain
|
||||
)
|
||||
updated_transition_domain = self.parse_agency_data(domain_name, transition_domain)
|
||||
|
||||
# STEP 4: Parse creation and expiration data
|
||||
updated_transition_domain = self.parse_creation_expiration_data(
|
||||
domain_name, transition_domain
|
||||
)
|
||||
updated_transition_domain = self.parse_creation_expiration_data(domain_name, transition_domain)
|
||||
|
||||
# Check if the instance has changed before saving
|
||||
updated_transition_domain.save()
|
||||
updated_transition_domains.append(updated_transition_domain)
|
||||
logger.info(
|
||||
f"{TerminalColors.OKCYAN}"
|
||||
f"Successfully updated {domain_name}"
|
||||
f"{TerminalColors.ENDC}"
|
||||
)
|
||||
logger.info(f"{TerminalColors.OKCYAN}" f"Successfully updated {domain_name}" f"{TerminalColors.ENDC}")
|
||||
|
||||
# If we run into an exception on this domain,
|
||||
# Just skip over it and log that it happened.
|
||||
|
@ -267,8 +251,7 @@ class LoadExtraTransitionDomain:
|
|||
self.parse_logs.create_log_item(
|
||||
EnumFilenames.DOMAIN_ESCROW,
|
||||
LogCode.ERROR,
|
||||
"Could not add epp_creation_date and epp_expiration_date "
|
||||
f"on {domain_name}, no data exists.",
|
||||
"Could not add epp_creation_date and epp_expiration_date " f"on {domain_name}, no data exists.",
|
||||
domain_name,
|
||||
not self.debug,
|
||||
)
|
||||
|
@ -316,10 +299,7 @@ class LoadExtraTransitionDomain:
|
|||
)
|
||||
return transition_domain
|
||||
|
||||
agency_exists = (
|
||||
transition_domain.federal_agency is not None
|
||||
and transition_domain.federal_agency.strip() != ""
|
||||
)
|
||||
agency_exists = transition_domain.federal_agency is not None and transition_domain.federal_agency.strip() != ""
|
||||
|
||||
if not isinstance(info.active, str) or not info.active.lower() == "y":
|
||||
self.parse_logs.create_log_item(
|
||||
|
@ -354,9 +334,7 @@ class LoadExtraTransitionDomain:
|
|||
|
||||
return transition_domain
|
||||
|
||||
def parse_domain_type_data(
|
||||
self, domain_name, transition_domain: TransitionDomain
|
||||
) -> TransitionDomain:
|
||||
def parse_domain_type_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain:
|
||||
"""Grabs organization_type and federal_type from the parsed files
|
||||
and associates it with a transition_domain object, then returns that object."""
|
||||
if not isinstance(transition_domain, TransitionDomain):
|
||||
|
@ -401,12 +379,10 @@ class LoadExtraTransitionDomain:
|
|||
# Are we updating data that already exists,
|
||||
# or are we adding new data in its place?
|
||||
organization_type_exists = (
|
||||
transition_domain.organization_type is not None
|
||||
and transition_domain.organization_type.strip() != ""
|
||||
transition_domain.organization_type is not None and transition_domain.organization_type.strip() != ""
|
||||
)
|
||||
federal_type_exists = (
|
||||
transition_domain.federal_type is not None
|
||||
and transition_domain.federal_type.strip() != ""
|
||||
transition_domain.federal_type is not None and transition_domain.federal_type.strip() != ""
|
||||
)
|
||||
|
||||
# If we get two records, then we know it is federal.
|
||||
|
@ -440,9 +416,7 @@ class LoadExtraTransitionDomain:
|
|||
|
||||
return transition_domain
|
||||
|
||||
def parse_org_data(
|
||||
self, domain_name, transition_domain: TransitionDomain
|
||||
) -> TransitionDomain:
|
||||
def parse_org_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain:
|
||||
"""Grabs organization_name from the parsed files and associates it
|
||||
with a transition_domain object, then returns that object."""
|
||||
if not isinstance(transition_domain, TransitionDomain):
|
||||
|
@ -460,8 +434,7 @@ class LoadExtraTransitionDomain:
|
|||
return transition_domain
|
||||
|
||||
desired_property_exists = (
|
||||
transition_domain.organization_name is not None
|
||||
and transition_domain.organization_name.strip() != ""
|
||||
transition_domain.organization_name is not None and transition_domain.organization_name.strip() != ""
|
||||
)
|
||||
|
||||
transition_domain.organization_name = org_info.orgname
|
||||
|
@ -478,9 +451,7 @@ class LoadExtraTransitionDomain:
|
|||
|
||||
return transition_domain
|
||||
|
||||
def _add_or_change_message(
|
||||
self, file_type, var_name, changed_value, domain_name, is_update=False
|
||||
):
|
||||
def _add_or_change_message(self, file_type, var_name, changed_value, domain_name, is_update=False):
|
||||
"""Creates a log instance when a property
|
||||
is successfully changed on a given TransitionDomain."""
|
||||
if not is_update:
|
||||
|
@ -868,9 +839,7 @@ class ExtraTransitionDomain:
|
|||
else:
|
||||
if not infer_filenames:
|
||||
raise FileNotFoundError(
|
||||
f"{TerminalColors.FAIL}"
|
||||
f"Could not find file {filename} for {name}"
|
||||
f"{TerminalColors.ENDC}"
|
||||
f"{TerminalColors.FAIL}" f"Could not find file {filename} for {name}" f"{TerminalColors.ENDC}"
|
||||
)
|
||||
|
||||
# Infer filename logic #
|
||||
|
@ -903,9 +872,7 @@ class ExtraTransitionDomain:
|
|||
)
|
||||
continue
|
||||
raise FileNotFoundError(
|
||||
f"{TerminalColors.FAIL}"
|
||||
f"Could not find file {filename} for {name}"
|
||||
f"{TerminalColors.ENDC}"
|
||||
f"{TerminalColors.FAIL}" f"Could not find file {filename} for {name}" f"{TerminalColors.ENDC}"
|
||||
)
|
||||
|
||||
def clear_file_data(self):
|
||||
|
@ -913,17 +880,13 @@ class ExtraTransitionDomain:
|
|||
file_type: FileDataHolder = item
|
||||
file_type.data = {}
|
||||
|
||||
def parse_csv_file(
|
||||
self, file, seperator, dataclass_type, id_field, is_domain_escrow=False
|
||||
):
|
||||
def parse_csv_file(self, file, seperator, dataclass_type, id_field, is_domain_escrow=False):
|
||||
# Domain escrow is an edge case
|
||||
if is_domain_escrow:
|
||||
item_to_return = self._read_domain_escrow(file, seperator)
|
||||
return item_to_return
|
||||
else:
|
||||
item_to_return = self._read_csv_file(
|
||||
file, seperator, dataclass_type, id_field
|
||||
)
|
||||
item_to_return = self._read_csv_file(file, seperator, dataclass_type, id_field)
|
||||
return item_to_return
|
||||
|
||||
# Domain escrow is an edgecase given that its structured differently data-wise.
|
||||
|
@ -938,9 +901,7 @@ class ExtraTransitionDomain:
|
|||
creation_date = datetime.strptime(row[7], date_format)
|
||||
expiration_date = datetime.strptime(row[11], date_format)
|
||||
|
||||
dict_data[domain_name] = DomainEscrow(
|
||||
domain_name, creation_date, expiration_date
|
||||
)
|
||||
dict_data[domain_name] = DomainEscrow(domain_name, creation_date, expiration_date)
|
||||
return dict_data
|
||||
|
||||
def _grab_row_id(self, row, id_field, file, dataclass_type):
|
||||
|
@ -973,9 +934,7 @@ class ExtraTransitionDomain:
|
|||
f"Found bad data in {file}. Attempting to clean."
|
||||
f"{TerminalColors.ENDC}"
|
||||
)
|
||||
updated_file_content = self.replace_bad_seperators(
|
||||
file, f"{seperator}", ";badseperator;"
|
||||
)
|
||||
updated_file_content = self.replace_bad_seperators(file, f"{seperator}", ";badseperator;")
|
||||
dict_data = {}
|
||||
break
|
||||
|
||||
|
@ -989,11 +948,7 @@ class ExtraTransitionDomain:
|
|||
|
||||
# After we clean the data, try to parse it again
|
||||
if updated_file_content:
|
||||
logger.info(
|
||||
f"{TerminalColors.MAGENTA}"
|
||||
f"Retrying load for {file}"
|
||||
f"{TerminalColors.ENDC}"
|
||||
)
|
||||
logger.info(f"{TerminalColors.MAGENTA}" f"Retrying load for {file}" f"{TerminalColors.ENDC}")
|
||||
# Store the file locally rather than writing to the file.
|
||||
# This is to avoid potential data corruption.
|
||||
updated_file = io.StringIO(updated_file_content)
|
||||
|
@ -1004,9 +959,7 @@ class ExtraTransitionDomain:
|
|||
# is wrong with the file.
|
||||
if None in row:
|
||||
logger.error(
|
||||
f"{TerminalColors.FAIL}"
|
||||
f"Corrupt data found for {row_id}. Skipping."
|
||||
f"{TerminalColors.ENDC}"
|
||||
f"{TerminalColors.FAIL}" f"Corrupt data found for {row_id}. Skipping." f"{TerminalColors.ENDC}"
|
||||
)
|
||||
continue
|
||||
|
||||
|
|
|
@ -193,9 +193,7 @@ class TerminalHelper:
|
|||
return total_line
|
||||
|
||||
@staticmethod
|
||||
def print_to_file_conditional(
|
||||
print_condition: bool, filename: str, file_directory: str, file_contents: str
|
||||
):
|
||||
def print_to_file_conditional(print_condition: bool, filename: str, file_directory: str, file_contents: str):
|
||||
"""Sometimes logger outputs get insanely huge."""
|
||||
if print_condition:
|
||||
# Add a slash if the last character isn't one
|
||||
|
@ -204,10 +202,6 @@ class TerminalHelper:
|
|||
# Assemble filepath
|
||||
filepath = f"{file_directory}{filename}.txt"
|
||||
# Write to file
|
||||
logger.info(
|
||||
f"{TerminalColors.MAGENTA}Writing to file "
|
||||
f" {filepath}..."
|
||||
f"{TerminalColors.ENDC}"
|
||||
)
|
||||
logger.info(f"{TerminalColors.MAGENTA}Writing to file " f" {filepath}..." f"{TerminalColors.ENDC}")
|
||||
with open(f"{filepath}", "w+") as f:
|
||||
f.write(file_contents)
|
||||
|
|
|
@ -37,26 +37,14 @@ class TransitionDomainArguments:
|
|||
|
||||
# Filenames #
|
||||
# = Adhocs =#
|
||||
agency_adhoc_filename: Optional[str] = field(
|
||||
default=EnumFilenames.AGENCY_ADHOC.value[1], repr=True
|
||||
)
|
||||
domain_adhoc_filename: Optional[str] = field(
|
||||
default=EnumFilenames.DOMAIN_ADHOC.value[1], repr=True
|
||||
)
|
||||
organization_adhoc_filename: Optional[str] = field(
|
||||
default=EnumFilenames.ORGANIZATION_ADHOC.value[1], repr=True
|
||||
)
|
||||
authority_adhoc_filename: Optional[str] = field(
|
||||
default=EnumFilenames.AUTHORITY_ADHOC.value[1], repr=True
|
||||
)
|
||||
agency_adhoc_filename: Optional[str] = field(default=EnumFilenames.AGENCY_ADHOC.value[1], repr=True)
|
||||
domain_adhoc_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADHOC.value[1], repr=True)
|
||||
organization_adhoc_filename: Optional[str] = field(default=EnumFilenames.ORGANIZATION_ADHOC.value[1], repr=True)
|
||||
authority_adhoc_filename: Optional[str] = field(default=EnumFilenames.AUTHORITY_ADHOC.value[1], repr=True)
|
||||
|
||||
# = Data files =#
|
||||
domain_escrow_filename: Optional[str] = field(
|
||||
default=EnumFilenames.DOMAIN_ESCROW.value[1], repr=True
|
||||
)
|
||||
domain_additional_filename: Optional[str] = field(
|
||||
default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], repr=True
|
||||
)
|
||||
domain_escrow_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ESCROW.value[1], repr=True)
|
||||
domain_additional_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], repr=True)
|
||||
domain_contacts_filename: Optional[str] = field(default=None, repr=True)
|
||||
domain_statuses_filename: Optional[str] = field(default=None, repr=True)
|
||||
contacts_filename: Optional[str] = field(default=None, repr=True)
|
||||
|
|
|
@ -27,16 +27,12 @@ class Migration(migrations.Migration):
|
|||
migrations.AddField(
|
||||
model_name="transitiondomain",
|
||||
name="organization_type",
|
||||
field=models.TextField(
|
||||
blank=True, help_text="Type of organization", max_length=255, null=True
|
||||
),
|
||||
field=models.TextField(blank=True, help_text="Type of organization", max_length=255, null=True),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name="transitiondomain",
|
||||
name="organization_name",
|
||||
field=models.TextField(
|
||||
blank=True, db_index=True, help_text="Organization name", null=True
|
||||
),
|
||||
field=models.TextField(blank=True, db_index=True, help_text="Organization name", null=True),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name="transitiondomain",
|
||||
|
|
|
@ -166,10 +166,7 @@ class DomainApplication(TimeStampedModel):
|
|||
"American Battle Monuments Commission",
|
||||
"AMTRAK",
|
||||
"Appalachian Regional Commission",
|
||||
(
|
||||
"Appraisal Subcommittee of the Federal Financial "
|
||||
"Institutions Examination Council"
|
||||
),
|
||||
("Appraisal Subcommittee of the Federal Financial " "Institutions Examination Council"),
|
||||
"Appraisal Subcommittee",
|
||||
"Architect of the Capitol",
|
||||
"Armed Forces Retirement Home",
|
||||
|
|
|
@ -67,15 +67,11 @@ class TransitionDomain(TimeStampedModel):
|
|||
)
|
||||
epp_creation_date = models.DateField(
|
||||
null=True,
|
||||
help_text=(
|
||||
"Duplication of registry's creation " "date saved for ease of reporting"
|
||||
),
|
||||
help_text=("Duplication of registry's creation " "date saved for ease of reporting"),
|
||||
)
|
||||
epp_expiration_date = models.DateField(
|
||||
null=True,
|
||||
help_text=(
|
||||
"Duplication of registry's expiration " "date saved for ease of reporting"
|
||||
),
|
||||
help_text=("Duplication of registry's expiration " "date saved for ease of reporting"),
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
|
|
|
@ -103,9 +103,7 @@ class TestMigrations(TestCase):
|
|||
# Check Domain table
|
||||
matching_domains = Domain.objects.filter(name=transition_domain_name)
|
||||
# Check Domain Information table
|
||||
matching_domain_informations = DomainInformation.objects.filter(
|
||||
domain__name=transition_domain_name
|
||||
)
|
||||
matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name)
|
||||
# Check Domain Invitation table
|
||||
matching_domain_invitations = DomainInvitation.objects.filter(
|
||||
email=transition_domain_email.lower(),
|
||||
|
@ -146,12 +144,8 @@ class TestMigrations(TestCase):
|
|||
)
|
||||
self.assertEqual(total_missing_domains, expected_missing_domains)
|
||||
self.assertEqual(total_duplicate_domains, expected_duplicate_domains)
|
||||
self.assertEqual(
|
||||
total_missing_domain_informations, expected_missing_domain_informations
|
||||
)
|
||||
self.assertEqual(
|
||||
total_missing_domain_invitations, expected_missing_domain_invitations
|
||||
)
|
||||
self.assertEqual(total_missing_domain_informations, expected_missing_domain_informations)
|
||||
self.assertEqual(total_missing_domain_invitations, expected_missing_domain_invitations)
|
||||
|
||||
self.assertEqual(total_transition_domains, expected_total_transition_domains)
|
||||
self.assertEqual(total_domains, expected_total_domains)
|
||||
|
@ -352,9 +346,7 @@ class TestMigrations(TestCase):
|
|||
# Simluate Logins
|
||||
for invite in DomainInvitation.objects.all():
|
||||
# get a user with this email address
|
||||
user, user_created = User.objects.get_or_create(
|
||||
email=invite.email, username=invite.email
|
||||
)
|
||||
user, user_created = User.objects.get_or_create(email=invite.email, username=invite.email)
|
||||
user.on_each_login()
|
||||
|
||||
# Analyze the tables
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue