Merge branch 'main' into dk/1091-dnssec

This commit is contained in:
David Kennedy 2023-10-17 13:21:00 -04:00
commit a23bb9a723
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
6 changed files with 1038 additions and 16 deletions

View file

@ -1,11 +1,12 @@
# Registrar Data Migration
There is an existing registrar/registry at Verisign. They will provide us with an
export of the data from that system. The goal of our data migration is to take
the provided data and use it to create as much as possible a _matching_ state
The original system has an existing registrar/registry that we will import.
The company of that system will provide us with an export of the data.
The goal of our data migration is to take the provided data and use
it to create as much as possible a _matching_ state
in our registrar.
There is no way to make our registrar _identical_ to the Verisign system
There is no way to make our registrar _identical_ to the original system
because we have a different data model and workflow model. Instead, we should
focus our migration efforts on creating a state in our new registrar that will
primarily allow users of the system to perform the tasks that they want to do.
@ -18,7 +19,7 @@ Login.gov account can make an account on the new registrar, and the first time
that person logs in through Login.gov, we make a corresponding account in our
user table. Because we cannot know the Universal Unique ID (UUID) for a
person's Login.gov account, we cannot pre-create user accounts for individuals
in our new registrar based on the data from Verisign.
in our new registrar based on the original data.
## Domains
@ -27,7 +28,7 @@ information is the registry, but the registrar needs a copy of that
information to make connections between registry users and the domains that
they manage. The registrar stores very few fields about a domain except for
its name, so it could be straightforward to import the exported list of domains
from Verisign's `escrow_domains.daily.dotgov.GOV.txt`. It doesn't appear that
from `escrow_domains.daily.dotgov.GOV.txt`. It doesn't appear that
that table stores a flag for active or inactive.
An example Django management command that can load the delimited text file
@ -43,7 +44,7 @@ docker compose run -T app ./manage.py load_domains_data < /tmp/escrow_domains.da
## User access to domains
The Verisign data contains a `escrow_domain_contacts.daily.dotgov.txt` file
The data export contains a `escrow_domain_contacts.daily.dotgov.txt` file
that links each domain to three different types of contacts: `billing`,
`tech`, and `admin`. The ID of the contact in this linking table corresponds
to the ID of a contact in the `escrow_contacts.daily.dotgov.txt` file. In the
@ -59,9 +60,9 @@ invitation's domain.
For the purposes of migration, we can prime the invitation system by creating
an invitation in the system for each email address listed in the
`domain_contacts` file. This means that if a person is currently a user in the
Verisign system, and they use the same email address with Login.gov, then they
original system, and they use the same email address with Login.gov, then they
will end up with access to the same domains in the new registrar that they
were associated with in the Verisign system.
were associated with in the original system.
A management command that does this needs to process two data files, one for
the contact information and one for the domain/contact association, so we
@ -76,3 +77,56 @@ An example script using this technique is in
```shell
docker compose run app ./manage.py load_domain_invitations /app/escrow_domain_contacts.daily.dotgov.GOV.txt /app/escrow_contacts.daily.dotgov.GOV.txt
```
## Transition Domains
We are provided with information about Transition Domains in 3 files:
FILE 1: **escrow_domain_contacts.daily.gov.GOV.txt** -> has the map of domain names to contact ID. Domains in this file will usually have 3 contacts each
FILE 2: **escrow_contacts.daily.gov.GOV.txt** -> has the mapping of contact id to contact email address (which is what we care about for sending domain invitations)
FILE 3: **escrow_domain_statuses.daily.gov.GOV.txt** -> has the map of domains and their statuses
Transferring this data from these files into our domain tables happens in two steps;
***IMPORTANT: only run the following locally, to avoid publicizing PII in our public repo.***
### STEP 1: Load Transition Domain data into TransitionDomain table
**SETUP**
In order to use the management command, we need to add the files to a folder under `src/`.
This will allow Docker to mount the files to a container (under `/app`) for our use.
- Create a folder called `tmp` underneath `src/`
- Add the above files to this folder
- Open a terminal and navigate to `src/`
Then run the following command (This will parse the three files in your `tmp` folder and load the information into the TransitionDomain table);
```shell
docker compose run -T app ./manage.py load_transition_domain /app/tmp/escrow_domain_contacts.daily.gov.GOV.txt /app/tmp/escrow_contacts.daily.gov.GOV.txt /app/tmp/escrow_domain_statuses.daily.gov.GOV.txt
```
**OPTIONAL COMMAND LINE ARGUMENTS**:
`--debug`
This will print out additional, detailed logs.
`--limitParse 100`
Directs the script to load only the first 100 entries into the table. You can adjust this number as needed for testing purposes.
`--resetTable`
This will delete all the data loaded into transtion_domain. It is helpful if you want to see the entries reload from scratch or for clearing test data.
### STEP 2: Transfer Transition Domain data into main Domain tables
Now that we've loaded all the data into TransitionDomain, we need to update the main Domain and DomainInvitation tables with this information.
In the same terminal as used in STEP 1, run the command below;
(This will parse the data in TransitionDomain and either create a corresponding Domain object, OR, if a corresponding Domain already exists, it will update that Domain with the incoming status. It will also create DomainInvitation objects for each user associated with the domain):
```shell
docker compose run -T app ./manage.py transfer_transition_domains_to_domains
```
**OPTIONAL COMMAND LINE ARGUMENTS**:
`--debug`
This will print out additional, detailed logs.
`--limitParse 100`
Directs the script to load only the first 100 entries into the table. You can adjust this number as needed for testing purposes.

