Refactored handle() to be less complex, satisfied linter

Signed-off-by: CocoByte <nicolle.leclair@gmail.com>
This commit is contained in:
CocoByte 2023-10-11 16:12:15 -06:00
parent bb2c7f3cfc
commit b8a5718113
No known key found for this signature in database
GPG key ID: BBFAA2526384C97F

View file

@ -70,164 +70,46 @@ class Command(BaseCommand):
"""
)
def handle( # noqa: C901
self,
**options,
):
"""Parse entries in TransitionDomain table
and create (or update) corresponding entries in the
Domain and DomainInvitation tables."""
# grab command line arguments and store locally...
debug_on = options.get("debug")
debug_max_entries_to_parse = int(
options.get("limitParse")
) # set to 0 to parse all entries
self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse)
# domains to ADD
domains_to_create = []
domain_invitations_to_create = []
# domains we UPDATED
updated_domain_entries = []
# domains we SKIPPED
skipped_domain_entries = []
# domainInvitations we SKIPPED
skipped_domain_invitations = []
# if we are limiting our parse (for testing purposes, keep
# track of total rows parsed)
total_rows_parsed = 0
logger.info(
f"""{termColors.OKGREEN}
==========================
Beginning Data Transfer
==========================
{termColors.ENDC}"""
)
for transition_entry in TransitionDomain.objects.all():
transition_domain_name = transition_entry.domain_name
transition_domain_status = transition_entry.status
transition_domain_email = transition_entry.username
# Check for existing domain entry
try:
# DEBUG:
if debug_on:
logger.info(
f"""{termColors.OKCYAN}
Processing Transition Domain: {transition_domain_name}, {transition_domain_status}, {transition_domain_email}
{termColors.ENDC}""" # noqa
)
# for existing entry, update the status to
# the transition domain status
existingEntry = Domain.objects.get(name=transition_domain_name)
current_state = existingEntry.state
# DEBUG:
if debug_on:
logger.info(
f"""{termColors.YELLOW}
> Found existing domain entry for: {transition_domain_name}, {current_state}
{termColors.ENDC}""" # noqa
)
if transition_domain_status != current_state:
if (
transition_domain_status
== TransitionDomain.StatusChoices.ON_HOLD
):
existingEntry.place_client_hold(ignoreEPP=True)
else:
existingEntry.revert_client_hold(ignoreEPP=True)
existingEntry.save()
updated_domain_entries.append(existingEntry)
# DEBUG:
if debug_on:
logger.info(
f"""{termColors.YELLOW}
>> Updated {transition_domain_name} state from
'{current_state}' to '{existingEntry.state}'
(no domain invitation entry added)
{termColors.ENDC}"""
)
except Domain.DoesNotExist:
already_in_to_create = next(
(x for x in domains_to_create if x.name == transition_domain_name),
None,
)
if already_in_to_create:
# DEBUG:
if debug_on:
logger.info(
f"""{termColors.YELLOW}
Duplicate Detected: {transition_domain_name}.
Cannot add duplicate entry for another username.
Violates Unique Key constraint.
{termColors.ENDC}"""
)
else:
# no matching entry, make one
new_entry = Domain(
name=transition_domain_name, state=transition_domain_status
)
domains_to_create.append(new_entry)
if transition_domain_email:
new_domain_invitation = DomainInvitation(
email=transition_domain_email.lower(), domain=new_entry
)
domain_invitations_to_create.append(new_domain_invitation)
else:
logger.info(
f"{termColors.FAIL} ! No e-mail found for domain: {new_entry}" # noqa
f"(SKIPPED ADDING DOMAIN INVITATION){termColors.ENDC}"
)
skipped_domain_invitations.append(transition_domain_name)
# DEBUG:
if debug_on:
logger.info(
f"{termColors.OKCYAN} Adding domain AND domain invitation: {new_entry} {termColors.ENDC}" # noqa
)
except Domain.MultipleObjectsReturned:
logger.info(
f"""
{termColors.FAIL}
!!! ERROR: duplicate entries exist in the
Domain table for domain:
{transition_domain_name}
----------TERMINATING----------"""
)
sys.exit()
except TransitionNotAllowed as err:
skipped_domain_entries.append(transition_domain_name)
logger.info(
f"""{termColors.FAIL}
Unable to change state for {transition_domain_name}
TRANSITION NOT ALLOWED error message (internal):
{err}
----------SKIPPING----------"""
)
def update_domain_status(self,
transition_domain:TransitionDomain,
target_domain:Domain,
debug_on:bool) -> bool:
"""Given a transition domain that matches an existing domain,
updates the existing domain object with that status of
the transition domain.
Returns TRUE if an update was made. FALSE if the states
matched and no update was made"""
transition_domain_status = transition_domain.status
existing_status = target_domain.state
if transition_domain_status != existing_status:
if (
transition_domain_status
== TransitionDomain.StatusChoices.ON_HOLD
):
target_domain.place_client_hold(ignoreEPP=True)
else:
target_domain.revert_client_hold(ignoreEPP=True)
target_domain.save()
# DEBUG:
if debug_on or debug_max_entries_to_parse > 0:
if (
total_rows_parsed >= debug_max_entries_to_parse
and debug_max_entries_to_parse != 0
):
logger.info(
f"""{termColors.YELLOW}
----PARSE LIMIT REACHED. HALTING PARSER.----
{termColors.ENDC}
"""
)
break
if debug_on:
logger.info(
f"""{termColors.YELLOW}
>> Updated {target_domain.name} state from
'{existing_status}' to '{target_domain.state}'
(no domain invitation entry added)
{termColors.ENDC}"""
)
Domain.objects.bulk_create(domains_to_create)
DomainInvitation.objects.bulk_create(domain_invitations_to_create)
def print_summary_of_findings(self,
domains_to_create,
updated_domain_entries,
domain_invitations_to_create,
skipped_domain_entries,
skipped_domain_invitations,
debug_on):
"""Prints to terminal a summary of findings from
transferring transition domains to domains"""
total_new_entries = len(domains_to_create)
total_updated_domain_entries = len(updated_domain_entries)
@ -275,3 +157,150 @@ class Command(BaseCommand):
{termColors.ENDC}
"""
)
def handle(
self,
**options,
):
"""Parse entries in TransitionDomain table
and create (or update) corresponding entries in the
Domain and DomainInvitation tables."""
# grab command line arguments and store locally...
debug_on = options.get("debug")
debug_max_entries_to_parse = int(
options.get("limitParse")
) # set to 0 to parse all entries
self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse)
# domains to ADD
domains_to_create = []
domain_invitations_to_create = []
# domains we UPDATED
updated_domain_entries = []
# domains we SKIPPED
skipped_domain_entries = []
# domainInvitations we SKIPPED
skipped_domain_invitations = []
# if we are limiting our parse (for testing purposes, keep
# track of total rows parsed)
total_rows_parsed = 0
logger.info(
f"""{termColors.OKGREEN}
==========================
Beginning Data Transfer
==========================
{termColors.ENDC}"""
)
for transition_domain in TransitionDomain.objects.all():
transition_domain_name = transition_domain.domain_name
transition_domain_status = transition_domain.status
transition_domain_email = transition_domain.username
# Check for existing domain entry
try:
# DEBUG:
if debug_on:
logger.info(
f"""{termColors.OKCYAN}
Processing Transition Domain: {transition_domain_name}, {transition_domain_status}, {transition_domain_email}
{termColors.ENDC}""" # noqa
)
# get the existing domain
target_domain = Domain.objects.get(name=transition_domain_name)
# DEBUG:
if debug_on:
logger.info(
f"""{termColors.YELLOW}
> Found existing domain entry for: {transition_domain_name}, {target_domain.state}
{termColors.ENDC}""" # noqa
)
# for existing entry, update the status to
# the transition domain status
update_made = self.update_domain_status(transition_domain, target_domain, debug_on)
if update_made:
updated_domain_entries.append(transition_domain.name)
except Domain.DoesNotExist:
already_in_to_create = next(
(x for x in domains_to_create if x.name == transition_domain_name),
None,
)
if already_in_to_create:
# DEBUG:
if debug_on:
logger.info(
f"""{termColors.YELLOW}
Duplicate Detected: {transition_domain_name}.
Cannot add duplicate entry for another username.
Violates Unique Key constraint.
{termColors.ENDC}"""
)
else:
# no matching entry, make one
new_entry = Domain(
name=transition_domain_name, state=transition_domain_status
)
domains_to_create.append(new_entry)
if transition_domain_email:
new_domain_invitation = DomainInvitation(
email=transition_domain_email.lower(), domain=new_entry
)
domain_invitations_to_create.append(new_domain_invitation)
else:
logger.info(
f"{termColors.FAIL} ! No e-mail found for domain: {new_entry}" # noqa
f"(SKIPPED ADDING DOMAIN INVITATION){termColors.ENDC}"
)
skipped_domain_invitations.append(transition_domain_name)
# DEBUG:
if debug_on:
logger.info(
f"{termColors.OKCYAN} Adding domain AND domain invitation: {new_entry} {termColors.ENDC}" # noqa
)
except Domain.MultipleObjectsReturned:
logger.warning(
f"""
{termColors.FAIL}
!!! ERROR: duplicate entries exist in the
Domain table for domain:
{transition_domain_name}
----------TERMINATING----------"""
)
sys.exit()
except TransitionNotAllowed as err:
skipped_domain_entries.append(transition_domain_name)
logger.warning(
f"""{termColors.FAIL}
Unable to change state for {transition_domain_name}
TRANSITION NOT ALLOWED error message (internal):
{err}
----------SKIPPING----------"""
)
# Check parse limit
if debug_max_entries_to_parse > 0 and total_rows_parsed >= debug_max_entries_to_parse:
logger.info(
f"""{termColors.YELLOW}
----PARSE LIMIT REACHED. HALTING PARSER.----
{termColors.ENDC}
"""
)
break
Domain.objects.bulk_create(domains_to_create)
DomainInvitation.objects.bulk_create(domain_invitations_to_create)
self.print_summary_of_findings(domains_to_create,
updated_domain_entries,
domain_invitations_to_create,
skipped_domain_entries,
skipped_domain_invitations,
debug_on)