Some refactoring, cleanup, and fixes so that the data loads correctly. Migrations run correctly end-to-end. But unit tests need to be updated next.

This commit is contained in:
CocoByte 2023-11-02 21:25:19 -05:00
parent 795a4ec160
commit 87696d9b92
No known key found for this signature in database
GPG key ID: BBFAA2526384C97F
2 changed files with 426 additions and 287 deletions

View file

@ -27,6 +27,9 @@ class Command(BaseCommand):
entries for every domain we ADD (but not for domains entries for every domain we ADD (but not for domains
we UPDATE)""" we UPDATE)"""
# ======================================================
# ===================== ARGUMENTS =====================
# ======================================================
def add_arguments(self, parser): def add_arguments(self, parser):
parser.add_argument("--debug", action=argparse.BooleanOptionalAction) parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
@ -36,6 +39,10 @@ class Command(BaseCommand):
help="Sets max number of entries to load, set to 0 to load all entries", help="Sets max number of entries to load, set to 0 to load all entries",
) )
# ======================================================
# ===================== PRINTING ======================
# ======================================================
def print_debug_mode_statements( def print_debug_mode_statements(
self, debug_on: bool, debug_max_entries_to_parse: int self, debug_on: bool, debug_max_entries_to_parse: int
): ):
@ -60,49 +67,17 @@ class Command(BaseCommand):
""", """,
) )
def update_domain_information(self, current: DomainInformation, target: DomainInformation, debug_on: bool) -> bool: def parse_limit_reached(self,
updated = False debug_max_entries_to_parse: bool,
total_rows_parsed: int
fields_to_update = [ ) -> bool:
'organization_type', if (debug_max_entries_to_parse > 0
'federal_type', and total_rows_parsed >= debug_max_entries_to_parse):
'federal_agency', logger.info(
"organization_name"
]
defaults = {field: getattr(target, field) for field in fields_to_update}
if current != target:
current = target
DomainInformation.objects.filter(domain=current.domain).update(**defaults)
updated = True
return updated
def update_domain_status(
self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool
) -> bool:
"""Given a transition domain that matches an existing domain,
updates the existing domain object with that status of
the transition domain.
Returns TRUE if an update was made. FALSE if the states
matched and no update was made"""
transition_domain_status = transition_domain.status
existing_status = target_domain.state
if transition_domain_status != existing_status:
if transition_domain_status == TransitionDomain.StatusChoices.ON_HOLD:
target_domain.place_client_hold(ignoreEPP=True)
else:
target_domain.revert_client_hold(ignoreEPP=True)
target_domain.save()
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW} f"""{TerminalColors.YELLOW}
>> Updated {target_domain.name} state from ----PARSE LIMIT REACHED. HALTING PARSER.----
'{existing_status}' to '{target_domain.state}' {TerminalColors.ENDC}
(no domain invitation entry added) """
{TerminalColors.ENDC}""",
) )
return True return True
return False return False
@ -181,9 +156,143 @@ class Command(BaseCommand):
""", """,
) )
def try_add_domain_information(self):
pass
# ======================================================
# =================== DOMAIN =====================
# ======================================================
def update_or_create_domain(self,
transition_domain: TransitionDomain,
debug_on: bool) -> (Domain, bool):
""" Given a transition domain, either finds & updates an existing
corresponding domain, or creates a new corresponding domain in
the Domain table.
Returns the corresponding Domain object and a boolean
that is TRUE if that Domain was newly created.
"""
# Create some local variables to make data tracing easier
transition_domain_name = transition_domain.domain_name
transition_domain_status = transition_domain.status
transition_domain_creation_date = transition_domain.epp_creation_date
transition_domain_expiration_date = transition_domain.epp_expiration_date
domain_exists = Domain.objects.filter(name=transition_domain_name).exists()
if domain_exists:
try:
# ----------------------- UPDATE DOMAIN -----------------------
# ---- GET THE DOMAIN
target_domain = Domain.objects.get(name=transition_domain_name)
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}
> Found existing entry in Domain table for: {transition_domain_name}, {target_domain.state}
{TerminalColors.ENDC}""", # noqa
)
# ---- UPDATE THE DOMAIN
# update the status
update_made = self.update_domain_status(
transition_domain, target_domain, debug_on
)
# TODO: not all domains need to be updated (the information is the same). Need to bubble this up to the final report.
# update dates (creation and expiration)
if transition_domain_creation_date is not None:
# TODO: added this because I ran into a situation where the created_at date was null (violated a key constraint). How do we want to handle this case?
target_domain.created_at = transition_domain_creation_date
if transition_domain_expiration_date is not None:
target_domain.expiration_date = transition_domain_expiration_date
target_domain.save()
return (target_domain, False)
except Domain.MultipleObjectsReturned:
# This exception was thrown once before during testing.
# While the circumstances that led to corrupt data in
# the domain table was a freak accident, and the possibility of it
# happening again is safe-guarded by a key constraint,
# better to keep an eye out for it since it would require
# immediate attention.
logger.warning(
f"""
{TerminalColors.FAIL}
!!! ERROR: duplicate entries already exist in the
Domain table for the following domain:
{transition_domain_name}
RECOMMENDATION:
This means the Domain table is corrupt. Please
check the Domain table data as there should be a key
constraint which prevents duplicate entries.
----------TERMINATING----------"""
)
sys.exit()
except TransitionNotAllowed as err:
logger.warning(
f"""{TerminalColors.FAIL}
Unable to change state for {transition_domain_name}
RECOMMENDATION:
This indicates there might have been changes to the
Domain model which were not accounted for in this
migration script. Please check state change rules
in the Domain model and ensure we are following the
correct state transition pathways.
INTERNAL ERROR MESSAGE:
'TRANSITION NOT ALLOWED' exception
{err}
----------SKIPPING----------"""
)
return (None, False)
else:
# ----------------------- CREATE DOMAIN -----------------------
# no matching entry, make one
target_domain = Domain(
name=transition_domain_name,
state=transition_domain_status,
expiration_date=transition_domain_expiration_date,
)
return (target_domain, True)
def update_domain_status(
self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool
) -> bool:
"""Given a transition domain that matches an existing domain,
updates the existing domain object with that status of
the transition domain.
Returns TRUE if an update was made. FALSE if the states
matched and no update was made"""
transition_domain_status = transition_domain.status
existing_status = target_domain.state
if transition_domain_status != existing_status:
if transition_domain_status == TransitionDomain.StatusChoices.ON_HOLD:
target_domain.place_client_hold(ignoreEPP=True)
else:
target_domain.revert_client_hold(ignoreEPP=True)
target_domain.save()
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}
>> Updated {target_domain.name} state from
'{existing_status}' to '{target_domain.state}'
(no domain invitation entry added)
{TerminalColors.ENDC}""",
)
return True
return False
# ======================================================
# ================ DOMAIN INVITATION ==================
# ======================================================
def try_add_domain_invitation( def try_add_domain_invitation(
self, domain_email: str, associated_domain: Domain self, domain_email: str, associated_domain: Domain
) -> DomainInvitation | None: ) -> DomainInvitation | None:
@ -224,6 +333,169 @@ class Command(BaseCommand):
return new_domain_invitation return new_domain_invitation
return None return None
# ======================================================
# ================ DOMAIN INFORMATION =================
# ======================================================
def update_domain_information(self, current: DomainInformation, target: DomainInformation, debug_on: bool) -> bool:
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.OKCYAN}"
f"Updating: {current}"
f"{TerminalColors.ENDC}"), # noqa
)
updated = False
fields_to_update = [
'organization_type',
'federal_type',
'federal_agency',
"organization_name"
]
defaults = {field: getattr(target, field) for field in fields_to_update}
if current != target:
current = target
DomainInformation.objects.filter(domain=current.domain).update(**defaults)
updated = True
return updated
def try_add_domain_information(self):
pass
def create_new_domain_info(self,
transition_domain: TransitionDomain,
domain: Domain) -> DomainInformation:
org_type = transition_domain.organization_type
fed_type = transition_domain.federal_type
fed_agency = transition_domain.federal_agency
valid_org_type = org_type in [choice_value for choice_value, _ in DomainApplication.OrganizationChoices.choices]
valid_fed_type = fed_type in [choice_value for choice_value, _ in DomainApplication.BranchChoices.choices]
valid_fed_agency = fed_agency in DomainApplication.AGENCIES
default_creator, _ = User.objects.get_or_create(username="System")
new_domain_info_data = {
'domain': domain,
'organization_name': transition_domain.organization_name,
"creator": default_creator,
}
new_domain_info_data['federal_type'] = None
for item in DomainApplication.BranchChoices.choices:
print(f"it is this: {item}")
name, _ = item
if fed_type is not None and fed_type.lower() == name:
new_domain_info_data['federal_type'] = item
new_domain_info_data['organization_type'] = org_type
new_domain_info_data['federal_agency'] = fed_agency
if valid_org_type:
new_domain_info_data['organization_type'] = org_type
else:
logger.debug(f"No org type found on {domain.name}")
if valid_fed_type:
new_domain_info_data['federal_type'] = fed_type
else:
logger.debug(f"No federal type found on {domain.name}")
if valid_fed_agency:
new_domain_info_data['federal_agency'] = fed_agency
else:
logger.debug(f"No federal agency found on {domain.name}")
new_domain_info = DomainInformation(**new_domain_info_data)
# DEBUG:
TerminalHelper.print_conditional(
True,
(f"{TerminalColors.MAGENTA}"
f"Created template: {new_domain_info}"
f"{TerminalColors.ENDC}"), # noqa
)
return new_domain_info
def update_or_create_domain_information(self,
transition_domain: TransitionDomain,
debug_on: bool) -> (DomainInformation, bool):
transition_domain_name = transition_domain.domain_name
# Get associated domain
domain_data = Domain.objects.filter(name=transition_domain.domain_name)
if not domain_data.exists():
logger.warn(
f"{TerminalColors.FAIL}"
f"WARNING: No Domain exists for:"
f"{transition_domain_name}"
f"{TerminalColors.ENDC}\n"
)
return (None, None, False)
domain = domain_data.get()
template_domain_information = self.create_new_domain_info(transition_domain, domain)
target_domain_information = None
domain_information_exists = DomainInformation.objects.filter(domain__name=transition_domain_name).exists()
if domain_information_exists:
try:
# get the existing domain information object
target_domain_information = DomainInformation.objects.get(domain__name=transition_domain_name)
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.FAIL}"
f"Found existing entry in Domain Information table for:"
f"{transition_domain_name}"
f"{TerminalColors.ENDC}"), # noqa
)
# for existing entry, update the status to
# the transition domain status
update_made = self.update_domain_information(
target_domain_information, template_domain_information, debug_on
)
# TODO: not all domains need to be updated (the information is the same). Need to bubble this up to the final report.
return (target_domain_information, domain, False)
except DomainInformation.MultipleObjectsReturned:
# This should never happen (just like with the Domain Table).
# However, because such an error did occur in the past,
# we will watch for it in this script
logger.warning(
f"""
{TerminalColors.FAIL}
!!! ERROR: duplicate entries already exist in the
Domain Information table for the following domain:
{transition_domain_name}
RECOMMENDATION:
This means the Domain Information table is corrupt. Please
check the Domain Information table data as there should be a key
constraint which prevents duplicate entries.
----------TERMINATING----------"""
)
sys.exit()
else:
# no matching entry, make one
target_domain_information = template_domain_information
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.OKCYAN}"
f" Adding domain information for:"
f"{transition_domain_name}"
f"{TerminalColors.ENDC}"),
)
return (target_domain_information, domain, True)
# ======================================================
# ===================== HANDLE ========================
# ======================================================
def handle( def handle(
self, self,
**options, **options,
@ -244,7 +516,6 @@ class Command(BaseCommand):
domains_to_create = [] domains_to_create = []
domain_information_to_create = [] domain_information_to_create = []
domain_invitations_to_create = []
# domains we UPDATED # domains we UPDATED
updated_domain_entries = [] updated_domain_entries = []
updated_domain_information = [] updated_domain_information = []
@ -253,19 +524,29 @@ class Command(BaseCommand):
skipped_domain_entries = [] skipped_domain_entries = []
skipped_domain_information_entries = [] skipped_domain_information_entries = []
# domain invitations to ADD
domain_invitations_to_create = []
# if we are limiting our parse (for testing purposes, keep # if we are limiting our parse (for testing purposes, keep
# track of total rows parsed) # track of total rows parsed)
total_rows_parsed = 0 total_rows_parsed = 0
logger.info( logger.info(
f"""{TerminalColors.OKGREEN} f"""{TerminalColors.OKCYAN}
========================== ==========================
Beginning Data Transfer Beginning Data Transfer
========================== ==========================
{TerminalColors.ENDC}""" {TerminalColors.ENDC}"""
) )
logger.info(
f"""{TerminalColors.OKCYAN}
========= Adding Domains and Domain Invitations =========
{TerminalColors.ENDC}"""
)
for transition_domain in TransitionDomain.objects.all(): for transition_domain in TransitionDomain.objects.all():
# Create some local variables to make data tracing easier
transition_domain_name = transition_domain.domain_name transition_domain_name = transition_domain.domain_name
transition_domain_status = transition_domain.status transition_domain_status = transition_domain.status
transition_domain_email = transition_domain.username transition_domain_email = transition_domain.username
@ -282,86 +563,17 @@ class Command(BaseCommand):
f"{TerminalColors.ENDC}", # noqa f"{TerminalColors.ENDC}", # noqa
) )
new_domain_invitation = None # ======================================================
# Check for existing domain entry # ====================== DOMAIN =======================
domain_exists = Domain.objects.filter(name=transition_domain_name).exists() target_domain, was_created = self.update_or_create_domain(transition_domain, debug_on)
if domain_exists:
try:
# get the existing domain
domain_to_update = Domain.objects.get(name=transition_domain_name)
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}
> Found existing entry in Domain table for: {transition_domain_name}, {domain_to_update.state}
{TerminalColors.ENDC}""", # noqa
)
# for existing entry, update the status to debug_string = ""
# the transition domain status if target_domain is None:
update_made = self.update_domain_status( # ---------------- SKIPPED ----------------
transition_domain, domain_to_update, debug_on skipped_domain_entries.append(transition_domain_name)
) debug_string = f"skipped domain: {target_domain}"
elif was_created:
domain_to_update.created_at = transition_domain_creation_date # ---------------- DUPLICATE ----------------
domain_to_update.expiration_date = transition_domain_expiration_date
domain_to_update.save()
if update_made:
# keep track of updated domains for data analysis purposes
updated_domain_entries.append(transition_domain.domain_name)
# check if we need to add a domain invitation
# (eg. for a new user)
new_domain_invitation = self.try_add_domain_invitation(
transition_domain_email, domain_to_update
)
except Domain.MultipleObjectsReturned:
# This exception was thrown once before during testing.
# While the circumstances that led to corrupt data in
# the domain table was a freak accident, and the possibility of it
# happening again is safe-guarded by a key constraint,
# better to keep an eye out for it since it would require
# immediate attention.
logger.warning(
f"""
{TerminalColors.FAIL}
!!! ERROR: duplicate entries already exist in the
Domain table for the following domain:
{transition_domain_name}
RECOMMENDATION:
This means the Domain table is corrupt. Please
check the Domain table data as there should be a key
constraint which prevents duplicate entries.
----------TERMINATING----------"""
)
sys.exit()
except TransitionNotAllowed as err:
skipped_domain_entries.append(transition_domain_name)
logger.warning(
f"""{TerminalColors.FAIL}
Unable to change state for {transition_domain_name}
RECOMMENDATION:
This indicates there might have been changes to the
Domain model which were not accounted for in this
migration script. Please check state change rules
in the Domain model and ensure we are following the
correct state transition pathways.
INTERNAL ERROR MESSAGE:
'TRANSITION NOT ALLOWED' exception
{err}
----------SKIPPING----------"""
)
else:
# no entry was found in the domain table
# for the given domain. Create a new entry.
# first see if we are already adding an entry for this domain.
# The unique key constraint does not allow duplicate domain entries # The unique key constraint does not allow duplicate domain entries
# even if there are different users. # even if there are different users.
existing_domain_in_to_create = next( existing_domain_in_to_create = next(
@ -369,38 +581,33 @@ class Command(BaseCommand):
None, None,
) )
if existing_domain_in_to_create is not None: if existing_domain_in_to_create is not None:
TerminalHelper.print_conditional( debug_string = f"""{TerminalColors.YELLOW}
debug_on,
f"""{TerminalColors.YELLOW}
Duplicate Detected: {transition_domain_name}. Duplicate Detected: {transition_domain_name}.
Cannot add duplicate entry for another username. Cannot add duplicate entry for another username.
Violates Unique Key constraint. Violates Unique Key constraint.
Checking for unique user e-mail for Domain Invitations... Checking for unique user e-mail for Domain Invitations...
{TerminalColors.ENDC}""", {TerminalColors.ENDC}"""
)
new_domain_invitation = self.try_add_domain_invitation(
transition_domain_email, existing_domain_in_to_create
)
else: else:
# no matching entry, make one # ---------------- CREATED ----------------
new_domain = Domain( domains_to_create.append(target_domain)
name=transition_domain_name, debug_string = f"created domain: {target_domain}"
state=transition_domain_status, elif not was_created:
expiration_date=transition_domain_expiration_date, # ---------------- UPDATED ----------------
) updated_domain_entries.append(transition_domain.domain_name)
debug_string = f"updated domain: {target_domain}"
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.OKCYAN} {debug_string} {TerminalColors.ENDC}"),
)
domains_to_create.append(new_domain) # ======================================================
# DEBUG: # ================ DOMAIN INVITATIONS ==================
TerminalHelper.print_conditional( new_domain_invitation = self.try_add_domain_invitation(
debug_on, transition_domain_email, target_domain
f"{TerminalColors.OKCYAN} Adding domain: {new_domain} {TerminalColors.ENDC}", # noqa )
)
new_domain_invitation = self.try_add_domain_invitation(
transition_domain_email, new_domain
)
if new_domain_invitation is None: if new_domain_invitation is None:
logger.info( logger.info(
f"{TerminalColors.YELLOW} ! No new e-mail detected !" # noqa f"{TerminalColors.YELLOW} ! No new e-mail detected !" # noqa
@ -414,152 +621,83 @@ class Command(BaseCommand):
) )
domain_invitations_to_create.append(new_domain_invitation) domain_invitations_to_create.append(new_domain_invitation)
# ------------------ Parse limit reached? ------------------
# Check parse limit and exit loop if parse limit has been reached # Check parse limit and exit loop if parse limit has been reached
if ( if self.parse_limit_reached(debug_max_entries_to_parse, total_rows_parsed):
debug_max_entries_to_parse > 0
and total_rows_parsed >= debug_max_entries_to_parse
):
logger.info(
f"""{TerminalColors.YELLOW}
----PARSE LIMIT REACHED. HALTING PARSER.----
{TerminalColors.ENDC}
"""
)
break break
logger.info(
f"""{TerminalColors.OKCYAN}
========= Adding Domains Information Objects =========
{TerminalColors.ENDC}"""
)
Domain.objects.bulk_create(domains_to_create) Domain.objects.bulk_create(domains_to_create)
DomainInvitation.objects.bulk_create(domain_invitations_to_create)
# ======================================================
# ================= DOMAIN INFORMATION =================
for transition_domain in TransitionDomain.objects.all(): for transition_domain in TransitionDomain.objects.all():
transition_domain_name = transition_domain.domain_name target_domain_information, associated_domain, was_created = self.update_or_create_domain_information(transition_domain, debug_on)
# Create associated domain information objects debug_string = ""
domain_data = Domain.objects.filter(name=transition_domain.domain_name) if target_domain_information is None:
if not domain_data.exists(): # ---------------- SKIPPED ----------------
raise ValueError("No domain exists") skipped_domain_information_entries.append(target_domain_information)
debug_string = f"skipped domain information: {target_domain_information}"
domain = domain_data.get() elif was_created:
# DEBUG:
org_type = transition_domain.organization_type TerminalHelper.print_conditional(
fed_type = transition_domain.federal_type debug_on,
fed_agency = transition_domain.federal_agency (f"{TerminalColors.OKCYAN}"
f"Checking duplicates for: {target_domain_information}"
f"{TerminalColors.ENDC}"), # noqa
)
valid_org_type = org_type in [choice_value for choice_value, _ in DomainApplication.OrganizationChoices.choices] # ---------------- DUPLICATE ----------------
valid_fed_type = fed_type in [choice_value for choice_value, _ in DomainApplication.BranchChoices.choices] # The unique key constraint does not allow multiple domain
valid_fed_agency = fed_agency in DomainApplication.AGENCIES # information objects to share the same domain
existing_domain_information_in_to_create = next(
default_creator, _ = User.objects.get_or_create(username="System") (x for x in domain_information_to_create if x.domain.name == target_domain_information.domain.name),
new_domain_info_data = {
'domain': domain,
'organization_name': transition_domain.organization_name,
"creator": default_creator,
}
new_domain_info_data['federal_type'] = None
for item in DomainApplication.BranchChoices.choices:
print(f"it is this: {item}")
name, _ = item
if fed_type is not None and fed_type.lower() == name:
new_domain_info_data['federal_type'] = item
new_domain_info_data['organization_type'] = org_type
new_domain_info_data['federal_agency'] = fed_agency
if valid_org_type:
new_domain_info_data['organization_type'] = org_type
else:
logger.debug(f"No org type found on {domain.name}")
if valid_fed_type:
new_domain_info_data['federal_type'] = fed_type
else:
logger.debug(f"No federal type found on {domain.name}")
if valid_fed_agency:
new_domain_info_data['federal_agency'] = fed_agency
else:
logger.debug(f"No federal agency found on {domain.name}")
new_domain_info = DomainInformation(**new_domain_info_data)
domain_information_exists = DomainInformation.objects.filter(domain=domain).exists()
if domain_information_exists:
try:
# get the existing domain information object
domain_info_to_update = DomainInformation.objects.get(domain=domain)
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}
> Found existing entry in Domain Information table for: {transition_domain_name}
{TerminalColors.ENDC}""", # noqa
)
# for existing entry, update the status to
# the transition domain status
update_made = self.update_domain_information(
domain_info_to_update, new_domain_info, debug_on
)
if update_made:
# keep track of updated domains for data analysis purposes
updated_domain_information.append(transition_domain.domain_name)
except DomainInformation.MultipleObjectsReturned:
# This exception was thrown once before during testing.
# While the circumstances that led to corrupt data in
# the domain table was a freak accident, and the possibility of it
# happening again is safe-guarded by a key constraint,
# better to keep an eye out for it since it would require
# immediate attention.
logger.warning(
f"""
{TerminalColors.FAIL}
!!! ERROR: duplicate entries already exist in the
Domain Information table for the following domain:
{transition_domain_name}
RECOMMENDATION:
This means the Domain Information table is corrupt. Please
check the Domain Information table data as there should be a key
constraint which prevents duplicate entries.
----------TERMINATING----------"""
)
sys.exit()
else:
# no entry was found in the domain table
# for the given domain. Create a new entry.
# first see if we are already adding an entry for this domain.
# The unique key constraint does not allow duplicate domain entries
# even if there are different users.
existing_domain_info_in_to_create = next(
(x for x in domain_information_to_create if x.domain.name == transition_domain_name),
None, None,
) )
if existing_domain_info_in_to_create is not None: # TODO: this is redundant. Currently debugging....running into unique key constraint error....
TerminalHelper.print_conditional( existing_domain_info = DomainInformation.objects.filter(domain__name=target_domain_information.domain.name).exists()
debug_on, if existing_domain_information_in_to_create is not None or existing_domain_info:
f"""{TerminalColors.YELLOW} debug_string = f"""{TerminalColors.YELLOW}
Duplicate Detected: {transition_domain_name}. Duplicate Detected: {domain_information_to_create}.
Cannot add duplicate entry. Cannot add duplicate Domain Information object
Violates Unique Key constraint. {TerminalColors.ENDC}"""
{TerminalColors.ENDC}""",
)
else: else:
# no matching entry, make one # ---------------- CREATED ----------------
domain_information_to_create.append(new_domain_info) domain_information_to_create.append(target_domain_information)
# DEBUG: debug_string = f"created domain information: {target_domain_information}"
TerminalHelper.print_conditional( elif not was_created:
debug_on, # ---------------- UPDATED ----------------
f"{TerminalColors.OKCYAN} Adding domain information on: {new_domain_info.domain.name} {TerminalColors.ENDC}", # noqa updated_domain_information.append(target_domain_information)
) debug_string = f"updated domain information: {target_domain_information}"
else:
debug_string = f"domain information already exists and matches incoming data (NO CHANGES MADE): {target_domain_information}"
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.OKCYAN} {debug_string} {TerminalColors.ENDC}"),
)
# ------------------ Parse limit reached? ------------------
# Check parse limit and exit loop if parse limit has been reached
if self.parse_limit_reached(debug_max_entries_to_parse, total_rows_parsed):
break
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.YELLOW}"
f"Trying to add: {domain_information_to_create}"
f"{TerminalColors.ENDC}"),
)
DomainInformation.objects.bulk_create(domain_information_to_create) DomainInformation.objects.bulk_create(domain_information_to_create)
DomainInvitation.objects.bulk_create(domain_invitations_to_create)
self.print_summary_of_findings( self.print_summary_of_findings(
domains_to_create, domains_to_create,
updated_domain_entries, updated_domain_entries,

View file

@ -149,6 +149,7 @@ class LoadExtraTransitionDomain:
if not all_transition_domains.exists(): if not all_transition_domains.exists():
raise ValueError("No TransitionDomain objects exist.") raise ValueError("No TransitionDomain objects exist.")
updated_trasition_domains = []
for transition_domain in all_transition_domains: for transition_domain in all_transition_domains:
domain_name = transition_domain.domain_name.upper() domain_name = transition_domain.domain_name.upper()
updated_transition_domain = transition_domain updated_transition_domain = transition_domain
@ -180,6 +181,7 @@ class LoadExtraTransitionDomain:
f"Successfully updated {domain_name}" f"Successfully updated {domain_name}"
f"{TerminalColors.ENDC}" f"{TerminalColors.ENDC}"
) )
updated_trasition_domains.append(updated_transition_domain)
# If we run into an exception on this domain, # If we run into an exception on this domain,
@ -195,8 +197,7 @@ class LoadExtraTransitionDomain:
logger.info( logger.info(
f"""{TerminalColors.OKGREEN} f"""{TerminalColors.OKGREEN}
============= FINISHED =============== ============= FINISHED ===============
Created 123 transition domain entries, updated {len(updated_trasition_domains)} transition domain entries
updated 123 transition domain entries
{TerminalColors.ENDC} {TerminalColors.ENDC}
""" """
) )