View file

@ -0,0 +1,524 @@
import sys
import csv
import logging
import argparse
from collections import defaultdict
from django.core.management import BaseCommand
from registrar.models import TransitionDomain
logger = logging.getLogger(__name__)
class termColors:
"""Colors for terminal outputs
(makes reading the logs WAY easier)"""
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
YELLOW = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
BackgroundLightYellow = "\033[103m"
def query_yes_no(question: str, default="yes") -> bool:
"""Ask a yes/no question via raw_input() and return their answer.
"question" is a string that is presented to the user.
"default" is the presumed answer if the user just hits <Enter>.
It must be "yes" (the default), "no" or None (meaning
an answer is required of the user).
The "answer" return value is True for "yes" or False for "no".
"""
valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("invalid default answer: '%s'" % default)
while True:
logger.info(question + prompt)
choice = input().lower()
if default is not None and choice == "":
return valid[default]
elif choice in valid:
return valid[choice]
else:
logger.info("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n")
class Command(BaseCommand):
help = """Loads data for domains that are in transition
(populates transition_domain model objects)."""
def add_arguments(self, parser):
"""Add our three filename arguments (in order: domain contacts,
contacts, and domain statuses)
OPTIONAL ARGUMENTS:
--sep
The default delimiter is set to "|", but may be changed using --sep
--debug
A boolean (default to true), which activates additional print statements
--limitParse
Used to set a limit for the number of data entries to insert. Set to 0
(or just don't use this argument) to parse every entry.
--resetTable
Use this to trigger a prompt for deleting all table entries. Useful
for testing purposes, but USE WITH CAUTION
"""
parser.add_argument(
"domain_contacts_filename", help="Data file with domain contact information"
)
parser.add_argument(
"contacts_filename",
help="Data file with contact information",
)
parser.add_argument(
"domain_statuses_filename", help="Data file with domain status information"
)
parser.add_argument("--sep", default="|", help="Delimiter character")
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument(
"--limitParse", default=0, help="Sets max number of entries to load"
)
parser.add_argument(
"--resetTable",
help="Deletes all data in the TransitionDomain table",
action=argparse.BooleanOptionalAction,
)
def print_debug_mode_statements(
self, debug_on: bool, debug_max_entries_to_parse: int
):
"""Prints additional terminal statements to indicate if --debug
or --limitParse are in use"""
if debug_on:
logger.info(
f"""{termColors.OKCYAN}
----------DEBUG MODE ON----------
Detailed print statements activated.
{termColors.ENDC}
"""
)
if debug_max_entries_to_parse > 0:
logger.info(
f"""{termColors.OKCYAN}
----------LIMITER ON----------
Parsing of entries will be limited to
{debug_max_entries_to_parse} lines per file.")
Detailed print statements activated.
{termColors.ENDC}
"""
)
def get_domain_user_dict(
self, domain_statuses_filename: str, sep: str
) -> defaultdict[str, str]:
"""Creates a mapping of domain name -> status"""
domain_status_dictionary = defaultdict(str)
logger.info("Reading domain statuses data file %s", domain_statuses_filename)
with open(domain_statuses_filename, "r") as domain_statuses_file: # noqa
for row in csv.reader(domain_statuses_file, delimiter=sep):
domainName = row[0].lower()
domainStatus = row[1].lower()
domain_status_dictionary[domainName] = domainStatus
logger.info("Loaded statuses for %d domains", len(domain_status_dictionary))
return domain_status_dictionary
def get_user_emails_dict(
self, contacts_filename: str, sep
) -> defaultdict[str, str]:
"""Creates mapping of userId -> emails"""
user_emails_dictionary = defaultdict(str)
logger.info("Reading domain-contacts data file %s", contacts_filename)
with open(contacts_filename, "r") as contacts_file:
for row in csv.reader(contacts_file, delimiter=sep):
user_id = row[0]
user_email = row[6]
user_emails_dictionary[user_id] = user_email
logger.info("Loaded emails for %d users", len(user_emails_dictionary))
return user_emails_dictionary
def get_mapped_status(self, status_to_map: str):
"""
Given a verisign domain status, return a corresponding
status defined for our domains.
We map statuses as follows;
"serverHold” fields will map to hold, clientHold to hold
and any ok state should map to Ready.
"""
status_maps = {
"hold": TransitionDomain.StatusChoices.ON_HOLD,
"serverhold": TransitionDomain.StatusChoices.ON_HOLD,
"clienthold": TransitionDomain.StatusChoices.ON_HOLD,
"created": TransitionDomain.StatusChoices.READY,
"ok": TransitionDomain.StatusChoices.READY,
}
mapped_status = status_maps.get(status_to_map)
return mapped_status
def print_summary_duplications(
self,
duplicate_domain_user_combos: list[TransitionDomain],
duplicate_domains: list[TransitionDomain],
users_without_email: list[str],
):
"""Called at the end of the script execution to print out a summary of
data anomalies in the imported Verisign data. Currently, we check for:
- duplicate domains
- duplicate domain - user pairs
- any users without e-mails (this would likely only happen if the contacts
file is missing a user found in the domain_contacts file)
"""
total_duplicate_pairs = len(duplicate_domain_user_combos)
total_duplicate_domains = len(duplicate_domains)
total_users_without_email = len(users_without_email)
if total_users_without_email > 0:
users_without_email_as_string = "{}".format(
", ".join(map(str, duplicate_domain_user_combos))
)
logger.warning(
f"{termColors.YELLOW} No e-mails found for users: {users_without_email_as_string}" # noqa
)
if total_duplicate_pairs > 0 or total_duplicate_domains > 0:
duplicate_pairs_as_string = "{}".format(
", ".join(map(str, duplicate_domain_user_combos))
)
duplicate_domains_as_string = "{}".format(
", ".join(map(str, duplicate_domains))
)
logger.warning(
f"""{termColors.YELLOW}
----DUPLICATES FOUND-----
{total_duplicate_pairs} DOMAIN - USER pairs
were NOT unique in the supplied data files;
{duplicate_pairs_as_string}
{total_duplicate_domains} DOMAINS were NOT unique in
the supplied data files;
{duplicate_domains_as_string}
{termColors.ENDC}"""
)
def print_summary_status_findings(
self, domains_without_status: list[str], outlier_statuses: list[str]
):
"""Called at the end of the script execution to print out a summary of
status anomolies in the imported Verisign data. Currently, we check for:
- domains without a status
- any statuses not accounted for in our status mappings (see
get_mapped_status() function)
"""
total_domains_without_status = len(domains_without_status)
total_outlier_statuses = len(outlier_statuses)
if total_domains_without_status > 0:
domains_without_status_as_string = "{}".format(
", ".join(map(str, domains_without_status))
)
logger.warning(
f"""{termColors.YELLOW}
--------------------------------------------
Found {total_domains_without_status} domains
without a status (defaulted to READY)
---------------------------------------------
{domains_without_status_as_string}
{termColors.ENDC}"""
)
if total_outlier_statuses > 0:
domains_without_status_as_string = "{}".format(
", ".join(map(str, outlier_statuses))
) # noqa
logger.warning(
f"""{termColors.YELLOW}
--------------------------------------------
Found {total_outlier_statuses} unaccounted
for statuses-
--------------------------------------------
No mappings found for the following statuses
(defaulted to Ready):
{domains_without_status_as_string}
{termColors.ENDC}"""
)
def print_debug(self, print_condition: bool, print_statement: str):
"""This function reduces complexity of debug statements
in other functions.
It uses the logger to write the given print_statement to the
terminal if print_condition is TRUE"""
# DEBUG:
if print_condition:
logger.info(print_statement)
def prompt_table_reset(self):
"""Brings up a prompt in the terminal asking
if the user wishes to delete data in the
TransitionDomain table. If the user confirms,
deletes all the data in the TransitionDomain table"""
confirm_reset = query_yes_no(
f"""
{termColors.FAIL}
WARNING: Resetting the table will permanently delete all
the data!
Are you sure you want to continue?{termColors.ENDC}"""
)
if confirm_reset:
logger.info(
f"""{termColors.YELLOW}
----------Clearing Table Data----------
(please wait)
{termColors.ENDC}"""
)
TransitionDomain.objects.all().delete()
def handle( # noqa: C901
self,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
**options,
):
"""Parse the data files and create TransitionDomains."""
sep = options.get("sep")
# If --resetTable was used, prompt user to confirm
# deletion of table data
if options.get("resetTable"):
self.prompt_table_reset()
# Get --debug argument
debug_on = options.get("debug")
# Get --LimitParse argument
debug_max_entries_to_parse = int(
options.get("limitParse")
) # set to 0 to parse all entries
# print message to terminal about which args are in use
self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse)
# STEP 1:
# Create mapping of domain name -> status
domain_status_dictionary = self.get_domain_user_dict(
domain_statuses_filename, sep
)
# STEP 2:
# Create mapping of userId -> email
user_emails_dictionary = self.get_user_emails_dict(contacts_filename, sep)
# STEP 3:
# Parse the domain_contacts file and create TransitionDomain objects,
# using the dictionaries from steps 1 & 2 to lookup needed information.
to_create = []
# keep track of statuses that don't match our available
# status values
outlier_statuses = []
# keep track of domains that have no known status
domains_without_status = []
# keep track of users that have no e-mails
users_without_email = []
# keep track of duplications..
duplicate_domains = []
duplicate_domain_user_combos = []
# keep track of domains we ADD or UPDATE
total_updated_domain_entries = 0
total_new_entries = 0
# if we are limiting our parse (for testing purposes, keep
# track of total rows parsed)
total_rows_parsed = 0
# Start parsing the main file and create TransitionDomain objects
logger.info("Reading domain-contacts data file %s", domain_contacts_filename)
with open(domain_contacts_filename, "r") as domain_contacts_file:
for row in csv.reader(domain_contacts_file, delimiter=sep):
total_rows_parsed += 1
# fields are just domain, userid, role
# lowercase the domain names
new_entry_domain_name = row[0].lower()
user_id = row[1]
new_entry_status = TransitionDomain.StatusChoices.READY
new_entry_email = ""
new_entry_emailSent = False # set to False by default
# PART 1: Get the status
if new_entry_domain_name not in domain_status_dictionary:
# This domain has no status...default to "Create"
# (For data analysis purposes, add domain name
# to list of all domains without status
# (avoid duplicate entries))
if new_entry_domain_name not in domains_without_status:
domains_without_status.append(new_entry_domain_name)
else:
# Map the status
original_status = domain_status_dictionary[new_entry_domain_name]
mapped_status = self.get_mapped_status(original_status)
if mapped_status is None:
# (For data analysis purposes, check for any statuses
# that don't have a mapping and add to list
# of "outlier statuses")
logger.info("Unknown status: " + original_status)
outlier_statuses.append(original_status)
else:
new_entry_status = mapped_status
# PART 2: Get the e-mail
if user_id not in user_emails_dictionary:
# this user has no e-mail...this should never happen
if user_id not in users_without_email:
users_without_email.append(user_id)
else:
new_entry_email = user_emails_dictionary[user_id]
# PART 3: Create the transition domain object
# Check for duplicate data in the file we are
# parsing so we do not add duplicates
# NOTE: Currently, we allow duplicate domains,
# but not duplicate domain-user pairs.
# However, track duplicate domains for now,
# since we are still deciding on whether
# to make this field unique or not. ~10/25/2023
existing_domain = next(
(x for x in to_create if x.domain_name == new_entry_domain_name),
None,
)
existing_domain_user_pair = next(
(
x
for x in to_create
if x.username == new_entry_email
and x.domain_name == new_entry_domain_name
),
None,
)
if existing_domain is not None:
# DEBUG:
self.print_debug(
debug_on,
f"{termColors.YELLOW} DUPLICATE file entries found for domain: {new_entry_domain_name} {termColors.ENDC}", # noqa
)
if new_entry_domain_name not in duplicate_domains:
duplicate_domains.append(new_entry_domain_name)
if existing_domain_user_pair is not None:
# DEBUG:
self.print_debug(
debug_on,
f"""{termColors.YELLOW} DUPLICATE file entries found for domain - user {termColors.BackgroundLightYellow} PAIR {termColors.ENDC}{termColors.YELLOW}:
{new_entry_domain_name} - {new_entry_email} {termColors.ENDC}""", # noqa
)
if existing_domain_user_pair not in duplicate_domain_user_combos:
duplicate_domain_user_combos.append(existing_domain_user_pair)
else:
entry_exists = TransitionDomain.objects.filter(
username=new_entry_email, domain_name=new_entry_domain_name
).exists()
if entry_exists:
try:
existing_entry = TransitionDomain.objects.get(
username=new_entry_email,
domain_name=new_entry_domain_name,
)
if existing_entry.status != new_entry_status:
# DEBUG:
self.print_debug(
debug_on,
f"{termColors.OKCYAN}"
f"Updating entry: {existing_entry}"
f"Status: {existing_entry.status} > {new_entry_status}" # noqa
f"Email Sent: {existing_entry.email_sent} > {new_entry_emailSent}" # noqa
f"{termColors.ENDC}",
)
existing_entry.status = new_entry_status
existing_entry.email_sent = new_entry_emailSent
existing_entry.save()
except TransitionDomain.MultipleObjectsReturned:
logger.info(
f"{termColors.FAIL}"
f"!!! ERROR: duplicate entries exist in the"
f"transtion_domain table for domain:"
f"{new_entry_domain_name}"
f"----------TERMINATING----------"
)
sys.exit()
else:
# no matching entry, make one
new_entry = TransitionDomain(
username=new_entry_email,
domain_name=new_entry_domain_name,
status=new_entry_status,
email_sent=new_entry_emailSent,
)
to_create.append(new_entry)
total_new_entries += 1
# DEBUG:
self.print_debug(
debug_on,
f"{termColors.OKCYAN} Adding entry {total_new_entries}: {new_entry} {termColors.ENDC}", # noqa
)
# Check Parse limit and exit loop if needed
if (
total_rows_parsed >= debug_max_entries_to_parse
and debug_max_entries_to_parse != 0
):
logger.info(
f"{termColors.YELLOW}"
f"----PARSE LIMIT REACHED. HALTING PARSER.----"
f"{termColors.ENDC}"
)
break
TransitionDomain.objects.bulk_create(to_create)
logger.info(
f"""{termColors.OKGREEN}
============= FINISHED ===============
Created {total_new_entries} transition domain entries,
updated {total_updated_domain_entries} transition domain entries
{termColors.ENDC}
"""
)
# Print a summary of findings (duplicate entries,
# missing data..etc.)
self.print_summary_duplications(
duplicate_domain_user_combos, duplicate_domains, users_without_email
)
self.print_summary_status_findings(domains_without_status, outlier_statuses)

View file

@ -0,0 +1,409 @@
import logging
import argparse
import sys
from django_fsm import TransitionNotAllowed # type: ignore
from django.core.management import BaseCommand
from registrar.models import TransitionDomain
from registrar.models import Domain
from registrar.models import DomainInvitation
logger = logging.getLogger(__name__)
class termColors:
"""Colors for terminal outputs
(makes reading the logs WAY easier)"""
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
YELLOW = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
BackgroundLightYellow = "\033[103m"
class Command(BaseCommand):
help = """Load data from transition domain tables
into main domain tables. Also create domain invitation
entries for every domain we ADD (but not for domains
we UPDATE)"""
def add_arguments(self, parser):
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument(
"--limitParse",
default=0,
help="Sets max number of entries to load, set to 0 to load all entries",
)
def print_debug_mode_statements(
self, debug_on: bool, debug_max_entries_to_parse: int
):
"""Prints additional terminal statements to indicate if --debug
or --limitParse are in use"""
self.print_debug(
debug_on,
f"""{termColors.OKCYAN}
----------DEBUG MODE ON----------
Detailed print statements activated.
{termColors.ENDC}
""",
)
self.print_debug(
debug_max_entries_to_parse > 0,
f"""{termColors.OKCYAN}
----------LIMITER ON----------
Parsing of entries will be limited to
{debug_max_entries_to_parse} lines per file.")
Detailed print statements activated.
{termColors.ENDC}
""",
)
def print_debug(self, print_condition: bool, print_statement: str):
"""This function reduces complexity of debug statements
in other functions.
It uses the logger to write the given print_statement to the
terminal if print_condition is TRUE"""
# DEBUG:
if print_condition:
logger.info(print_statement)
def update_domain_status(
self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool
) -> bool:
"""Given a transition domain that matches an existing domain,
updates the existing domain object with that status of
the transition domain.
Returns TRUE if an update was made. FALSE if the states
matched and no update was made"""
transition_domain_status = transition_domain.status
existing_status = target_domain.state
if transition_domain_status != existing_status:
if transition_domain_status == TransitionDomain.StatusChoices.ON_HOLD:
target_domain.place_client_hold(ignoreEPP=True)
else:
target_domain.revert_client_hold(ignoreEPP=True)
target_domain.save()
# DEBUG:
self.print_debug(
debug_on,
f"""{termColors.YELLOW}
>> Updated {target_domain.name} state from
'{existing_status}' to '{target_domain.state}'
(no domain invitation entry added)
{termColors.ENDC}""",
)
return True
return False
def print_summary_of_findings(
self,
domains_to_create,
updated_domain_entries,
domain_invitations_to_create,
skipped_domain_entries,
debug_on,
):
"""Prints to terminal a summary of findings from
transferring transition domains to domains"""
total_new_entries = len(domains_to_create)
total_updated_domain_entries = len(updated_domain_entries)
total_domain_invitation_entries = len(domain_invitations_to_create)
logger.info(
f"""{termColors.OKGREEN}
============= FINISHED ===============
Created {total_new_entries} transition domain entries,
Updated {total_updated_domain_entries} transition domain entries
Created {total_domain_invitation_entries} domain invitation entries
(NOTE: no invitations are SENT in this script)
{termColors.ENDC}
"""
)
if len(skipped_domain_entries) > 0:
logger.info(
f"""{termColors.FAIL}
============= SKIPPED DOMAINS (ERRORS) ===============
{skipped_domain_entries}
{termColors.ENDC}
"""
)
# determine domainInvitations we SKIPPED
skipped_domain_invitations = []
for domain in domains_to_create:
skipped_domain_invitations.append(domain)
for domain_invite in domain_invitations_to_create:
if domain_invite.domain in skipped_domain_invitations:
skipped_domain_invitations.remove(domain_invite.domain)
if len(skipped_domain_invitations) > 0:
logger.info(
f"""{termColors.FAIL}
============= SKIPPED DOMAIN INVITATIONS (ERRORS) ===============
{skipped_domain_invitations}
{termColors.ENDC}
"""
)
# DEBUG:
self.print_debug(
debug_on,
f"""{termColors.YELLOW}
Created Domains:
{domains_to_create}
Updated Domains:
{updated_domain_entries}
{termColors.ENDC}
""",
)
def try_add_domain_invitation(
self, domain_email: str, associated_domain: Domain
) -> DomainInvitation | None:
"""If no domain invitation exists for the given domain and
e-mail, create and return a new domain invitation object.
If one already exists, or if the email is invalid, return NONE"""
# this should never happen, but adding it just in case
if associated_domain is None:
logger.warning(
f"""
{termColors.FAIL}
!!! ERROR: Domain cannot be null for a
Domain Invitation object!
RECOMMENDATION:
Somehow, an empty domain object is
being passed to the subroutine in charge
of making domain invitations. Walk through
the code to see what is amiss.
----------TERMINATING----------"""
)
sys.exit()
# check that the given e-mail is valid
if domain_email is not None and domain_email != "":
# check that a domain invitation doesn't already
# exist for this e-mail / Domain pair
domain_email_already_in_domain_invites = DomainInvitation.objects.filter(
email=domain_email.lower(), domain=associated_domain
).exists()
if not domain_email_already_in_domain_invites:
# Create new domain invitation
new_domain_invitation = DomainInvitation(
email=domain_email.lower(), domain=associated_domain
)
return new_domain_invitation
return None
def handle(
self,
**options,
):
"""Parse entries in TransitionDomain table
and create (or update) corresponding entries in the
Domain and DomainInvitation tables."""
# grab command line arguments and store locally...
debug_on = options.get("debug")
debug_max_entries_to_parse = int(
options.get("limitParse")
) # set to 0 to parse all entries
self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse)
# domains to ADD
domains_to_create = []
domain_invitations_to_create = []
# domains we UPDATED
updated_domain_entries = []
# domains we SKIPPED
skipped_domain_entries = []
# if we are limiting our parse (for testing purposes, keep
# track of total rows parsed)
total_rows_parsed = 0
logger.info(
f"""{termColors.OKGREEN}
==========================
Beginning Data Transfer
==========================
{termColors.ENDC}"""
)
for transition_domain in TransitionDomain.objects.all():
transition_domain_name = transition_domain.domain_name
transition_domain_status = transition_domain.status
transition_domain_email = transition_domain.username
# DEBUG:
self.print_debug(
debug_on,
f"""{termColors.OKCYAN}
Processing Transition Domain: {transition_domain_name}, {transition_domain_status}, {transition_domain_email}
{termColors.ENDC}""", # noqa
)
new_domain_invitation = None
# Check for existing domain entry
domain_exists = Domain.objects.filter(name=transition_domain_name).exists()
if domain_exists:
try:
# get the existing domain
domain_to_update = Domain.objects.get(name=transition_domain_name)
# DEBUG:
self.print_debug(
debug_on,
f"""{termColors.YELLOW}
> Found existing entry in Domain table for: {transition_domain_name}, {domain_to_update.state}
{termColors.ENDC}""", # noqa
)
# for existing entry, update the status to
# the transition domain status
update_made = self.update_domain_status(
transition_domain, domain_to_update, debug_on
)
if update_made:
# keep track of updated domains for data analysis purposes
updated_domain_entries.append(transition_domain.domain_name)
# check if we need to add a domain invitation
# (eg. for a new user)
new_domain_invitation = self.try_add_domain_invitation(
transition_domain_email, domain_to_update
)
except Domain.MultipleObjectsReturned:
# This exception was thrown once before during testing.
# While the circumstances that led to corrupt data in
# the domain table was a freak accident, and the possibility of it
# happening again is safe-guarded by a key constraint,
# better to keep an eye out for it since it would require
# immediate attention.
logger.warning(
f"""
{termColors.FAIL}
!!! ERROR: duplicate entries already exist in the
Domain table for the following domain:
{transition_domain_name}
RECOMMENDATION:
This means the Domain table is corrupt. Please
check the Domain table data as there should be a key
constraint which prevents duplicate entries.
----------TERMINATING----------"""
)
sys.exit()
except TransitionNotAllowed as err:
skipped_domain_entries.append(transition_domain_name)
logger.warning(
f"""{termColors.FAIL}
Unable to change state for {transition_domain_name}
RECOMMENDATION:
This indicates there might have been changes to the
Domain model which were not accounted for in this
migration script. Please check state change rules
in the Domain model and ensure we are following the
correct state transition pathways.
INTERNAL ERROR MESSAGE:
'TRANSITION NOT ALLOWED' exception
{err}
----------SKIPPING----------"""
)
else:
# no entry was found in the domain table
# for the given domain. Create a new entry.
# first see if we are already adding an entry for this domain.
# The unique key constraint does not allow duplicate domain entries
# even if there are different users.
existing_domain_in_to_create = next(
(x for x in domains_to_create if x.name == transition_domain_name),
None,
)
if existing_domain_in_to_create is not None:
self.print_debug(
debug_on,
f"""{termColors.YELLOW}
Duplicate Detected: {transition_domain_name}.
Cannot add duplicate entry for another username.
Violates Unique Key constraint.
Checking for unique user e-mail for Domain Invitations...
{termColors.ENDC}""",
)
new_domain_invitation = self.try_add_domain_invitation(
transition_domain_email, existing_domain_in_to_create
)
else:
# no matching entry, make one
new_domain = Domain(
name=transition_domain_name, state=transition_domain_status
)
domains_to_create.append(new_domain)
# DEBUG:
self.print_debug(
debug_on,
f"{termColors.OKCYAN} Adding domain: {new_domain} {termColors.ENDC}", # noqa
)
new_domain_invitation = self.try_add_domain_invitation(
transition_domain_email, new_domain
)
if new_domain_invitation is None:
logger.info(
f"{termColors.YELLOW} ! No new e-mail detected !" # noqa
f"(SKIPPED ADDING DOMAIN INVITATION){termColors.ENDC}"
)
else:
# DEBUG:
self.print_debug(
debug_on,
f"{termColors.OKCYAN} Adding domain invitation: {new_domain_invitation} {termColors.ENDC}", # noqa
)
domain_invitations_to_create.append(new_domain_invitation)
# Check parse limit and exit loop if parse limit has been reached
if (
debug_max_entries_to_parse > 0
and total_rows_parsed >= debug_max_entries_to_parse
):
logger.info(
f"""{termColors.YELLOW}
----PARSE LIMIT REACHED. HALTING PARSER.----
{termColors.ENDC}
"""
)
break
Domain.objects.bulk_create(domains_to_create)
DomainInvitation.objects.bulk_create(domain_invitations_to_create)
self.print_summary_of_findings(
domains_to_create,
updated_domain_entries,
domain_invitations_to_create,
skipped_domain_entries,
debug_on,
)

View file

@ -0,0 +1,22 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("registrar", "0038_create_groups_v02"),
]
operations = [
migrations.AlterField(
model_name="transitiondomain",
name="status",
field=models.CharField(
blank=True,
choices=[("ready", "Ready"), ("on hold", "On Hold")],
default="ready",
help_text="domain status during the transfer",
max_length=255,
verbose_name="Status",
),
),
]

View file

@ -1308,20 +1308,29 @@ class Domain(TimeStampedModel, DomainHelper):
@transition(
field="state", source=[State.READY, State.ON_HOLD], target=State.ON_HOLD
)
def place_client_hold(self):
"""place a clienthold on a domain (no longer should resolve)"""
def place_client_hold(self, ignoreEPP=False):
"""place a clienthold on a domain (no longer should resolve)
ignoreEPP (boolean) - set to true to by-pass EPP (used for transition domains)
"""
# TODO - ensure all requirements for client hold are made here
# (check prohibited statuses)
logger.info("clientHold()-> inside clientHold")
self._place_client_hold()
# In order to allow transition domains to by-pass EPP calls,
# include this ignoreEPP flag
if not ignoreEPP:
self._place_client_hold()
# TODO -on the client hold ticket any additional error handling here
@transition(field="state", source=[State.READY, State.ON_HOLD], target=State.READY)
def revert_client_hold(self):
"""undo a clienthold placed on a domain"""
def revert_client_hold(self, ignoreEPP=False):
"""undo a clienthold placed on a domain
ignoreEPP (boolean) - set to true to by-pass EPP (used for transition domains)
"""
logger.info("clientHold()-> inside clientHold")
self._remove_client_hold()
if not ignoreEPP:
self._remove_client_hold()
# TODO -on the client hold ticket any additional error handling here
@transition(

View file

@ -5,7 +5,7 @@ from .utility.time_stamped_model import TimeStampedModel
class StatusChoices(models.TextChoices):
READY = "ready", "Ready"
HOLD = "hold", "Hold"
ON_HOLD = "on hold", "On Hold"
class TransitionDomain(TimeStampedModel):
@ -13,6 +13,10 @@ class TransitionDomain(TimeStampedModel):
state of a domain upon transition between registry
providers"""
# This is necessary to expose the enum to external
# classes that import TransitionDomain
StatusChoices = StatusChoices
username = models.TextField(
null=False,
blank=False,