Merge branch 'main' into nmb/1294-slow-roll

Neil Martinsen-Burrell 2023-11-13 11:27:43 -06:00
commit 329e4add40
25 changed files with 2726 additions and 394 deletions

.gitignore

@ -171,3 +171,6 @@ node_modules
# Compliance/trestle related
docs/compliance/.trestle/cache
src/migrationdata/*


@ -79,15 +79,54 @@ docker compose run app ./manage.py load_domain_invitations /app/escrow_domain_co
```
## Transition Domains (Part 1) - Setup Files for Import
#### STEP 1: Obtain data files
We are provided with information about Transition Domains in the following files:
- FILE 1: **escrow_domain_contacts.daily.gov.GOV.txt** -> has the map of domain names to contact ID. Domains in this file will usually have 3 contacts each
- FILE 2: **escrow_contacts.daily.gov.GOV.txt** -> has the mapping of contact id to contact email address (which is what we care about for sending domain invitations)
- FILE 3: **escrow_domain_statuses.daily.gov.GOV.txt** -> has the map of domains and their statuses
- FILE 4: **escrow_domains.daily.dotgov.GOV.txt** -> has a map of domain names to expiration and creation dates
- FILE 5: **domainadditionaldatalink.adhoc.dotgov.txt** -> has the map of domains to other data like authority, organization, & domain type
- FILE 6: **domaintypes.adhoc.dotgov.txt** -> has data on federal type and organization type
- FILE 7: **organization.adhoc.dotgov.txt** -> has organization name data
- FILE 8: **authority.adhoc.dotgov.txt** -> has authority data which maps to an agency
- FILE 9: **agency.adhoc.dotgov.txt** -> has federal agency data
- FILE 10: **migrationFilepaths.json** -> a JSON file that points to all of the above filenames. Described in Step 2 below.
#### STEP 2: Obtain the JSON file (for file locations)
Add a JSON file called "migrationFilepaths.json" with the following contents (update filenames and directory as needed):
```
{
"directory": "migrationdata",
"contacts_filename": "escrow_contacts.daily.dotgov.GOV.txt",
"domain_contacts_filename": "escrow_domain_contacts.daily.dotgov.GOV.txt",
"domain_statuses_filename": "escrow_domain_statuses.daily.dotgov.GOV.txt",
"domain_escrow_filename": "escrow_domains.daily.dotgov.GOV.txt",
"domain_additional_filename": "domainadditionaldatalink.adhoc.dotgov.txt",
"domain_adhoc_filename": "domaintypes.adhoc.dotgov.txt",
"organization_adhoc_filename": "organization.adhoc.dotgov.txt"
"authority_adhoc_filename": "authority.adhoc.dotgov.txt",
"agency_adhoc_filename": "agency.adhoc.dotgov.txt",
}
```
This JSON file can exist anywhere, but to keep things simple, add it to the same folder used in Step 1: `src/migrationdata`.
`directory` specifies the directory in which the given filenames exist. For instance, a `contacts_filename` of `test.txt` with a `directory` of `migrationdata` would need to exist at `migrationdata/test.txt`.
Later on, we will bundle this file along with the others into its own folder. Keep it within the `migrationdata/` directory if you are passing data to your sandbox, for simplicity.
We need to run a few scripts to parse these files into our domain tables.
We can do this both locally and in a sandbox.
#### STEP 3: Bundle all relevant data files into an archive
Move all the files specified in Step 1 into a shared folder and create a tar.gz archive. For example, create a folder on your desktop called `datafiles`, move all of the obtained files into it, and add them to a tar.gz archive using any method (see [here](https://stackoverflow.com/questions/53283240/how-to-create-tar-file-with-7zip) for one approach).
After the archive is created, move it into `src/migrationdata`.
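A minimal shell sketch of this step (assuming the data files were gathered in `~/Desktop/datafiles` and using `exportdata` as an example archive name):
```bash
# Bundle every data file (including migrationFilepaths.json) into one tar.gz archive
tar -czvf exportdata.tar.gz -C ~/Desktop/datafiles .
# Move the archive to where the migration steps expect it
mv exportdata.tar.gz src/migrationdata/
```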
### SECTION 1 - SANDBOX MIGRATION SETUP
Load migration data onto a production or sandbox environment
@ -111,8 +150,6 @@ cat {LOCAL_PATH_TO_FILE} | cf ssh {APP_NAME_IN_ENVIRONMENT} -c "cat > /home/vcap
CloudFoundry supports scp as a means of transferring data to our environment. If you are dealing with a batch of files, try sending a tar.gz archive across and unpacking it there.
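For example, to send a bundled archive using the same `cat` pattern shown above (a sketch; the app name placeholder, archive name, and destination path are examples to adjust for your environment):
```bash
# Stream the local archive into the app's migrationdata folder over cf ssh
# (destination path is an assumption; adjust to match your environment)
cat ./exportdata.tar.gz | cf ssh {APP_NAME_IN_ENVIRONMENT} -c "cat > /home/vcap/app/migrationdata/exportdata.tar.gz"
```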
##### Login to Cloud.gov
```bash
@ -183,7 +220,6 @@ tar -xvf migrationdata/{FILE_NAME}.tar.gz -C migrationdata/ --strip-components=1
*FILE_NAME* - Name of the desired file, ex: exportdata
#### Manual method
If the `cat_files_into_getgov.py` script isn't working, follow these steps instead.
@ -203,22 +239,27 @@ cat ../tmp/{filename} > migrationdata/{filename}
In order to run the scripts locally, we need to add the files to a folder under `src/`.
This will allow Docker to mount the files to a container (under `/app`) for our use.
- Add the same files from Section 1 to a TEMPORARY `tmp/` folder under `src/` (do not check this folder into our repo); see the sketch after this list
- Open a terminal and navigate to `src/`
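A minimal sketch of that local setup, assuming the data files were gathered in `~/Desktop/datafiles` (the paths shown are examples only):
```bash
# Copy the data files (and migrationFilepaths.json) into a temporary folder
# under src/ so Docker can mount them into the container at /app/tmp
mkdir -p src/tmp
cp ~/Desktop/datafiles/* src/tmp/
cd src/
```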
*You are now ready to run migration scripts.*
## Transition Domains (Part 2) - Running the Migration Scripts
While keeping the same ssh instance open (if you are running on a sandbox), run through the following commands. If you cannot run `manage.py` commands, try running `/tmp/lifecycle/shell` in the ssh instance.
### STEP 1: Load Transition Domains
Run the following command, making sure the file paths point to the right location. This will parse all given files and load the information into the TransitionDomain table. Make sure your migrationFilepaths.json file is in the same directory.
(NOTE: If working in cloud.gov, change "/app/tmp" to point to the `migrationdata/` directory and remove "docker compose run -T app" from the command)
##### LOCAL COMMAND
```shell
docker-compose exec app ./manage.py load_transition_domain migrationFilepaths.json --directory /app/tmp/ --debug --limitParse 10
```
##### SANDBOX COMMAND
```shell
./manage.py load_transition_domain migrationFilepaths.json --debug
```
##### COMMAND LINE ARGUMENTS:
@ -232,6 +273,40 @@ Directs the script to load only the first 100 entries into the table. You can a
`--resetTable`
This will delete all the data in transition_domain. It is helpful if you want to see the entries reload from scratch or for clearing test data.
###### (arguments that override filepaths and directories if needed)
`--directory`
Defines the directory where all data files and the JSON are stored.
`--domain_contacts_filename`
Defines the filename for domain contact information.
`--contacts_filename`
Defines the filename for contact information.
`--domain_statuses_filename`
Defines the filename for domain status information.
`--agency_adhoc_filename`
Defines the filename for agency adhocs.
`--domain_additional_filename`
Defines the filename for additional domain data.
`--domain_escrow_filename`
Defines the filename for creation/expiration domain data.
`--domain_adhoc_filename`
Defines the filename for domain type adhocs.
`--organization_adhoc_filename`
Defines the filename for organization adhocs.
`--authority_adhoc_filename`
Defines the filename for authority adhocs.
`--infer_filenames`
Determines if we should infer filenames or not. This setting is not available for use in environments with the flag `settings.DEBUG` set to false, as it is intended for local development only.
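For example, a sketch of a local run that points at a non-default directory and overrides one filename (the paths and filenames shown are examples; adjust them to your environment):
```shell
docker compose run -T app ./manage.py load_transition_domain migrationFilepaths.json --directory /app/tmp/ --contacts_filename escrow_contacts.daily.dotgov.GOV.txt --debug --limitParse 100
```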
### STEP 2: Transfer Transition Domain data into main Domain tables
@ -239,10 +314,14 @@ Now that we've loaded all the data into TransitionDomain, we need to update the
In the same terminal as used in STEP 1, run the command below.
(This will parse the data in TransitionDomain and either create a corresponding Domain object, OR, if a corresponding Domain already exists, update that Domain with the incoming status. It will also create DomainInvitation objects for each user associated with the domain.)
(NOTE: If working in cloud.gov, remove "docker compose run -T app" from the command)
##### LOCAL COMMAND
```shell
docker compose run -T app ./manage.py transfer_transition_domains_to_domains --debug
```
##### SANDBOX COMMAND
```shell
./manage.py transfer_transition_domains_to_domains --debug
```
##### COMMAND LINE ARGUMENTS:
@ -256,10 +335,14 @@ Directs the script to load only the first 100 entries into the table. You can a
To send invitation emails for every transition domain in the transition domain table, execute the following command:
(NOTE: If working in cloud.gov, remove "docker compose run -T app" from the command)
##### LOCAL COMMAND
```shell
docker compose run -T app ./manage.py send_domain_invitations -s
```
##### SANDBOX COMMAND
```shell
./manage.py send_domain_invitations -s
```
### STEP 4: Test the results (Run the analyzer script)
@ -269,18 +352,27 @@ This script's main function is to scan the transition domain and domain tables f
To analyze our database without running migrations, execute the script without any optional arguments:
(NOTE: If working in cloud.gov, remove "docker compose run -T app" from the command)
##### LOCAL COMMAND
```shell
docker compose run -T app ./manage.py master_domain_migrations --debug
```
##### SANDBOX COMMAND
```shell
./manage.py master_domain_migrations --debug
```
#### OPTION 2 - RUN MIGRATIONS FEATURE
To run the migrations again (all of the above migration steps) before analyzing, execute the following command (read the documentation on the terminal arguments below; everything used by the migration scripts can also be passed into this script and will have the same effects). NOTE: --debug provides detailed logging statements during the migration. It is recommended that you use this argument when using the --runMigrations feature.
(NOTE: If working in cloud.gov, remove "docker compose run -T app" from the command)
(NOTE: If you named your JSON file something other than "migrationFilepaths.json" back in the file setup section, you will want to use the `--migrationJSON` argument in the following commands.)
##### LOCAL COMMAND
```shell
docker compose run -T app ./manage.py master_domain_migrations --migrationDirectory /app/tmp --runMigrations --debug
```
##### SANDBOX COMMAND
```shell
./manage.py master_domain_migrations --runMigrations --debug
```
##### COMMAND LINE ARGUMENTS
@ -291,25 +383,18 @@ Runs all scripts (in sequence) for transition domain migrations
`--migrationDirectory`
The location of both the JSON file and all the data files needed for migration.
(default is "migrationdata", which is the sandbox directory)
Example Usage:
*--migrationDirectory /app/tmp*
`--migrationJSON`
The filename of the JSON that holds all the filepath info needed for migrations.
Example Usage:
*--migrationJSON migrationFilepaths.json*
`--sep`
@ -320,9 +405,10 @@ Delimiter for the migration scripts to correctly parse the given text files.
Activates additional print statements
`--disablePrompts`
Disables the terminal prompts that allow the user to step through each portion of this script.
*Used to facilitate unit tests. Not recommended for everyday use.*
`--limitParse`
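Putting the arguments together, a sketch of a full local invocation (the values shown are examples; adjust them to your environment):
```shell
docker compose run -T app ./manage.py master_domain_migrations --runMigrations --migrationDirectory /app/tmp --migrationJSON migrationFilepaths.json --debug --limitParse 100
```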


@ -72,6 +72,8 @@ secret_registry_hostname = secret("REGISTRY_HOSTNAME")
BASE_DIR = path.resolve().parent.parent.parent
# SECURITY WARNING: don't run with debug turned on in production!
# TODO - Investigate the behaviour of this flag. Does not appear
# to function for the IS_PRODUCTION flag.
DEBUG = env_debug
# Controls production specific feature toggles


@ -1,11 +1,15 @@
import json
import os
import sys
import csv
import logging
import argparse
from collections import defaultdict
from django.conf import settings
from django.core.management import BaseCommand
from registrar.management.commands.utility.epp_data_containers import EnumFilenames
from registrar.models import TransitionDomain
@ -14,6 +18,9 @@ from registrar.management.commands.utility.terminal_helper import (
TerminalHelper,
)
from .utility.transition_domain_arguments import TransitionDomainArguments
from .utility.extra_transition_domain_helper import LoadExtraTransitionDomain
logger = logging.getLogger(__name__)
@ -36,12 +43,10 @@ class Command(BaseCommand):
Use this to trigger a prompt for deleting all table entries. Useful
for testing purposes, but USE WITH CAUTION
"""
parser.add_argument("domain_contacts_filename", help="Data file with domain contact information")
parser.add_argument(
"contacts_filename",
help="Data file with contact information",
"migration_json_filename",
help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"),
)
parser.add_argument("domain_statuses_filename", help="Data file with domain status information")
parser.add_argument("--sep", default="|", help="Delimiter character")
@ -55,6 +60,60 @@ class Command(BaseCommand):
action=argparse.BooleanOptionalAction,
)
# This option should only be available when developing locally.
# This should not be available to the end user.
if settings.DEBUG:
parser.add_argument(
"--infer_filenames",
action=argparse.BooleanOptionalAction,
help="Determines if we should infer filenames or not."
"Recommended to be enabled only in a development or testing setting.",
)
parser.add_argument("--directory", default="migrationdata", help="Desired directory")
parser.add_argument(
"--domain_contacts_filename",
help="Data file with domain contact information",
)
parser.add_argument(
"--contacts_filename",
help="Data file with contact information",
)
parser.add_argument(
"--domain_statuses_filename",
help="Data file with domain status information",
)
parser.add_argument(
"--agency_adhoc_filename",
default=EnumFilenames.AGENCY_ADHOC.value[1],
help="Defines the filename for agency adhocs",
)
parser.add_argument(
"--domain_additional_filename",
default=EnumFilenames.DOMAIN_ADDITIONAL.value[1],
help="Defines the filename for additional domain data",
)
parser.add_argument(
"--domain_escrow_filename",
default=EnumFilenames.DOMAIN_ESCROW.value[1],
help="Defines the filename for creation/expiration domain data",
)
parser.add_argument(
"--domain_adhoc_filename",
default=EnumFilenames.DOMAIN_ADHOC.value[1],
help="Defines the filename for domain type adhocs",
)
parser.add_argument(
"--organization_adhoc_filename",
default=EnumFilenames.ORGANIZATION_ADHOC.value[1],
help="Defines the filename for domain type adhocs",
)
parser.add_argument(
"--authority_adhoc_filename",
default=EnumFilenames.AUTHORITY_ADHOC.value[1],
help="Defines the filename for domain type adhocs",
)
def print_debug_mode_statements(self, debug_on: bool, debug_max_entries_to_parse: int):
"""Prints additional terminal statements to indicate if --debug
or --limitParse are in use"""
@ -95,6 +154,7 @@ class Command(BaseCommand):
logger.info("Reading contacts data file %s", contacts_filename)
with open(contacts_filename, "r") as contacts_file:
for row in csv.reader(contacts_file, delimiter=sep):
if row != []:
user_id = row[0]
user_email = row[6]
user_emails_dictionary[user_id] = user_email
@ -191,7 +251,7 @@ class Command(BaseCommand):
--------------------------------------------
Found {total_outlier_statuses} unaccounted
for statuses-
for statuses
--------------------------------------------
No mappings found for the following statuses
@ -222,30 +282,131 @@ class Command(BaseCommand):
)
TransitionDomain.objects.all().delete()
def parse_extra(self, options):
"""Loads additional information for each TransitionDomain
object based off supplied files."""
try:
# Parse data from files
extra_data = LoadExtraTransitionDomain(options)
# Update every TransitionDomain object where applicable
extra_data.update_transition_domain_models()
except Exception as err:
logger.error(f"Could not load additional TransitionDomain data. {err}")
raise err
# TODO: handle this better...needs more logging
def handle( # noqa: C901
self,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
migration_json_filename,
**options,
):
"""Parse the data files and create TransitionDomains."""
sep = options.get("sep")
if not settings.DEBUG:
options["infer_filenames"] = False
args = TransitionDomainArguments(**options)
# Desired directory for additional TransitionDomain data
# (In the event they are stored separately)
directory = args.directory
# Add a slash if the last character isn't one
if directory and directory[-1] != "/":
directory += "/"
json_filepath = directory + migration_json_filename
# Process JSON file #
# If a JSON was provided, use its values instead of defaults.
# TODO: there is no way to discern user overrides from those args defaults.
with open(json_filepath, "r") as jsonFile:
# load JSON object as a dictionary
try:
data = json.load(jsonFile)
# Create an instance of TransitionDomainArguments
# Iterate over the data from the JSON file
for key, value in data.items():
# Check if the key exists in TransitionDomainArguments
if hasattr(args, key):
# If it does, update the options
options[key] = value
except Exception as err:
logger.error(
f"{TerminalColors.FAIL}"
"There was an error loading "
"the JSON responsible for providing filepaths."
f"{TerminalColors.ENDC}"
)
raise err
sep = args.sep
# If --resetTable was used, prompt user to confirm
# deletion of table data
if options.get("resetTable"):
if args.resetTable:
self.prompt_table_reset()
# Get --debug argument
debug_on = options.get("debug")
debug_on = args.debug
# Get --LimitParse argument
debug_max_entries_to_parse = int(options.get("limitParse")) # set to 0 to parse all entries
debug_max_entries_to_parse = int(args.limitParse) # set to 0 to parse all entries
# Variables for Additional TransitionDomain Information #
# Main script filenames - these do not have defaults
domain_contacts_filename = None
try:
domain_contacts_filename = directory + options.get("domain_contacts_filename")
except TypeError:
logger.error(
f"Invalid filename of '{args.domain_contacts_filename}'" " was provided for domain_contacts_filename"
)
contacts_filename = None
try:
contacts_filename = directory + options.get("contacts_filename")
except TypeError:
logger.error(f"Invalid filename of '{args.contacts_filename}'" " was provided for contacts_filename")
domain_statuses_filename = None
try:
domain_statuses_filename = directory + options.get("domain_statuses_filename")
except TypeError:
logger.error(
f"Invalid filename of '{args.domain_statuses_filename}'" " was provided for domain_statuses_filename"
)
# Agency information
agency_adhoc_filename = options.get("agency_adhoc_filename")
# Federal agency / organization type information
domain_adhoc_filename = options.get("domain_adhoc_filename")
# Organization name information
organization_adhoc_filename = options.get("organization_adhoc_filename")
# Creation date / expiration date information
domain_escrow_filename = options.get("domain_escrow_filename")
# Container for all additional TransitionDomain information
domain_additional_filename = options.get("domain_additional_filename")
# print message to terminal about which args are in use
self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse)
filenames = [
agency_adhoc_filename,
domain_adhoc_filename,
organization_adhoc_filename,
domain_escrow_filename,
domain_additional_filename,
]
# Do a top-level check to see if these files exist
for filename in filenames:
if not isinstance(filename, str):
raise TypeError(f"Filename must be a string, got {type(filename).__name__}")
full_path = os.path.join(directory, filename)
if not os.path.isfile(full_path):
raise FileNotFoundError(full_path)
# STEP 1:
# Create mapping of domain name -> status
domain_status_dictionary = self.get_domain_user_dict(domain_statuses_filename, sep)
@ -257,7 +418,6 @@ class Command(BaseCommand):
# STEP 3:
# Parse the domain_contacts file and create TransitionDomain objects,
# using the dictionaries from steps 1 & 2 to lookup needed information.
to_create = []
# keep track of statuses that don't match our available
@ -297,6 +457,11 @@ class Command(BaseCommand):
new_entry_email = ""
new_entry_emailSent = False # set to False by default
TerminalHelper.print_conditional(
True,
f"Processing item {total_rows_parsed}: {new_entry_domain_name}",
)
# PART 1: Get the status
if new_entry_domain_name not in domain_status_dictionary:
# This domain has no status...default to "Create"
@ -419,12 +584,28 @@ class Command(BaseCommand):
break
TransitionDomain.objects.bulk_create(to_create)
# Print a summary of findings (duplicate entries,
# missing data..etc.)
self.print_summary_duplications(duplicate_domain_user_combos, duplicate_domains, users_without_email)
self.print_summary_status_findings(domains_without_status, outlier_statuses)
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Created {total_new_entries} transition domain entries,
updated {total_updated_domain_entries} transition domain entries
Updated {total_updated_domain_entries} transition domain entries
{TerminalColors.YELLOW}
----- DUPLICATES FOUND -----
{len(duplicate_domain_user_combos)} DOMAIN - USER pairs
were NOT unique in the supplied data files.
{len(duplicate_domains)} DOMAINS were NOT unique in
the supplied data files.
----- STATUSES -----
{len(domains_without_status)} DOMAINS had NO status (defaulted to READY).
{len(outlier_statuses)} Statuses were invalid (defaulted to READY).
{TerminalColors.ENDC}
"""
)
@ -433,3 +614,54 @@ class Command(BaseCommand):
# missing data..etc.)
self.print_summary_duplications(duplicate_domain_user_combos, duplicate_domains, users_without_email)
self.print_summary_status_findings(domains_without_status, outlier_statuses)
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Created {total_new_entries} transition domain entries,
Updated {total_updated_domain_entries} transition domain entries
{TerminalColors.YELLOW}
----- DUPLICATES FOUND -----
{len(duplicate_domain_user_combos)} DOMAIN - USER pairs
were NOT unique in the supplied data files.
{len(duplicate_domains)} DOMAINS were NOT unique in
the supplied data files.
----- STATUSES -----
{len(domains_without_status)} DOMAINS had NO status (defaulted to READY).
{len(outlier_statuses)} Statuses were invalid (defaulted to READY).
{TerminalColors.ENDC}
"""
)
# Prompt the user if they want to load additional data on the domains
title = "Do you wish to load additional data for TransitionDomains?"
proceed = TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
!!! ENSURE THAT ALL FILENAMES ARE CORRECT BEFORE PROCEEDING
==Master data file==
domain_additional_filename: {domain_additional_filename}
==Federal agency information==
agency_adhoc_filename: {agency_adhoc_filename}
==Federal type / organization type information==
domain_adhoc_filename: {domain_adhoc_filename}
==Organization name information==
organization_adhoc_filename: {organization_adhoc_filename}
==Creation date / expiration date information==
domain_escrow_filename: {domain_escrow_filename}
==Containing directory==
directory: {directory}
""",
prompt_title=title,
)
if proceed:
arguments = TransitionDomainArguments(**options)
self.parse_extra(arguments)


@ -7,7 +7,6 @@
import logging
import argparse
import sys
from django.core.management import BaseCommand
from django.core.management import call_command
@ -28,29 +27,27 @@ logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = """ """
help = """ """ # TODO: update this!
# ======================================================
# ================== ARGUMENTS ===================
# ======================================================
def add_arguments(self, parser):
"""
OPTIONAL ARGUMENTS:
--runMigrations
Triggers running all scripts (in sequence)
for transition domain migrations
A boolean (default to true), which triggers running
all scripts (in sequence) for transition domain migrations
--migrationDirectory
The location of the files used for load_transition_domain migration script
EXAMPLE USAGE:
> --migrationDirectory /app/tmp
--migrationFilenames
The files used for load_transition_domain migration script.
Must appear IN ORDER and comma-delimiteds:
--migrationJSON
The name of the JSON file used for load_transition_domain migration script
EXAMPLE USAGE:
> --migrationFilenames domain_contacts_filename.txt,contacts_filename.txt,domain_statuses_filename.txt
where...
- domain_contacts_filename is the Data file with domain contact information
- contacts_filename is the Data file with contact information
- domain_statuses_filename is the Data file with domain status information
> --migrationJSON migrationFilepaths.json
--sep
Delimiter for the migration scripts to correctly parse the given text files.
@ -59,9 +56,8 @@ class Command(BaseCommand):
--debug
Activates additional print statements
--prompt
Activates terminal prompts that allows
the user to step through each portion of this
--disablePrompts
Disables terminal prompts that allows the user to step through each portion of this
script.
--limitParse
@ -91,33 +87,28 @@ class Command(BaseCommand):
)
# The following file arguments have default values for running in the sandbox
# TODO: make this a mandatory argument
# (if/when we strip out defaults, it will be mandatory)
# TODO: use the migration directory arg or force user to type FULL filepath?
parser.add_argument(
"--migrationJSON",
default="migrationFilepaths.json",
help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"),
)
# TODO: deprecate this once JSON module is done? (or keep as an override)
parser.add_argument(
"--migrationDirectory",
default="migrationdata",
help=("The location of the files used for load_transition_domain migration script"),
)
parser.add_argument(
"--migrationFilenames",
default="escrow_domain_contacts.daily.gov.GOV.txt,"
"escrow_contacts.daily.gov.GOV.txt,"
"escrow_domain_statuses.daily.gov.GOV.txt",
help="""The files used for load_transition_domain migration script.
Must appear IN ORDER and separated by commas:
domain_contacts_filename.txt,contacts_filename.txt,domain_statuses_filename.txt
where...
- domain_contacts_filename is the Data file with domain contact
information
- contacts_filename is the Data file with contact information
- domain_statuses_filename is the Data file with domain status
information""",
)
parser.add_argument("--sep", default="|", help="Delimiter character for the migration files")
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
parser.add_argument("--prompt", action=argparse.BooleanOptionalAction)
parser.add_argument("--disablePrompts", action=argparse.BooleanOptionalAction)
parser.add_argument("--limitParse", default=0, help="Sets max number of entries to load")
@ -127,6 +118,10 @@ class Command(BaseCommand):
action=argparse.BooleanOptionalAction,
)
# ======================================================
# =============== DATA ANALYSIS ==================
# ======================================================
def compare_tables(self, debug_on: bool):
"""Does a diff between the transition_domain and the following tables:
domain, domain_information and the domain_invitation.
@ -237,27 +232,26 @@ class Command(BaseCommand):
"""
)
# ======================================================
# ================= MIGRATIONS ===================
# ======================================================
def run_load_transition_domain_script(
self,
file_location: str,
domain_contacts_filename: str,
contacts_filename: str,
domain_statuses_filename: str,
migration_json_filename: str,
file_directory: str,
sep: str,
reset_table: bool,
debug_on: bool,
prompts_enabled: bool,
debug_max_entries_to_parse: int,
):
if file_directory and file_directory[-1] != "/":
file_directory += "/"
json_filepath = migration_json_filename
"""Runs the load_transition_domain script"""
# Create the command string
command_script = "load_transition_domain"
command_string = (
f"./manage.py {command_script} "
f"{file_location+domain_contacts_filename} "
f"{file_location+contacts_filename} "
f"{file_location+domain_statuses_filename} "
)
command_string = f"./manage.py {command_script} " f"{json_filepath} "
if sep is not None and sep != "|":
command_string += f"--sep {sep} "
if reset_table:
@ -266,26 +260,28 @@ class Command(BaseCommand):
command_string += "--debug "
if debug_max_entries_to_parse > 0:
command_string += f"--limitParse {debug_max_entries_to_parse} "
if file_directory:
command_string += f"--directory {file_directory}"
# Execute the command string
proceed = False
if prompts_enabled:
system_exit_on_terminate = True
TerminalHelper.prompt_for_execution(
system_exit_on_terminate,
proceed = TerminalHelper.prompt_for_execution(
True,
command_string,
"Running load_transition_domain script",
)
# TODO: make this somehow run inside TerminalHelper prompt
if proceed or not prompts_enabled:
call_command(
command_script,
f"{file_location+domain_contacts_filename}",
f"{file_location+contacts_filename}",
f"{file_location+domain_statuses_filename}",
json_filepath,
sep=sep,
resetTable=reset_table,
debug=debug_on,
limitParse=debug_max_entries_to_parse,
directory=file_directory,
)
def run_transfer_script(self, debug_on: bool, prompts_enabled: bool):
@ -296,15 +292,15 @@ class Command(BaseCommand):
if debug_on:
command_string += "--debug "
# Execute the command string
proceed = False
if prompts_enabled:
system_exit_on_terminate = True
TerminalHelper.prompt_for_execution(
system_exit_on_terminate,
proceed = TerminalHelper.prompt_for_execution(
True,
command_string,
"Running transfer_transition_domains_to_domains script",
)
# TODO: make this somehow run inside TerminalHelper prompt
if proceed or not prompts_enabled:
call_command(command_script)
def run_send_invites_script(self, debug_on: bool, prompts_enabled: bool):
@ -313,23 +309,22 @@ class Command(BaseCommand):
command_script = "send_domain_invitations"
command_string = f"./manage.py {command_script} -s"
# Execute the command string
proceed = False
if prompts_enabled:
system_exit_on_terminate = True
TerminalHelper.prompt_for_execution(
system_exit_on_terminate,
proceed = TerminalHelper.prompt_for_execution(
False,
command_string,
"Running send_domain_invitations script",
)
# TODO: make this somehow run inside TerminalHelper prompt
if proceed or not prompts_enabled:
call_command(command_script, send_emails=True)
def run_migration_scripts(
self,
migration_json_filename,
file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
@ -352,10 +347,8 @@ class Command(BaseCommand):
The migration scripts are looking in directory....
{file_location}
....for the following files:
- domain contacts: {domain_contacts_filename}
- contacts: {contacts_filename}
- domain statuses: {domain_statuses_filename}
....for the following JSON:
{migration_json_filename}
{TerminalColors.FAIL}
Does this look correct?{TerminalColors.ENDC}"""
@ -370,17 +363,15 @@ class Command(BaseCommand):
f"""
{TerminalColors.YELLOW}
PLEASE Re-Run the script with the correct
file location and filenames:
JSON filename and directory:
"""
)
return
# Proceed executing the migration scripts
self.run_load_transition_domain_script(
migration_json_filename,
file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,
@ -413,7 +404,7 @@ class Command(BaseCommand):
# Get arguments
debug_on = options.get("debug")
prompts_enabled = options.get("prompt")
prompts_enabled = not options.get("disablePrompts")
run_migrations_enabled = options.get("runMigrations")
TerminalHelper.print_conditional(
@ -462,33 +453,13 @@ class Command(BaseCommand):
debug_max_entries_to_parse = int(options.get("limitParse"))
# Grab filepath information from the arguments
file_location = options.get("migrationDirectory") + "/"
filenames = options.get("migrationFilenames").split(",")
if len(filenames) < 3:
filenames_as_string = "{}".format(", ".join(map(str, filenames)))
logger.info(
f"""
{TerminalColors.FAIL}
--migrationFilenames expected 3 filenames to follow it,
but only {len(filenames)} were given:
{filenames_as_string}
PLEASE MODIFY THE SCRIPT AND TRY RUNNING IT AGAIN
============= TERMINATING =============
{TerminalColors.ENDC}
"""
)
sys.exit()
domain_contacts_filename = filenames[0]
contacts_filename = filenames[1]
domain_statuses_filename = filenames[2]
file_location = options.get("migrationDirectory")
migration_json_filename = options.get("migrationJSON")
# Run migration scripts
self.run_migration_scripts(
migration_json_filename,
file_location,
domain_contacts_filename,
contacts_filename,
domain_statuses_filename,
sep,
reset_table,
debug_on,


@ -14,6 +14,9 @@ from registrar.management.commands.utility.terminal_helper import (
TerminalColors,
TerminalHelper,
)
from registrar.models.domain_application import DomainApplication
from registrar.models.domain_information import DomainInformation
from registrar.models.user import User
logger = logging.getLogger(__name__)
@ -24,6 +27,9 @@ class Command(BaseCommand):
entries for every domain we ADD (but not for domains
we UPDATE)"""
# ======================================================
# ===================== ARGUMENTS =====================
# ======================================================
def add_arguments(self, parser):
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
@ -33,6 +39,9 @@ class Command(BaseCommand):
help="Sets max number of entries to load, set to 0 to load all entries",
)
# ======================================================
# ===================== PRINTING ======================
# ======================================================
def print_debug_mode_statements(self, debug_on: bool, debug_max_entries_to_parse: int):
"""Prints additional terminal statements to indicate if --debug
or --limitParse are in use"""
@ -55,30 +64,13 @@ class Command(BaseCommand):
""",
)
def update_domain_status(self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool) -> bool:
"""Given a transition domain that matches an existing domain,
updates the existing domain object with that status of
the transition domain.
Returns TRUE if an update was made. FALSE if the states
matched and no update was made"""
transition_domain_status = transition_domain.status
existing_status = target_domain.state
if transition_domain_status != existing_status:
if transition_domain_status == TransitionDomain.StatusChoices.ON_HOLD:
target_domain.place_client_hold(ignoreEPP=True)
else:
target_domain.revert_client_hold(ignoreEPP=True)
target_domain.save()
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
def parse_limit_reached(self, debug_max_entries_to_parse: bool, total_rows_parsed: int) -> bool:
if debug_max_entries_to_parse > 0 and total_rows_parsed >= debug_max_entries_to_parse:
logger.info(
f"""{TerminalColors.YELLOW}
>> Updated {target_domain.name} state from
'{existing_status}' to '{target_domain.state}'
(no domain invitation entry added)
{TerminalColors.ENDC}""",
----PARSE LIMIT REACHED. HALTING PARSER.----
{TerminalColors.ENDC}
"""
)
return True
return False
@ -89,6 +81,8 @@ class Command(BaseCommand):
updated_domain_entries,
domain_invitations_to_create,
skipped_domain_entries,
domain_information_to_create,
updated_domain_information,
debug_on,
):
"""Prints to terminal a summary of findings from
@ -98,16 +92,22 @@ class Command(BaseCommand):
total_updated_domain_entries = len(updated_domain_entries)
total_domain_invitation_entries = len(domain_invitations_to_create)
total_new_domain_information_entries = len(domain_information_to_create)
total_updated_domain_information_entries = len(updated_domain_information)
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Created {total_new_entries} domain entries,
Updated {total_updated_domain_entries} domain entries
Created {total_new_domain_information_entries} domain information entries,
Updated {total_updated_domain_information_entries} domain information entries,
Created {total_domain_invitation_entries} domain invitation entries
(NOTE: no invitations are SENT in this script)
{TerminalColors.ENDC}
"""
""" # noqa
)
if len(skipped_domain_entries) > 0:
logger.info(
@ -149,6 +149,139 @@ class Command(BaseCommand):
""",
)
# ======================================================
# =================== DOMAIN =====================
# ======================================================
def update_or_create_domain(self, transition_domain: TransitionDomain, debug_on: bool):
"""Given a transition domain, either finds & updates an existing
corresponding domain, or creates a new corresponding domain in
the Domain table.
Returns the corresponding Domain object and a boolean
that is TRUE if that Domain was newly created.
"""
# Create some local variables to make data tracing easier
transition_domain_name = transition_domain.domain_name
transition_domain_status = transition_domain.status
transition_domain_creation_date = transition_domain.epp_creation_date
transition_domain_expiration_date = transition_domain.epp_expiration_date
domain_exists = Domain.objects.filter(name=transition_domain_name).exists()
if domain_exists:
try:
# ----------------------- UPDATE DOMAIN -----------------------
# ---- GET THE DOMAIN
target_domain = Domain.objects.get(name=transition_domain_name)
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}
> Found existing entry in Domain table for: {transition_domain_name}, {target_domain.state}
{TerminalColors.ENDC}""", # noqa
)
# ---- UPDATE THE DOMAIN
# update the status
self.update_domain_status(transition_domain, target_domain, debug_on)
# TODO: not all domains need to be updated
# (the information is the same).
# Need to bubble this up to the final report.
# update dates (creation and expiration)
if transition_domain_creation_date is not None:
# TODO: added this because I ran into a situation where
# the created_at date was null (violated a key constraint).
# How do we want to handle this case?
target_domain.created_at = transition_domain_creation_date
if transition_domain_expiration_date is not None:
target_domain.expiration_date = transition_domain_expiration_date
target_domain.save()
return (target_domain, False)
except Domain.MultipleObjectsReturned:
# This exception was thrown once before during testing.
# While the circumstances that led to corrupt data in
# the domain table was a freak accident, and the possibility of it
# happening again is safe-guarded by a key constraint,
# better to keep an eye out for it since it would require
# immediate attention.
logger.warning(
f"""
{TerminalColors.FAIL}
!!! ERROR: duplicate entries already exist in the
Domain table for the following domain:
{transition_domain_name}
RECOMMENDATION:
This means the Domain table is corrupt. Please
check the Domain table data as there should be a key
constraint which prevents duplicate entries.
----------TERMINATING----------"""
)
sys.exit()
except TransitionNotAllowed as err:
logger.warning(
f"""{TerminalColors.FAIL}
Unable to change state for {transition_domain_name}
RECOMMENDATION:
This indicates there might have been changes to the
Domain model which were not accounted for in this
migration script. Please check state change rules
in the Domain model and ensure we are following the
correct state transition pathways.
INTERNAL ERROR MESSAGE:
'TRANSITION NOT ALLOWED' exception
{err}
----------SKIPPING----------"""
)
return (None, False)
else:
# ----------------------- CREATE DOMAIN -----------------------
# no matching entry, make one
target_domain = Domain(
name=str(transition_domain_name),
state=transition_domain_status,
expiration_date=transition_domain_expiration_date,
)
return (target_domain, True)
def update_domain_status(self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool) -> bool:
"""Given a transition domain that matches an existing domain,
updates the existing domain object with that status of
the transition domain.
Returns TRUE if an update was made. FALSE if the states
matched and no update was made"""
transition_domain_status = transition_domain.status
existing_status = target_domain.state
if transition_domain_status != existing_status:
if transition_domain_status == TransitionDomain.StatusChoices.ON_HOLD:
target_domain.place_client_hold(ignoreEPP=True)
else:
target_domain.revert_client_hold(ignoreEPP=True)
target_domain.save()
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}
>> Updated {target_domain.name} state from
'{existing_status}' to '{target_domain.state}'
(no domain invitation entry added)
{TerminalColors.ENDC}""",
)
return True
return False
# ======================================================
# ================ DOMAIN INVITATION ==================
# ======================================================
def try_add_domain_invitation(self, domain_email: str, associated_domain: Domain) -> DomainInvitation | None:
"""If no domain invitation exists for the given domain and
e-mail, create and return a new domain invitation object.
@ -185,6 +318,380 @@ class Command(BaseCommand):
return new_domain_invitation
return None
# ======================================================
# ================ DOMAIN INFORMATION =================
# ======================================================
def update_domain_information(self, current: DomainInformation, target: DomainInformation, debug_on: bool) -> bool:
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.OKCYAN}" f"Updating: {current}" f"{TerminalColors.ENDC}"), # noqa
)
updated = False
fields_to_update = [
"organization_type",
"federal_type",
"federal_agency",
"organization_name",
]
defaults = {field: getattr(target, field) for field in fields_to_update}
if current != target:
current = target
DomainInformation.objects.filter(domain=current.domain).update(**defaults)
updated = True
return updated
def try_add_domain_information(self):
pass
def create_new_domain_info(
self,
transition_domain: TransitionDomain,
domain: Domain,
agency_choices,
fed_choices,
org_choices,
debug_on,
) -> DomainInformation:
org_type_current = transition_domain.organization_type
fed_type = transition_domain.federal_type
fed_agency = transition_domain.federal_agency
org_type = ("", "")
match org_type_current:
case "Federal":
org_type = ("federal", "Federal")
case "Interstate":
org_type = ("interstate", "Interstate")
case "State":
org_type = ("state_or_territory", "State or territory")
case "Tribal":
org_type = ("tribal", "Tribal")
case "County":
org_type = ("county", "County")
case "City":
org_type = ("city", "City")
case "Independent Intrastate":
org_type = ("special_district", "Special district")
valid_org_type = org_type in org_choices
valid_fed_type = fed_type in fed_choices
valid_fed_agency = fed_agency in agency_choices
default_creator, _ = User.objects.get_or_create(username="System")
new_domain_info_data = {
"domain": domain,
"organization_name": transition_domain.organization_name,
"creator": default_creator,
}
if valid_org_type:
new_domain_info_data["organization_type"] = org_type[0]
elif debug_on:
logger.debug(f"No org type found on {domain.name}")
if valid_fed_type and isinstance(fed_type, str):
new_domain_info_data["federal_type"] = fed_type.lower()
elif debug_on:
logger.debug(f"No federal type found on {domain.name}")
if valid_fed_agency:
new_domain_info_data["federal_agency"] = fed_agency
elif debug_on:
logger.debug(f"No federal agency found on {domain.name}")
new_domain_info = DomainInformation(**new_domain_info_data)
# DEBUG:
TerminalHelper.print_conditional(
True,
(
f"{TerminalColors.MAGENTA}"
f"Created Domain Information template for: {new_domain_info}"
f"{TerminalColors.ENDC}"
), # noqa
)
return new_domain_info
def update_or_create_domain_information(
self,
transition_domain: TransitionDomain,
agency_choices,
fed_choices,
org_choices,
debug_on: bool,
):
transition_domain_name = transition_domain.domain_name
# Get associated domain
domain_data = Domain.objects.filter(name=transition_domain.domain_name)
if not domain_data.exists():
logger.warn(
f"{TerminalColors.FAIL}"
f"WARNING: No Domain exists for:"
f"{transition_domain_name}"
f"{TerminalColors.ENDC}\n"
)
return (None, None, False)
domain = domain_data.get()
template_domain_information = self.create_new_domain_info(
transition_domain,
domain,
agency_choices,
fed_choices,
org_choices,
debug_on,
)
target_domain_information = None
domain_information_exists = DomainInformation.objects.filter(domain__name=transition_domain_name).exists()
if domain_information_exists:
try:
# get the existing domain information object
target_domain_information = DomainInformation.objects.get(domain__name=transition_domain_name)
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(
f"{TerminalColors.FAIL}"
f"Found existing entry in Domain Information table for:"
f"{transition_domain_name}"
f"{TerminalColors.ENDC}"
), # noqa
)
# for existing entry, update the status to
# the transition domain status
self.update_domain_information(target_domain_information, template_domain_information, debug_on)
# TODO: not all domains need to be updated
# (the information is the same).
# Need to bubble this up to the final report.
return (target_domain_information, domain, False)
except DomainInformation.MultipleObjectsReturned:
# This should never happen (just like with the Domain Table).
# However, because such an error did occur in the past,
# we will watch for it in this script
logger.warning(
f"""
{TerminalColors.FAIL}
!!! ERROR: duplicate entries already exist in the
Domain Information table for the following domain:
{transition_domain_name}
RECOMMENDATION:
This means the Domain Information table is corrupt. Please
check the Domain Information table data as there should be a key
constraint which prevents duplicate entries.
----------TERMINATING----------"""
)
sys.exit()
else:
# no matching entry, make one
target_domain_information = template_domain_information
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(
f"{TerminalColors.OKCYAN}"
f"Adding domain information for: "
f"{transition_domain_name}"
f"{TerminalColors.ENDC}"
),
)
return (target_domain_information, domain, True)
# C901 'Command.handle' is too complex
def process_domain_information(
self,
valid_agency_choices,
valid_fed_choices,
valid_org_choices,
debug_on,
skipped_domain_information_entries,
domain_information_to_create,
updated_domain_information,
debug_max_entries_to_parse,
total_rows_parsed,
):
for transition_domain in TransitionDomain.objects.all():
(
target_domain_information,
associated_domain,
was_created,
) = self.update_or_create_domain_information(
transition_domain,
valid_agency_choices,
valid_fed_choices,
valid_org_choices,
debug_on,
)
debug_string = ""
if target_domain_information is None:
# ---------------- SKIPPED ----------------
skipped_domain_information_entries.append(target_domain_information)
debug_string = f"skipped domain information: {target_domain_information}"
elif was_created:
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(
f"{TerminalColors.OKCYAN}"
f"Checking duplicates for: {target_domain_information}"
f"{TerminalColors.ENDC}"
), # noqa
)
# ---------------- DUPLICATE ----------------
# The unique key constraint does not allow multiple domain
# information objects to share the same domain
existing_domain_information_in_to_create = next(
(x for x in domain_information_to_create if x.domain.name == target_domain_information.domain.name),
None,
)
# TODO: this is redundant.
# Currently debugging....
# running into unique key constraint error....
existing_domain_info = DomainInformation.objects.filter(
domain__name=target_domain_information.domain.name
).exists()
if existing_domain_information_in_to_create is not None or existing_domain_info:
debug_string = f"""{TerminalColors.YELLOW}
Duplicate Detected: {existing_domain_information_in_to_create}.
Cannot add duplicate Domain Information object
{TerminalColors.ENDC}"""
else:
# ---------------- CREATED ----------------
domain_information_to_create.append(target_domain_information)
debug_string = f"created domain information: {target_domain_information}"
elif not was_created:
# ---------------- UPDATED ----------------
updated_domain_information.append(target_domain_information)
debug_string = f"updated domain information: {target_domain_information}"
else:
debug_string = "domain information already exists and "
f"matches incoming data (NO CHANGES MADE): {target_domain_information}"
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.OKCYAN}{debug_string}{TerminalColors.ENDC}"),
)
# ------------------ Parse limit reached? ------------------
# Check parse limit and exit loop if parse limit has been reached
if self.parse_limit_reached(debug_max_entries_to_parse, total_rows_parsed):
break
return (
skipped_domain_information_entries,
domain_information_to_create,
updated_domain_information,
)
# C901 'Command.handle' is too complex
def process_domain_and_invitations(
self,
debug_on,
skipped_domain_entries,
domains_to_create,
updated_domain_entries,
domain_invitations_to_create,
debug_max_entries_to_parse,
total_rows_parsed,
):
for transition_domain in TransitionDomain.objects.all():
# Create some local variables to make data tracing easier
transition_domain_name = transition_domain.domain_name
transition_domain_status = transition_domain.status
transition_domain_email = transition_domain.username
transition_domain_creation_date = transition_domain.epp_creation_date
transition_domain_expiration_date = transition_domain.epp_expiration_date
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
f"{TerminalColors.OKCYAN}"
"Processing Transition Domain: "
f"{transition_domain_name},"
f" {transition_domain_status},"
f" {transition_domain_email}"
f", {transition_domain_creation_date}, "
f"{transition_domain_expiration_date}"
f"{TerminalColors.ENDC}", # noqa
)
# ======================================================
# ====================== DOMAIN =======================
target_domain, was_created = self.update_or_create_domain(transition_domain, debug_on)
debug_string = ""
if target_domain is None:
# ---------------- SKIPPED ----------------
skipped_domain_entries.append(transition_domain_name)
debug_string = f"skipped domain: {target_domain}"
elif was_created:
# ---------------- DUPLICATE ----------------
# The unique key constraint does not allow duplicate domain entries
# even if there are different users.
existing_domain_in_to_create = next(
(x for x in domains_to_create if x.name == transition_domain_name),
None,
)
if existing_domain_in_to_create is not None:
debug_string = f"""{TerminalColors.YELLOW}
Duplicate Detected: {transition_domain_name}.
Cannot add duplicate entry for another username.
Violates Unique Key constraint.
{TerminalColors.ENDC}"""
else:
# ---------------- CREATED ----------------
domains_to_create.append(target_domain)
debug_string = f"created domain: {target_domain}"
elif not was_created:
# ---------------- UPDATED ----------------
updated_domain_entries.append(transition_domain.domain_name)
debug_string = f"updated domain: {target_domain}"
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.OKCYAN} {debug_string} {TerminalColors.ENDC}"),
)
# ======================================================
# ================ DOMAIN INVITATIONS ==================
new_domain_invitation = self.try_add_domain_invitation(transition_domain_email, target_domain)
if new_domain_invitation is None:
logger.info(
f"{TerminalColors.YELLOW} ! No new e-mail detected !" # noqa
f"(SKIPPED ADDING DOMAIN INVITATION){TerminalColors.ENDC}"
)
else:
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
f"{TerminalColors.OKCYAN} Adding domain invitation: {new_domain_invitation} {TerminalColors.ENDC}", # noqa
)
domain_invitations_to_create.append(new_domain_invitation)
# ------------------ Parse limit reached? ------------------
# Check parse limit and exit loop if parse limit has been reached
if self.parse_limit_reached(debug_max_entries_to_parse, total_rows_parsed):
break
return (
skipped_domain_entries,
domains_to_create,
updated_domain_entries,
domain_invitations_to_create,
)
# ======================================================
# ===================== HANDLE ========================
# ======================================================
def handle(
self,
**options,
@ -201,168 +708,110 @@ class Command(BaseCommand):
# domains to ADD
domains_to_create = []
domain_invitations_to_create = []
domain_information_to_create = []
# domains we UPDATED
updated_domain_entries = []
updated_domain_information = []
# domains we SKIPPED
skipped_domain_entries = []
skipped_domain_information_entries = []
# domain invitations to ADD
domain_invitations_to_create = []
# if we are limiting our parse (for testing purposes, keep
# track of total rows parsed)
total_rows_parsed = 0
logger.info(
f"""{TerminalColors.OKGREEN}
f"""{TerminalColors.OKCYAN}
==========================
Beginning Data Transfer
==========================
{TerminalColors.ENDC}"""
)
for transition_domain in TransitionDomain.objects.all():
transition_domain_name = transition_domain.domain_name
transition_domain_status = transition_domain.status
transition_domain_email = transition_domain.username
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
logger.info(
f"""{TerminalColors.OKCYAN}
Processing Transition Domain: {transition_domain_name}, {transition_domain_status}, {transition_domain_email}
{TerminalColors.ENDC}""", # noqa
========= Adding Domains and Domain Invitations =========
{TerminalColors.ENDC}"""
)
new_domain_invitation = None
# Check for existing domain entry
domain_exists = Domain.objects.filter(name=transition_domain_name).exists()
if domain_exists:
try:
# get the existing domain
domain_to_update = Domain.objects.get(name=transition_domain_name)
# DEBUG:
TerminalHelper.print_conditional(
(
skipped_domain_entries,
domains_to_create,
updated_domain_entries,
domain_invitations_to_create,
) = self.process_domain_and_invitations(
debug_on,
f"""{TerminalColors.YELLOW}
> Found existing entry in Domain table for: {transition_domain_name}, {domain_to_update.state}
{TerminalColors.ENDC}""", # noqa
skipped_domain_entries,
domains_to_create,
updated_domain_entries,
domain_invitations_to_create,
debug_max_entries_to_parse,
total_rows_parsed,
)
# for existing entry, update the status to
# the transition domain status
update_made = self.update_domain_status(transition_domain, domain_to_update, debug_on)
if update_made:
# keep track of updated domains for data analysis purposes
updated_domain_entries.append(transition_domain.domain_name)
# check if we need to add a domain invitation
# (eg. for a new user)
new_domain_invitation = self.try_add_domain_invitation(transition_domain_email, domain_to_update)
except Domain.MultipleObjectsReturned:
# This exception was thrown once before during testing.
# While the circumstances that led to corrupt data in
# the domain table was a freak accident, and the possibility of it
# happening again is safe-guarded by a key constraint,
# better to keep an eye out for it since it would require
# immediate attention.
logger.warning(
f"""
{TerminalColors.FAIL}
!!! ERROR: duplicate entries already exist in the
Domain table for the following domain:
{transition_domain_name}
RECOMMENDATION:
This means the Domain table is corrupt. Please
check the Domain table data as there should be a key
constraint which prevents duplicate entries.
----------TERMINATING----------"""
)
sys.exit()
except TransitionNotAllowed as err:
skipped_domain_entries.append(transition_domain_name)
logger.warning(
f"""{TerminalColors.FAIL}
Unable to change state for {transition_domain_name}
RECOMMENDATION:
This indicates there might have been changes to the
Domain model which were not accounted for in this
migration script. Please check state change rules
in the Domain model and ensure we are following the
correct state transition pathways.
INTERNAL ERROR MESSAGE:
'TRANSITION NOT ALLOWED' exception
{err}
----------SKIPPING----------"""
)
else:
# no entry was found in the domain table
# for the given domain. Create a new entry.
# first see if we are already adding an entry for this domain.
# The unique key constraint does not allow duplicate domain entries
# even if there are different users.
existing_domain_in_to_create = next(
(x for x in domains_to_create if x.name == transition_domain_name),
None,
)
if existing_domain_in_to_create is not None:
TerminalHelper.print_conditional(
debug_on,
f"""{TerminalColors.YELLOW}
Duplicate Detected: {transition_domain_name}.
Cannot add duplicate entry for another username.
Violates Unique Key constraint.
Checking for unique user e-mail for Domain Invitations...
{TerminalColors.ENDC}""",
)
new_domain_invitation = self.try_add_domain_invitation(
transition_domain_email, existing_domain_in_to_create
)
else:
# no matching entry, make one
new_domain = Domain(name=transition_domain_name, state=transition_domain_status)
domains_to_create.append(new_domain)
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
f"{TerminalColors.OKCYAN} Adding domain: {new_domain} {TerminalColors.ENDC}", # noqa
)
new_domain_invitation = self.try_add_domain_invitation(transition_domain_email, new_domain)
if new_domain_invitation is None:
logger.info(
f"{TerminalColors.YELLOW} ! No new e-mail detected !" # noqa
f"(SKIPPED ADDING DOMAIN INVITATION){TerminalColors.ENDC}"
)
else:
# DEBUG:
TerminalHelper.print_conditional(
debug_on,
f"{TerminalColors.OKCYAN} Adding domain invitation: {new_domain_invitation} {TerminalColors.ENDC}", # noqa
)
domain_invitations_to_create.append(new_domain_invitation)
# Check parse limit and exit loop if parse limit has been reached
if debug_max_entries_to_parse > 0 and total_rows_parsed >= debug_max_entries_to_parse:
logger.info(
f"""{TerminalColors.YELLOW}
----PARSE LIMIT REACHED. HALTING PARSER.----
{TerminalColors.ENDC}
"""
)
break
# First, save all Domain objects to the database
Domain.objects.bulk_create(domains_to_create)
# DomainInvitation.objects.bulk_create(domain_invitations_to_create)
# TODO: this is to resolve an error where bulk_create
# doesn't save to database in a way that invitation objects can
# utilize.
# Then, create DomainInvitation objects
for invitation in domain_invitations_to_create:
if debug_on:
logger.info(f"Pairing invite to its domain...{invitation}")
existing_domain = Domain.objects.filter(name=invitation.domain.name)
# Make sure the related Domain object is saved
if existing_domain.exists():
invitation.domain = existing_domain.get()
else:
# Raise an err for now
raise Exception(f"Domain {existing_domain} wants to be added" "but doesn't exist in the DB")
invitation.save()
valid_org_choices = [(name, value) for name, value in DomainApplication.OrganizationChoices.choices]
valid_fed_choices = [value for name, value in DomainApplication.BranchChoices.choices]
valid_agency_choices = DomainApplication.AGENCIES
# ======================================================
# ================= DOMAIN INFORMATION =================
logger.info(
f"""{TerminalColors.OKCYAN}
========= Adding Domains Information Objects =========
{TerminalColors.ENDC}"""
)
(
skipped_domain_information_entries,
domain_information_to_create,
updated_domain_information,
) = self.process_domain_information(
valid_agency_choices,
valid_fed_choices,
valid_org_choices,
debug_on,
skipped_domain_information_entries,
domain_information_to_create,
updated_domain_information,
debug_max_entries_to_parse,
total_rows_parsed,
)
TerminalHelper.print_conditional(
debug_on,
(f"{TerminalColors.YELLOW}" f"Trying to add: {domain_information_to_create}" f"{TerminalColors.ENDC}"),
)
DomainInformation.objects.bulk_create(domain_information_to_create)
self.print_summary_of_findings(
domains_to_create,
updated_domain_entries,
domain_invitations_to_create,
skipped_domain_entries,
domain_information_to_create,
updated_domain_information,
debug_on,
)
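# --- Illustrative usage, not part of the original file ---
# One way this command could be exercised from a test or a Django shell, using
# call_command (the same helper imported in the migration tests further below).
# The command name is taken from those tests; passing debug=True is an
# assumption based on the debug_on option read in handle().
from django.core.management import call_command

call_command("transfer_transition_domains_to_domains", debug=True)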

View file

@ -0,0 +1,100 @@
"""
A list of helper classes to facilitate handling data from Verisign data exports.
Regarding our dataclasses:
Not intended to be used as models but rather as an alternative to storing as a dictionary.
By keeping it as a dataclass instead of a dictionary, we can maintain data consistency.
""" # noqa
from dataclasses import dataclass, field
from datetime import date
from enum import Enum
from typing import List, Optional
@dataclass
class AgencyAdhoc:
"""Defines the structure given in the AGENCY_ADHOC file"""
agencyid: Optional[int] = field(default=None, repr=True)
agencyname: Optional[str] = field(default=None, repr=True)
active: Optional[str] = field(default=None, repr=True)
isfederal: Optional[str] = field(default=None, repr=True)
@dataclass
class DomainAdditionalData:
"""Defines the structure given in the DOMAIN_ADDITIONAL file"""
domainname: Optional[str] = field(default=None, repr=True)
domaintypeid: Optional[int] = field(default=None, repr=True)
authorityid: Optional[int] = field(default=None, repr=True)
orgid: Optional[int] = field(default=None, repr=True)
securitycontactemail: Optional[str] = field(default=None, repr=True)
dnsseckeymonitor: Optional[str] = field(default=None, repr=True)
domainpurpose: Optional[str] = field(default=None, repr=True)
@dataclass
class DomainTypeAdhoc:
"""Defines the structure given in the DOMAIN_ADHOC file"""
domaintypeid: Optional[int] = field(default=None, repr=True)
domaintype: Optional[str] = field(default=None, repr=True)
code: Optional[str] = field(default=None, repr=True)
active: Optional[str] = field(default=None, repr=True)
@dataclass
class OrganizationAdhoc:
"""Defines the structure given in the ORGANIZATION_ADHOC file"""
orgid: Optional[int] = field(default=None, repr=True)
orgname: Optional[str] = field(default=None, repr=True)
orgstreet: Optional[str] = field(default=None, repr=True)
orgcity: Optional[str] = field(default=None, repr=True)
orgstate: Optional[str] = field(default=None, repr=True)
orgzip: Optional[str] = field(default=None, repr=True)
orgcountrycode: Optional[str] = field(default=None, repr=True)
@dataclass
class AuthorityAdhoc:
"""Defines the structure given in the AUTHORITY_ADHOC file"""
authorityid: Optional[int] = field(default=None, repr=True)
firstname: Optional[str] = field(default=None, repr=True)
middlename: Optional[str] = field(default=None, repr=True)
lastname: Optional[str] = field(default=None, repr=True)
email: Optional[str] = field(default=None, repr=True)
phonenumber: Optional[str] = field(default=None, repr=True)
agencyid: Optional[int] = field(default=None, repr=True)
addlinfo: Optional[List[str]] = field(default=None, repr=True)
@dataclass
class DomainEscrow:
"""Defines the structure given in the DOMAIN_ESCROW file"""
domainname: Optional[str] = field(default=None, repr=True)
creationdate: Optional[date] = field(default=None, repr=True)
expirationdate: Optional[date] = field(default=None, repr=True)
class EnumFilenames(Enum):
"""Returns a tuple mapping for (filetype, default_file_name).
For instance, AGENCY_ADHOC = ("agency_adhoc", "agency.adhoc.dotgov.txt")
"""
# We are sourcing data from many different locations, so it's better to track this
# as an Enum rather than multiple spread out variables.
# We store the "type" as [0], and we store the "default_filepath" as [1].
AGENCY_ADHOC = ("agency_adhoc", "agency.adhoc.dotgov.txt")
DOMAIN_ADDITIONAL = (
"domain_additional",
"domainadditionaldatalink.adhoc.dotgov.txt",
)
DOMAIN_ESCROW = ("domain_escrow", "escrow_domains.daily.dotgov.GOV.txt")
DOMAIN_ADHOC = ("domain_adhoc", "domaintypes.adhoc.dotgov.txt")
ORGANIZATION_ADHOC = ("organization_adhoc", "organization.adhoc.dotgov.txt")
AUTHORITY_ADHOC = ("authority_adhoc", "authority.adhoc.dotgov.txt")
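# --- Illustrative usage, not part of the original file ---
# Each EnumFilenames member pairs a short type key with a default filename, and
# each dataclass above mirrors one row of the corresponding export file. The
# sample values below come from the test fixtures included later in this diff.
file_key, default_filename = EnumFilenames.AGENCY_ADHOC.value
# file_key == "agency_adhoc", default_filename == "agency.adhoc.dotgov.txt"

row = AgencyAdhoc(agencyid=4, agencyname="Department of Commerce", active="Y", isfederal="Y")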

View file

@ -0,0 +1,983 @@
""""""
import csv
from dataclasses import dataclass
from datetime import datetime
import io
import glob
import re
import logging
import os
import sys
from typing import Dict
from registrar.models.transition_domain import TransitionDomain
from .epp_data_containers import (
AgencyAdhoc,
DomainAdditionalData,
DomainEscrow,
DomainTypeAdhoc,
OrganizationAdhoc,
AuthorityAdhoc,
EnumFilenames,
)
from .transition_domain_arguments import TransitionDomainArguments
from .terminal_helper import TerminalColors, TerminalHelper, LogCode
logger = logging.getLogger(__name__)
class FileTransitionLog:
"""Container for storing event logs. Used to lessen
the complexity of storing multiple logs across multiple
variables.
self.logs: dict -> {
EnumFilenames.DOMAIN_ADHOC: List[LogItem],
EnumFilenames.AGENCY_ADHOC: List[LogItem],
EnumFilenames.ORGANIZATION_ADHOC: List[LogItem],
EnumFilenames.DOMAIN_ADDITIONAL: List[LogItem],
}
"""
def __init__(self):
self.logs = {}
class LogItem:
"""Used for storing data about logger information."""
def __init__(self, file_type, code, message, domain_name):
self.file_type = file_type
self.code = code
self.message = message
self.domain_name = domain_name
def add_log(self, file_type, code, message, domain_name):
"""Adds a log item to self.logs
file_type -> Which enum to associate with,
ex. EnumFilenames.DOMAIN_ADHOC
code -> Log severity or other metadata, ex. LogCode.ERROR
message -> Message to display
domain_name -> Name of the domain, i.e. "igorville.gov"
"""
log = self.LogItem(file_type, code, message, domain_name)
dict_name = (file_type, domain_name)
self._add_to_log_list(dict_name, log)
def create_log_item(
self,
file_type,
code,
message,
domain_name=None,
add_to_list=True,
minimal_logging=True,
):
"""Creates and returns an LogItem object.
add_to_list: bool -> If enabled, add it to the logs array.
"""
log = self.LogItem(file_type, code, message, domain_name)
if not add_to_list:
return log
dict_name = (file_type, domain_name)
self._add_to_log_list(dict_name, log)
restrict_type = []
if minimal_logging:
restrict_type = [LogCode.INFO, LogCode.WARNING]
TerminalHelper.print_conditional(
log.code not in restrict_type,
log.message,
log.code,
)
return log
def _add_to_log_list(self, log_name, log):
if log_name not in self.logs:
self.logs[log_name] = [log]
else:
self.logs[log_name].append(log)
def display_all_logs(self):
"""Logs every LogItem contained in this object"""
for log_list in self.logs.values():
for child_log in log_list:
TerminalHelper.print_conditional(True, child_log.message, child_log.code)
def display_logs_by_domain_name(self, domain_name, restrict_type=LogCode.DEFAULT):
"""Displays all logs of a given domain_name.
Will log with the correct severity depending on code.
domain_name: str -> The domain to target, such as "igorville.gov"
restrict_type: LogCode -> Determines if only errors of a certain
type should be displayed, such as LogCode.ERROR.
"""
for file_type in EnumFilenames:
domain_logs = self.get_logs(file_type, domain_name)
if domain_logs is None:
continue
for log in domain_logs:
TerminalHelper.print_conditional(restrict_type != log.code, log.message, log.code)
def get_logs(self, file_type, domain_name):
"""Grabs the logs associated with
a particular file_type and domain_name"""
log_name = (file_type, domain_name)
return self.logs.get(log_name)
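# --- Illustrative usage, not part of the original file ---
# Logs are keyed by the (file_type, domain_name) tuple, so adding and then
# retrieving entries for a single domain looks roughly like this ("Flashdog"
# and "igorville.gov" are sample values borrowed from the docstrings and test
# data elsewhere in this diff):
parse_logs = FileTransitionLog()
parse_logs.add_log(
    EnumFilenames.ORGANIZATION_ADHOC,
    LogCode.INFO,
    "Added organization_name as 'Flashdog' on igorville.gov",
    "igorville.gov",
)
domain_logs = parse_logs.get_logs(EnumFilenames.ORGANIZATION_ADHOC, "igorville.gov")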
class LoadExtraTransitionDomain:
"""Grabs additional data for TransitionDomains."""
def __init__(self, options: TransitionDomainArguments):
# Globally stores event logs and organizes them
self.parse_logs = FileTransitionLog()
self.debug = options.debug
# Reads and parses migration files
self.parsed_data_container = ExtraTransitionDomain(options)
self.parsed_data_container.parse_all_files(options.infer_filenames)
def update_transition_domain_models(self):
"""Updates TransitionDomain objects based off the file content
given in self.parsed_data_container"""
all_transition_domains = TransitionDomain.objects.all()
if not all_transition_domains.exists():
raise ValueError("No TransitionDomain objects exist.")
updated_transition_domains = []
failed_transition_domains = []
for transition_domain in all_transition_domains:
domain_name = transition_domain.domain_name
updated_transition_domain = transition_domain
try:
# STEP 1: Parse organization data
updated_transition_domain = self.parse_org_data(domain_name, transition_domain)
# STEP 2: Parse domain type data
updated_transition_domain = self.parse_domain_type_data(domain_name, transition_domain)
# STEP 3: Parse agency data
updated_transition_domain = self.parse_agency_data(domain_name, transition_domain)
# STEP 4: Parse creation and expiration data
updated_transition_domain = self.parse_creation_expiration_data(domain_name, transition_domain)
# Check if the instance has changed before saving
updated_transition_domain.save()
updated_transition_domains.append(updated_transition_domain)
logger.info(f"{TerminalColors.OKCYAN}" f"Successfully updated {domain_name}" f"{TerminalColors.ENDC}")
# If we run into an exception on this domain,
# Just skip over it and log that it happened.
# Q: Should we just throw an exception?
except Exception as err:
logger.debug(err)
logger.error(
f"{TerminalColors.FAIL}"
f"Exception encountered on {domain_name}. Could not update."
f"{TerminalColors.ENDC}"
)
failed_transition_domains.append(domain_name)
failed_count = len(failed_transition_domains)
if failed_count == 0:
if self.debug:
for domain in updated_transition_domains:
logger.debug(domain.display_transition_domain())
logger.info(
f"""{TerminalColors.OKGREEN}
============= FINISHED ===============
Updated {len(updated_transition_domains)} transition domain entries
{TerminalColors.ENDC}
"""
)
else:
# TODO - update
TerminalHelper.print_conditional(
self.debug,
f"{TerminalHelper.array_as_string(updated_transition_domains)}",
)
logger.error(
f"""{TerminalColors.FAIL}
============= FINISHED WITH ERRORS ===============
Updated {len(updated_transition_domains)} transition domain entries,
Failed to update {failed_count} transition domain entries:
{[domain for domain in failed_transition_domains]}
{TerminalColors.ENDC}
"""
)
# DATA INTEGRITY CHECK
# Make sure every Transition Domain got updated
total_transition_domains = len(updated_transition_domains)
total_updates_made = TransitionDomain.objects.all().count()
if total_transition_domains != total_updates_made:
# noqa here for line length
logger.error(
f"""{TerminalColors.FAIL}
WARNING: something went wrong processing domain information data.
Total Transition Domains expecting a data update: {total_transition_domains}
Total updates made: {total_updates_made}
^ These totals should match, but they don't. This
error should never occur, but could indicate
corrupt data. Please check logs to diagnose.
----- TERMINATING ----
""" # noqa
)
sys.exit()
def parse_creation_expiration_data(self, domain_name, transition_domain):
"""Grabs expiration_date from the parsed files and associates it
with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
info = self.get_domain_escrow_info(domain_name)
if info is None:
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ESCROW,
LogCode.ERROR,
"Could not add epp_creation_date and epp_expiration_date " f"on {domain_name}, no data exists.",
domain_name,
not self.debug,
)
return transition_domain
creation_exists = transition_domain.epp_creation_date is not None
expiration_exists = transition_domain.epp_expiration_date is not None
transition_domain.epp_creation_date = info.creationdate
transition_domain.epp_expiration_date = info.expirationdate
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
EnumFilenames.DOMAIN_ESCROW,
"epp_creation_date",
transition_domain.epp_creation_date,
domain_name,
creation_exists,
)
self._add_or_change_message(
EnumFilenames.DOMAIN_ESCROW,
"epp_expiration_date",
transition_domain.epp_expiration_date,
domain_name,
expiration_exists,
)
return transition_domain
def parse_agency_data(self, domain_name, transition_domain) -> TransitionDomain:
"""Grabs federal_agency from the parsed files and associates it
with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
info = self.get_agency_info(domain_name)
if info is None:
self.parse_logs.create_log_item(
EnumFilenames.AGENCY_ADHOC,
LogCode.ERROR,
f"Could not add federal_agency on {domain_name}, no data exists.",
domain_name,
not self.debug,
)
return transition_domain
agency_exists = transition_domain.federal_agency is not None and transition_domain.federal_agency.strip() != ""
if not isinstance(info.active, str) or not info.active.lower() == "y":
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR,
f"Could not add inactive agency {info.agencyname} on {domain_name}",
domain_name,
not self.debug,
)
return transition_domain
if not isinstance(info.isfederal, str) or not info.isfederal.lower() == "y":
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC,
LogCode.INFO,
f"Adding non-federal agency {info.agencyname} on {domain_name}",
domain_name,
not self.debug,
)
transition_domain.federal_agency = info.agencyname
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
EnumFilenames.AGENCY_ADHOC,
"federal_agency",
transition_domain.federal_agency,
domain_name,
agency_exists,
)
return transition_domain
def parse_domain_type_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain:
"""Grabs organization_type and federal_type from the parsed files
and associates it with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
info = self.get_domain_type_info(domain_name)
if info is None:
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR,
f"Could not add domain_type on {domain_name}, no data exists.",
domain_name,
not self.debug,
)
return transition_domain
# This data is stored as follows: FEDERAL - Judicial
# For all other records, it is stored like so: Interstate
# We can infer if it is federal or not based on this fact.
domain_type = []
if isinstance(info.domaintype, str):
domain_type = info.domaintype.split("-")
domain_type_length = len(domain_type)
if domain_type_length < 1 or domain_type_length > 2:
raise ValueError("Found invalid data on DOMAIN_ADHOC")
# Then, just grab the organization type.
new_organization_type = domain_type[0].strip()
# Check if this domain_type is active or not.
# If not, we don't want to add this.
if not isinstance(info.active, str) or not info.active.lower() == "y":
self.parse_logs.create_log_item(
EnumFilenames.DOMAIN_ADHOC,
LogCode.ERROR,
f"Could not add inactive domain_type {domain_type[0]} on {domain_name}",
domain_name,
not self.debug,
)
return transition_domain
# Are we updating data that already exists,
# or are we adding new data in its place?
organization_type_exists = (
transition_domain.organization_type is not None and transition_domain.organization_type.strip() != ""
)
federal_type_exists = (
transition_domain.federal_type is not None and transition_domain.federal_type.strip() != ""
)
# If we get two records, then we know it is federal.
# needs to be lowercase for federal type
is_federal = domain_type_length == 2
if is_federal:
new_federal_type = domain_type[1].strip()
transition_domain.organization_type = new_organization_type
transition_domain.federal_type = new_federal_type
else:
transition_domain.organization_type = new_organization_type
transition_domain.federal_type = None
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
EnumFilenames.DOMAIN_ADHOC,
"organization_type",
transition_domain.organization_type,
domain_name,
organization_type_exists,
)
self._add_or_change_message(
EnumFilenames.DOMAIN_ADHOC,
"federal_type",
transition_domain.federal_type,
domain_name,
federal_type_exists,
)
return transition_domain
def parse_org_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain:
"""Grabs organization_name from the parsed files and associates it
with a transition_domain object, then returns that object."""
if not isinstance(transition_domain, TransitionDomain):
raise ValueError("Not a valid object, must be TransitionDomain")
org_info = self.get_org_info(domain_name)
if org_info is None:
self.parse_logs.create_log_item(
EnumFilenames.ORGANIZATION_ADHOC,
LogCode.ERROR,
f"Could not add organization_name on {domain_name}, no data exists.",
domain_name,
not self.debug,
)
return transition_domain
desired_property_exists = (
transition_domain.organization_name is not None and transition_domain.organization_name.strip() != ""
)
transition_domain.organization_name = org_info.orgname
# Logs if we either added to this property,
# or modified it.
self._add_or_change_message(
EnumFilenames.ORGANIZATION_ADHOC,
"organization_name",
transition_domain.organization_name,
domain_name,
desired_property_exists,
)
return transition_domain
def _add_or_change_message(self, file_type, var_name, changed_value, domain_name, is_update=False):
"""Creates a log instance when a property
is successfully changed on a given TransitionDomain."""
if not is_update:
self.parse_logs.create_log_item(
file_type,
LogCode.INFO,
f"Added {var_name} as '{changed_value}' on {domain_name}",
domain_name,
not self.debug,
)
else:
self.parse_logs.create_log_item(
file_type,
LogCode.WARNING,
f"Updated existing {var_name} to '{changed_value}' on {domain_name}",
domain_name,
not self.debug,
)
# Property getters, i.e. orgid or domaintypeid
def get_org_info(self, domain_name) -> OrganizationAdhoc:
"""Maps an id given in get_domain_data to a organization_adhoc
record which has its corresponding definition"""
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
org_id = domain_info.orgid
return self.get_organization_adhoc(org_id)
def get_domain_type_info(self, domain_name) -> DomainTypeAdhoc:
"""Maps an id given in get_domain_data to a domain_type_adhoc
record which has its corresponding definition"""
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
type_id = domain_info.domaintypeid
return self.get_domain_adhoc(type_id)
def get_agency_info(self, domain_name) -> AgencyAdhoc:
"""Maps an id given in get_domain_data to a agency_adhoc
record which has its corresponding definition"""
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
# The agency record is within the authority adhoc
authority_id = domain_info.authorityid
authority = self.get_authority_adhoc(authority_id)
type_id = None
if authority is not None:
type_id = authority.agencyid
return self.get_agency_adhoc(type_id)
def get_authority_info(self, domain_name):
"""Maps an id given in get_domain_data to a authority_adhoc
record which has its corresponding definition"""
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
type_id = domain_info.authorityid
return self.get_authority_adhoc(type_id)
def get_domain_escrow_info(self, domain_name):
domain_info = self.get_domain_data(domain_name)
if domain_info is None:
return None
type_id = domain_info.domainname
return self.get_domain_escrow(type_id)
# Object getters, i.e. DomainAdditionalData or OrganizationAdhoc
def get_domain_data(self, desired_id) -> DomainAdditionalData:
"""Grabs a corresponding row within the DOMAIN_ADDITIONAL file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.DOMAIN_ADDITIONAL, desired_id)
def get_organization_adhoc(self, desired_id) -> OrganizationAdhoc:
"""Grabs a corresponding row within the ORGANIZATION_ADHOC file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.ORGANIZATION_ADHOC, desired_id)
def get_domain_adhoc(self, desired_id) -> DomainTypeAdhoc:
"""Grabs a corresponding row within the DOMAIN_ADHOC file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.DOMAIN_ADHOC, desired_id)
def get_agency_adhoc(self, desired_id) -> AgencyAdhoc:
"""Grabs a corresponding row within the AGENCY_ADHOC file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.AGENCY_ADHOC, desired_id)
def get_authority_adhoc(self, desired_id) -> AuthorityAdhoc:
"""Grabs a corresponding row within the AUTHORITY_ADHOC file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.AUTHORITY_ADHOC, desired_id)
def get_domain_escrow(self, desired_id) -> DomainEscrow:
"""Grabs a corresponding row within the DOMAIN_ESCROW file,
based off a desired_id"""
return self.get_object_by_id(EnumFilenames.DOMAIN_ESCROW, desired_id)
# TODO - rename / needs a return section
def get_object_by_id(self, file_type: EnumFilenames, desired_id):
"""Returns a field in a dictionary based off the type and id.
vars:
file_type: (constant) EnumFilenames -> Which data file to target.
An example would be `EnumFilenames.DOMAIN_ADHOC`.
desired_id: str -> Which id you want to search on.
An example would be `"12"` or `"igorville.gov"`
Explanation:
Each data file has an associated type (file_type) for tracking purposes.
Each file_type is a dictionary which
contains a dictionary of row[id_field]: object.
In practice, this would look like:
EnumFilenames.AUTHORITY_ADHOC: {
"1": AuthorityAdhoc(...),
"2": AuthorityAdhoc(...),
...
}
desired_id will then specify which id to grab. If we wanted "1",
then this function will return the value of id "1".
So, `AuthorityAdhoc(...)`
"""
# Grabs a dict associated with the file_type.
# For example, EnumFilenames.DOMAIN_ADDITIONAL.
desired_type = self.parsed_data_container.file_data.get(file_type)
if desired_type is None:
self.parse_logs.create_log_item(
file_type,
LogCode.ERROR,
f"Type {file_type} does not exist",
)
return None
# Grab the value given an Id within that file_type dict.
# For example, "igorville.gov".
obj = desired_type.data.get(desired_id)
if obj is None:
self.parse_logs.create_log_item(
file_type,
LogCode.ERROR,
f"Id {desired_id} does not exist for {file_type.value[0]}",
)
return obj
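# --- Illustrative usage, not part of the original file ---
# How the getters above chain together; "loader" is a hypothetical
# LoadExtraTransitionDomain instance and "anomaly.gov" comes from the test
# data further below. Domain names are stored lowercased, so lookups use
# lowercase keys.
domain_row = loader.get_domain_data("anomaly.gov")          # -> DomainAdditionalData
org_row = loader.get_organization_adhoc(domain_row.orgid)   # -> OrganizationAdhoc
logger.info(org_row.orgname)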
# TODO - change name
@dataclass
class FileDataHolder:
"""Helper class that holds data about a requested file.
filename: str -> The desired filename to target. If no filename is given,
it is assumed that you are passing in a filename pattern and it will look
for a filename that matches the given postfix you pass in.
regex: re.Pattern -> Defines what regex you want to use when inferring
filenames. If none, no matching occurs.
data_type: type -> Metadata about the desired type for data.
id_field: str -> Defines which field should act as the id in data.
This is necessary as we store lists of "data_type" in ExtraTransitionDomain as follows:
{
id_field: data_type(...),
id_field: data_type(...),
...
}
""" # noqa
def __init__(
self,
filename: str,
regex: re.Pattern,
data_type: type,
id_field: str,
):
# Metadata #
# = Filename inference metadata =#
self.regex = regex
self.could_infer = False
# = "data" object metadata =#
# == Where the data is sourced from ==#
self.filename = filename
# == What type the data is ==#
self.data_type = data_type
# == What the id should be in the holding dict ==#
# TODO - rename to id_field_name
self.id_field = id_field
# Object data #
self.data: Dict[str, type] = {}
# This is used ONLY for development purposes. This behaviour
# is controlled by the --infer_filenames flag, which defaults
# to false. The purpose of this check is to speed up development,
# but it cannot be used by the end user.
def try_infer_filename(self, current_file_name, default_file_name):
"""Tries to match a given filename to a regex,
then uses that match to generate the filename."""
# returns (filename, inferred_successfully)
return self._infer_filename(self.regex, current_file_name, default_file_name)
def _infer_filename(self, regex: re.Pattern, matched_file_name, default_file_name):
if not isinstance(regex, re.Pattern):
return (self.filename, False)
match = regex.match(matched_file_name)
if not match:
return (self.filename, False)
total_groups = len(match.groups())
# If no matches exist or if we have too many
# matches, then we shouldn't infer
if total_groups == 0 or total_groups > 2:
return (self.filename, False)
# If only one match is returned,
# it means that our default matches our request
if total_groups == 1:
return (self.filename, True)
# Otherwise, if two are returned, then
# it's likely the pattern we want
date = match.group(1)
filename_without_date = match.group(2)
# After stripping out the date,
# do the two filenames match?
can_infer = filename_without_date == default_file_name
if not can_infer:
return (self.filename, False)
# If they do, recreate the filename and return it
full_filename = date + "." + filename_without_date
return (full_filename, can_infer)
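# --- Illustrative example, not part of the original file ---
# What the date-prefix inference is matching against. strip_date_regex (defined
# on the class below) splits "<digits>.<rest>", so a dated export name like
# "20231102.agency.adhoc.dotgov.txt" (hypothetical date) can be compared to the
# enum's default filename and then reassembled by try_infer_filename.
example_regex = re.compile(r"(?:.*\/)?(\d+)\.(.+)")
example_match = example_regex.match("20231102.agency.adhoc.dotgov.txt")
# example_match.group(1) == "20231102"
# example_match.group(2) == "agency.adhoc.dotgov.txt"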
class ExtraTransitionDomain:
"""Helper class to aid in storing TransitionDomain data spread across
multiple files."""
strip_date_regex = re.compile(r"(?:.*\/)?(\d+)\.(.+)")
def __init__(self, options: TransitionDomainArguments):
# Add a slash if the last character isn't one
if options.directory and options.directory[-1] != "/":
options.directory += "/"
self.directory = options.directory
self.seperator = options.sep
self.all_files = glob.glob(f"{self.directory}*")
# Create a set with filenames as keys for quick lookup
self.all_files_set = {os.path.basename(file) for file in self.all_files}
# Used for a container of values at each filename.
# Instead of tracking each in a separate variable, we can declare
# metadata about each file and associate it with an enum.
# That way if we want the data located at the agency_adhoc file,
# we can just call EnumFilenames.AGENCY_ADHOC.
pattern_map_params = [
(
EnumFilenames.AGENCY_ADHOC,
options.agency_adhoc_filename,
AgencyAdhoc,
"agencyid",
),
(
EnumFilenames.DOMAIN_ADDITIONAL,
options.domain_additional_filename,
DomainAdditionalData,
"domainname",
),
(
EnumFilenames.DOMAIN_ESCROW,
options.domain_escrow_filename,
DomainEscrow,
"domainname",
),
(
EnumFilenames.DOMAIN_ADHOC,
options.domain_adhoc_filename,
DomainTypeAdhoc,
"domaintypeid",
),
(
EnumFilenames.ORGANIZATION_ADHOC,
options.organization_adhoc_filename,
OrganizationAdhoc,
"orgid",
),
(
EnumFilenames.AUTHORITY_ADHOC,
options.authority_adhoc_filename,
AuthorityAdhoc,
"authorityid",
),
]
self.file_data = self.populate_file_data(pattern_map_params)
# TODO - revise comment
def populate_file_data(self, pattern_map_params):
"""Populates the self.file_data field given a set
of tuple params.
pattern_map_params must adhere to this format:
[
(file_type, filename, data_type, id_field),
]
vars:
file_type (EnumFilenames) -> The name of the dictionary.
Defined as a value on EnumFilenames, such as
EnumFilenames.AGENCY_ADHOC
filename (str) -> The filepath of the given
"file_type", such as migrationdata/test123.txt
data_type (type) -> The type of data to be read
at the location of the filename. For instance,
each row of test123.txt may return data of type AgencyAdhoc
id_field (str) -> Given the "data_type" of each row,
this specifies what the "id" of that row is.
For example, "agencyid". This is used so we can
store each record in a dictionary rather than
a list of values.
return example:
EnumFilenames.AUTHORITY_ADHOC: FileDataHolder(
authority_adhoc_filename,
self.strip_date_regex,
AuthorityAdhoc,
"authorityid",
),
"""
file_data = {}
for file_type, filename, data_type, id_field in pattern_map_params:
file_data[file_type] = FileDataHolder(
filename,
self.strip_date_regex,
data_type,
id_field,
)
return file_data
def parse_all_files(self, infer_filenames=True):
"""Clears all preexisting data then parses each related CSV file.
infer_filenames: bool -> Determines if we should try to
infer the filename if a default is passed in
"""
for name, value in self.file_data.items():
is_domain_escrow = name == EnumFilenames.DOMAIN_ESCROW
filename = f"{value.filename}"
if filename in self.all_files_set:
_file = f"{self.directory}{value.filename}"
value.data = self.parse_csv_file(
_file,
self.seperator,
value.data_type,
value.id_field,
is_domain_escrow,
)
else:
if not infer_filenames:
raise FileNotFoundError(
f"{TerminalColors.FAIL}" f"Could not find file {filename} for {name}" f"{TerminalColors.ENDC}"
)
# Infer filename logic #
# This mode is used for
# internal development use and testing only.
# Rather than having to manually define the
# filename each time, we can infer what the filename
# actually is.
# Not intended for use outside of that, as it is better to assume
# the end-user wants to be specific.
logger.warning(f"Attempting to infer filename: {filename}")
for filename in self.all_files:
default_name = name.value[1]
match = value.try_infer_filename(filename, default_name)
filename = match[0]
can_infer = match[1]
if can_infer:
break
if filename in self.all_files_set:
logger.info(f"Infer success. Found file {filename}")
_file = f"{self.directory}{filename}"
value.data = self.parse_csv_file(
_file,
self.seperator,
value.data_type,
value.id_field,
is_domain_escrow,
)
continue
raise FileNotFoundError(
f"{TerminalColors.FAIL}" f"Could not find file {filename} for {name}" f"{TerminalColors.ENDC}"
)
def clear_file_data(self):
for item in self.file_data.values():
file_type: FileDataHolder = item
file_type.data = {}
def parse_csv_file(self, file, seperator, dataclass_type, id_field, is_domain_escrow=False):
# Domain escrow is an edge case
if is_domain_escrow:
item_to_return = self._read_domain_escrow(file, seperator)
return item_to_return
else:
item_to_return = self._read_csv_file(file, seperator, dataclass_type, id_field)
return item_to_return
# Domain escrow is an edge case given that it's structured differently data-wise.
def _read_domain_escrow(self, file, seperator):
dict_data = {}
with open(file, "r", encoding="utf-8-sig") as requested_file:
reader = csv.reader(requested_file, delimiter=seperator)
for row in reader:
domain_name = row[0]
date_format = "%Y-%m-%dT%H:%M:%SZ"
# TODO - add error handling
creation_date = datetime.strptime(row[7], date_format)
expiration_date = datetime.strptime(row[11], date_format)
dict_data[domain_name] = DomainEscrow(domain_name, creation_date, expiration_date)
return dict_data
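# --- Illustrative example, not part of the original file ---
# What _read_domain_escrow pulls out of a single escrow row; the sample row is
# the pipe-separated test fixture included later in this diff (column 7 is the
# creation date, column 11 the expiration date).
sample_row = (
    "Anomaly.gov|SOME_STRING||data|data|data|data|2008-03-09T16:12:47Z"
    "|DATA2|ctldbatch|2022-06-06T01:33:10Z|2023-03-09T16:12:47Z|2023-02-09T16:12:47Z"
).split("|")
sample_creation = datetime.strptime(sample_row[7], "%Y-%m-%dT%H:%M:%SZ")     # 2008-03-09 16:12:47
sample_expiration = datetime.strptime(sample_row[11], "%Y-%m-%dT%H:%M:%SZ")  # 2023-03-09 16:12:47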
def _grab_row_id(self, row, id_field, file, dataclass_type):
try:
row_id = row[id_field]
except KeyError as err:
logger.error(
f"{TerminalColors.FAIL}"
"\n Key mismatch! Did you upload the wrong file?"
f"\n File: {file}"
f"\n Expected type: {dataclass_type}"
f"{TerminalColors.ENDC}"
)
raise err
else:
return row_id
def _read_csv_file(self, file, seperator, dataclass_type, id_field):
dict_data = {}
# Used when we encounter bad data
updated_file_content = None
with open(file, "r", encoding="utf-8-sig") as requested_file:
reader = csv.DictReader(requested_file, delimiter=seperator)
for row in reader:
# Checks if we encounter any bad data.
# If we do, we (non-destructively) clean the file
if None in row:
logger.warning(
f"{TerminalColors.YELLOW}"
f"Found bad data in {file}. Attempting to clean."
f"{TerminalColors.ENDC}"
)
updated_file_content = self.replace_bad_seperators(file, f"{seperator}", ";badseperator;")
dict_data = {}
break
row_id = self._grab_row_id(row, id_field, file, dataclass_type)
# To maintain parity with the load_transition_domain
# script, we store this data in lowercase.
if id_field == "domainname" and row_id is not None:
row_id = row_id.lower()
dict_data[row_id] = dataclass_type(**row)
# After we clean the data, try to parse it again
if updated_file_content:
logger.info(f"{TerminalColors.MAGENTA}" f"Retrying load for {file}" f"{TerminalColors.ENDC}")
# Parse the cleaned content in memory (StringIO) rather than
# writing it back to the file, to avoid potential data corruption.
updated_file = io.StringIO(updated_file_content)
reader = csv.DictReader(updated_file, delimiter=seperator)
for row in reader:
row_id = row[id_field]
# If the key is still None, something
# is wrong with the file.
if None in row:
logger.error(
f"{TerminalColors.FAIL}" f"Corrupt data found for {row_id}. Skipping." f"{TerminalColors.ENDC}"
)
continue
for key, value in row.items():
if value is not None and isinstance(value, str):
value = value.replace(";badseperator;", f" {seperator} ")
row[key] = value
# To maintain parity with the load_transition_domain
# script, we store this data in lowercase.
if id_field == "domainname" and row_id is not None:
row_id = row_id.lower()
dict_data[row_id] = dataclass_type(**row)
return dict_data
def replace_bad_seperators(self, filename, delimiter, special_character):
with open(filename, "r", encoding="utf-8-sig") as file:
contents = file.read()
new_content = re.sub(rf" \{delimiter} ", special_character, contents)
return new_content
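# --- Illustrative example, not part of the original file ---
# The round trip _read_csv_file performs on "bad" rows: a literal " | " inside
# a field (see the corrupt organization row in the test data below) is swapped
# for ";badseperator;" so DictReader sees the right number of columns, then
# swapped back after parsing.
bad_line = "3|corrupt data|376 Joe Pass|Waco | corruption|Texas|76705|US"
cleaned_line = re.sub(r" \| ", ";badseperator;", bad_line)
# "3|corrupt data|376 Joe Pass|Waco;badseperator;corruption|Texas|76705|US"
restored_value = "Waco;badseperator;corruption".replace(";badseperator;", " | ")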

View file

@ -1,9 +1,29 @@
from enum import Enum
import logging
import sys
from typing import List
logger = logging.getLogger(__name__)
class LogCode(Enum):
"""Stores the desired log severity
Overview of error codes:
- 1 ERROR
- 2 WARNING
- 3 INFO
- 4 DEBUG
- 5 DEFAULT
"""
ERROR = 1
WARNING = 2
INFO = 3
DEBUG = 4
DEFAULT = 5
class TerminalColors:
"""Colors for terminal outputs
(makes reading the logs WAY easier)"""
@ -23,7 +43,7 @@ class TerminalColors:
class TerminalHelper:
@staticmethod
def query_yes_no(question: str, default="yes") -> bool:
def query_yes_no(question: str, default="yes"):
"""Ask a yes/no question via raw_input() and return their answer.
"question" is a string that is presented to the user.
@ -54,13 +74,79 @@ class TerminalHelper:
logger.info("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n")
@staticmethod
def query_yes_no_exit(question: str, default="yes"):
"""Ask a yes/no question via raw_input() and return their answer.
"question" is a string that is presented to the user.
"default" is the presumed answer if the user just hits <Enter>.
It must be "yes" (the default), "no" or None (meaning
an answer is required of the user).
The "answer" return value is True for "yes" or False for "no".
"""
valid = {
"yes": True,
"y": True,
"ye": True,
"no": False,
"n": False,
"e": "exit",
}
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("invalid default answer: '%s'" % default)
while True:
logger.info(question + prompt)
choice = input().lower()
if default is not None and choice == "":
return valid[default]
elif choice in valid:
if valid[choice] == "exit":
sys.exit()
return valid[choice]
else:
logger.info("Please respond with a valid selection.\n")
@staticmethod
def array_as_string(array_to_convert: List[str]) -> str:
array_as_string = "{}".format("\n".join(map(str, array_to_convert)))
return array_as_string
@staticmethod
def print_conditional(
print_condition: bool,
print_statement: str,
log_severity: LogCode = LogCode.DEFAULT,
):
"""This function reduces complexity of debug statements
in other functions.
It uses the logger to write the given print_statement to the
terminal if print_condition is TRUE.
print_condition: bool -> Prints if print_condition is TRUE
print_statement: str -> The statement to print
log_severity: LogCode -> Determines the severity to log at
"""
# DEBUG:
if print_condition:
match log_severity:
case LogCode.ERROR:
logger.error(print_statement)
case LogCode.WARNING:
logger.warning(print_statement)
case LogCode.INFO:
logger.info(print_statement)
case LogCode.DEBUG:
logger.debug(print_statement)
case _:
logger.info(print_statement)
@staticmethod
@ -68,16 +154,17 @@ class TerminalHelper:
"""Create to reduce code complexity.
Prompts the user to inspect the given string
and asks if they wish to proceed.
Returns true if the user responds (y),
Returns false if the user responds (n)"""
If the user responds (y), returns TRUE
If the user responds (n), either returns FALSE
or exits the system if system_exit_on_terminate = TRUE"""
action_description_for_selecting_no = "skip"
action_description_for_selecting_no = "skip, E = exit"
if system_exit_on_terminate:
action_description_for_selecting_no = "exit"
# Allow the user to inspect the command string
# and ask if they wish to proceed
proceed_execution = TerminalHelper.query_yes_no_exit(
f"""{TerminalColors.OKCYAN}
=====================================================
{prompt_title}
@ -96,5 +183,25 @@ class TerminalHelper:
if system_exit_on_terminate:
sys.exit()
return False
return True
@staticmethod
def get_file_line_count(filepath: str) -> int:
with open(filepath, "r") as file:
li = file.readlines()
total_line = len(li)
return total_line
@staticmethod
def print_to_file_conditional(print_condition: bool, filename: str, file_directory: str, file_contents: str):
"""Sometimes logger outputs get insanely huge."""
if print_condition:
# Add a slash if the last character isn't one
if file_directory and file_directory[-1] != "/":
file_directory += "/"
# Assemble filepath
filepath = f"{file_directory}{filename}.txt"
# Write to file
logger.info(f"{TerminalColors.MAGENTA}Writing to file " f" {filepath}..." f"{TerminalColors.ENDC}")
with open(f"{filepath}", "w+") as f:
f.write(file_contents)
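# --- Illustrative usage, not part of the original file ---
# A small sketch of how the migration scripts in this diff combine these
# helpers; "example_log" and the domain names are made-up values.
domains = ["igorville.gov", "anomaly.gov"]
TerminalHelper.print_conditional(
    True,
    TerminalHelper.array_as_string(domains),
    LogCode.INFO,
)
# Writes the same text to migrationdata/example_log.txt only when the first
# argument (a debug flag) is True:
TerminalHelper.print_to_file_conditional(
    True,
    "example_log",
    "migrationdata",
    TerminalHelper.array_as_string(domains),
)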

View file

@ -0,0 +1,55 @@
from dataclasses import dataclass, field
from typing import Optional
from registrar.management.commands.utility.epp_data_containers import EnumFilenames
@dataclass
class TransitionDomainArguments:
"""Stores arguments for load_transition_domain, structurally a mix
of a dataclass and a regular class, meaning we get a hardcoded
representation of the values we want, while maintaining flexibility
and reducing boilerplate.
All pre-defined fields are optional but will remain on the model definition.
If no value is provided, each field falls back to the default given below.
"""
# Maintains an internal kwargs list and sets values
# that match the class definition.
def __init__(self, **kwargs):
self.kwargs = kwargs
for k, v in kwargs.items():
if hasattr(self, k):
setattr(self, k, v)
# These all use field() to minimize typing and/or lambda.
# Since this file is bound to expand, we can save time
# by reducing the line count from 2-3 to just 1 line
# each time we want to add a new filename or option.
# This approach is also used in EppLib internally for similar reasons.
# Settings #
directory: Optional[str] = field(default="migrationdata", repr=True)
sep: Optional[str] = field(default="|", repr=True)
limitParse: Optional[int] = field(default=None, repr=True)
# Filenames #
# = Adhocs =#
agency_adhoc_filename: Optional[str] = field(default=EnumFilenames.AGENCY_ADHOC.value[1], repr=True)
domain_adhoc_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADHOC.value[1], repr=True)
organization_adhoc_filename: Optional[str] = field(default=EnumFilenames.ORGANIZATION_ADHOC.value[1], repr=True)
authority_adhoc_filename: Optional[str] = field(default=EnumFilenames.AUTHORITY_ADHOC.value[1], repr=True)
# = Data files =#
domain_escrow_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ESCROW.value[1], repr=True)
domain_additional_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], repr=True)
domain_contacts_filename: Optional[str] = field(default=None, repr=True)
domain_statuses_filename: Optional[str] = field(default=None, repr=True)
contacts_filename: Optional[str] = field(default=None, repr=True)
# Flags #
debug: Optional[bool] = field(default=False, repr=True)
resetTable: Optional[bool] = field(default=False, repr=True)
infer_filenames: Optional[bool] = field(default=False, repr=True)
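# --- Illustrative usage, not part of the original file ---
# Constructing the arguments object the way the parsing classes above consume
# it: keyword arguments matching a declared field overwrite that field's
# default, the full kwargs dict is also kept on self.kwargs, and unspecified
# filenames fall back to the EnumFilenames defaults (e.g. "agency.adhoc.dotgov.txt").
options = TransitionDomainArguments(
    directory="migrationdata",
    sep="|",
    debug=True,
    infer_filenames=False,
)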

View file

@ -0,0 +1,53 @@
# Generated by Django 4.2.1 on 2023-10-24 20:44
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("registrar", "0044_create_groups_v04"),
]
operations = [
migrations.AddField(
model_name="transitiondomain",
name="federal_agency",
field=models.TextField(blank=True, help_text="Federal agency", null=True),
),
migrations.AddField(
model_name="transitiondomain",
name="federal_type",
field=models.TextField(
blank=True,
help_text="Federal government branch",
max_length=50,
null=True,
),
),
migrations.AddField(
model_name="transitiondomain",
name="organization_type",
field=models.TextField(blank=True, help_text="Type of organization", max_length=255, null=True),
),
migrations.AddField(
model_name="transitiondomain",
name="organization_name",
field=models.TextField(blank=True, db_index=True, help_text="Organization name", null=True),
),
migrations.AddField(
model_name="transitiondomain",
name="epp_creation_date",
field=models.DateField(
help_text="Duplication of registry's creation date saved for ease of reporting",
null=True,
),
),
migrations.AddField(
model_name="transitiondomain",
name="epp_expiration_date",
field=models.DateField(
help_text="Duplication of registry's expiration date saved for ease of reporting",
null=True,
),
),
]

View file

@ -164,20 +164,30 @@ class DomainApplication(TimeStampedModel):
"Administrative Conference of the United States",
"Advisory Council on Historic Preservation",
"American Battle Monuments Commission",
"AMTRAK",
"Appalachian Regional Commission",
("Appraisal Subcommittee of the Federal Financial Institutions Examination Council"),
("Appraisal Subcommittee of the Federal Financial " "Institutions Examination Council"),
"Appraisal Subcommittee",
"Architect of the Capitol",
"Armed Forces Retirement Home",
"Barry Goldwater Scholarship and Excellence in Education Foundation",
"Barry Goldwater Scholarship and Excellence in Education Program",
"Central Intelligence Agency",
"Chemical Safety Board",
"Christopher Columbus Fellowship Foundation",
"Civil Rights Cold Case Records Review Board",
"Commission for the Preservation of America's Heritage Abroad",
"Commission of Fine Arts",
"Committee for Purchase From People Who Are Blind or Severely Disabled",
"Commodity Futures Trading Commission",
"Congressional Budget Office",
"Consumer Financial Protection Bureau",
"Consumer Product Safety Commission",
"Corporation for National & Community Service",
"Corporation for National and Community Service",
"Council of Inspectors General on Integrity and Efficiency",
"Court Services and Offender Supervision",
"Cyberspace Solarium Commission",
"DC Court Services and Offender Supervision Agency",
"DC Pre-trial Services",
"Defense Nuclear Facilities Safety Board",
@ -203,12 +213,15 @@ class DomainApplication(TimeStampedModel):
"Election Assistance Commission",
"Environmental Protection Agency",
"Equal Employment Opportunity Commission",
"Executive Office of the President",
"Export-Import Bank of the United States",
"Export/Import Bank of the U.S.",
"Farm Credit Administration",
"Farm Credit System Insurance Corporation",
"Federal Communications Commission",
"Federal Deposit Insurance Corporation",
"Federal Election Commission",
"Federal Energy Regulatory Commission",
"Federal Financial Institutions Examination Council",
"Federal Housing Finance Agency",
"Federal Judiciary",
@ -216,11 +229,18 @@ class DomainApplication(TimeStampedModel):
"Federal Maritime Commission",
"Federal Mediation and Conciliation Service",
"Federal Mine Safety and Health Review Commission",
"Federal Permitting Improvement Steering Council",
"Federal Reserve Board of Governors",
"Federal Reserve System",
"Federal Trade Commission",
"General Services Administration",
"gov Administration",
"Government Accountability Office",
"Government Publishing Office",
"Gulf Coast Ecosystem Restoration Council",
"Harry S Truman Scholarship Foundation",
"Harry S. Truman Scholarship Foundation",
"Institute of Museum and Library Services",
"Institute of Peace",
"Inter-American Foundation",
"International Boundary and Water Commission: United States and Mexico",
@ -228,38 +248,55 @@ class DomainApplication(TimeStampedModel):
"International Joint Commission: United States and Canada",
"James Madison Memorial Fellowship Foundation",
"Japan-United States Friendship Commission",
"Japan-US Friendship Commission",
"John F. Kennedy Center for Performing Arts",
"John F. Kennedy Center for the Performing Arts",
"Legal Services Corporation",
"Legislative Branch",
"Library of Congress",
"Marine Mammal Commission",
"Medicaid and CHIP Payment and Access Commission",
"Medical Payment Advisory Commission",
"Medicare Payment Advisory Commission",
"Merit Systems Protection Board",
"Millennium Challenge Corporation",
"Morris K. Udall and Stewart L. Udall Foundation",
"National Aeronautics and Space Administration",
"National Archives and Records Administration",
"National Capital Planning Commission",
"National Council on Disability",
"National Credit Union Administration",
"National Endowment for the Arts",
"National Endowment for the Humanities",
"National Foundation on the Arts and the Humanities",
"National Gallery of Art",
"National Indian Gaming Commission",
"National Labor Relations Board",
"National Mediation Board",
"National Science Foundation",
"National Security Commission on Artificial Intelligence",
"National Transportation Safety Board",
"Networking Information Technology Research and Development",
"Non-Federal Agency",
"Northern Border Regional Commission",
"Nuclear Regulatory Commission",
"Nuclear Safety Oversight Committee",
"Nuclear Waste Technical Review Board",
"Occupational Safety & Health Review Commission",
"Occupational Safety and Health Review Commission",
"Office of Compliance",
"Office of Congressional Workplace Rights",
"Office of Government Ethics",
"Office of Navajo and Hopi Indian Relocation",
"Office of Personnel Management",
"Open World Leadership Center",
"Overseas Private Investment Corporation",
"Peace Corps",
"Pension Benefit Guaranty Corporation",
"Postal Regulatory Commission",
"Presidio Trust",
"Privacy and Civil Liberties Oversight Board",
"Public Buildings Reform Board",
"Public Defender Service for the District of Columbia",
"Railroad Retirement Board",
"Securities and Exchange Commission",
@ -267,30 +304,58 @@ class DomainApplication(TimeStampedModel):
"Small Business Administration",
"Smithsonian Institution",
"Social Security Administration",
"Social Security Advisory Board",
"Southeast Crescent Regional Commission",
"Southwest Border Regional Commission",
"State Justice Institute",
"State, Local, and Tribal Government",
"Stennis Center for Public Service",
"Surface Transportation Board",
"Tennessee Valley Authority",
"The Executive Office of the President",
"The Intelligence Community",
"The Legislative Branch",
"The Supreme Court",
"The United States World War One Centennial Commission",
"U.S. Access Board",
"U.S. Agency for Global Media",
"U.S. Agency for International Development",
"U.S. Capitol Police",
"U.S. Chemical Safety Board",
"U.S. China Economic and Security Review Commission",
"U.S. Commission for the Preservation of Americas Heritage Abroad",
"U.S. Commission of Fine Arts",
"U.S. Commission on Civil Rights",
"U.S. Commission on International Religious Freedom",
"U.S. Courts",
"U.S. Department of Agriculture",
"U.S. Interagency Council on Homelessness",
"U.S. International Trade Commission",
"U.S. Nuclear Waste Technical Review Board",
"U.S. Office of Special Counsel",
"U.S. Peace Corps",
"U.S. Postal Service",
"U.S. Semiquincentennial Commission",
"U.S. Trade and Development Agency",
"U.S.-China Economic and Security Review Commission",
"Udall Foundation",
"United States AbilityOne",
"United States Access Board",
"United States African Development Foundation",
"United States Agency for Global Media",
"United States Arctic Research Commission",
"United States Global Change Research Program",
"United States Holocaust Memorial Museum",
"United States Institute of Peace",
"United States Interagency Council on Homelessness",
"United States International Development Finance Corporation",
"United States International Trade Commission",
"United States Postal Service",
"United States Senate",
"United States Trade and Development Agency",
"Utah Reclamation Mitigation and Conservation Commission",
"Vietnam Education Foundation",
"Western Hemisphere Drug Policy Commission",
"Woodrow Wilson International Center for Scholars",
"World War I Centennial Commission",
]

View file

@ -1,5 +1,4 @@
from django.db import models
from .utility.time_stamped_model import TimeStampedModel
@ -43,11 +42,53 @@ class TransitionDomain(TimeStampedModel):
verbose_name="email sent",
help_text="indicates whether email was sent",
)
organization_type = models.TextField(
max_length=255,
null=True,
blank=True,
help_text="Type of organization",
)
organization_name = models.TextField(
null=True,
blank=True,
help_text="Organization name",
db_index=True,
)
federal_type = models.TextField(
max_length=50,
null=True,
blank=True,
help_text="Federal government branch",
)
federal_agency = models.TextField(
null=True,
blank=True,
help_text="Federal agency",
)
epp_creation_date = models.DateField(
null=True,
help_text=("Duplication of registry's creation " "date saved for ease of reporting"),
)
epp_expiration_date = models.DateField(
null=True,
help_text=("Duplication of registry's expiration " "date saved for ease of reporting"),
)
def __str__(self):
return f"{self.username}, {self.domain_name}"
def display_transition_domain(self):
"""Displays all information about a TransitionDomain in string format"""
return (
f"username: {self.username} "
f"domainName: {self.domain_name} "
f"status: {self.status} "
f"email sent: {self.email_sent} "
f"\n-----TRANSITION DOMAIN------\n"
f"domainName: {self.domain_name}, \n"
f"username: {self.username}, \n"
f"status: {self.status}, \n"
f"email sent: {self.email_sent}, \n"
f"organization type: {self.organization_type}, \n"
f"organization_name: {self.organization_name}, \n"
f"federal_type: {self.federal_type}, \n"
f"federal_agency: {self.federal_agency}, \n"
f"epp_creation_date: {self.epp_creation_date}, \n"
f"epp_expiration_date: {self.epp_expiration_date}, \n"
)

View file

@ -0,0 +1,6 @@
agencyid|agencyname|active|isfederal
1|Thoughtstorm|N|Y
2|Minyx|Y|N
3|Demivee|N|Y
4|Department of Commerce|Y|Y
5|InnoZ|Y|Y

View file

@ -0,0 +1,6 @@
authorityid|firstname|middlename|lastname|email|phonenumber|agencyid|addlinfo
1|Gregoor|middle|Kalinke|gkalinke0@indiegogo.com|(773) 172-5515|1|Asparagus - Mexican
2|Fayre||Filippozzi|ffilippozzi1@hugedomains.com|(357) 487-4280|2|Steampan - Foil
3|Gabey||Lightbody|glightbody2@fc2.com|(332) 816-5691|3|Soup - Campbells, Minestrone
4|Seline||Tower|stower3@answers.com|(151) 539-6028|4|Kiwi Gold Zespri
5|Joe||Smoe|joe@smoe.gov|(111) 111-1111|5|Kiwi Gold Zespri

View file

@ -5,3 +5,6 @@ USER3|12356_CONTACT|123-123-1234||918-000-0000||stephania.winters4@test.com|GSA|
USER4|12357_CONTACT|123-123-1234||918-000-0000||alexandra.bobbitt5@test.com|GSA|SOMECOMPANY|ctldbatch|2021-06-30T15:23:10Z|SOMECOMPANY|ctldbatch|2021-08-02T22:13:09Z|
USER5|12362_CONTACT|123-123-1234||918-000-0000||jospeh.mcdowell3@test.com|GSA|SOMECOMPANY|ctldbatch|2021-06-30T17:58:09Z|SOMECOMPANY|ctldbatch|2021-06-30T18:33:09Z|
USER6|12363_CONTACT|123-123-1234||918-000-0000||reginald.ratcliff4@test.com|GSA|SOMECOMPANY|ctldbatch|2021-06-30T17:58:09Z|SOMECOMPANY|ctldbatch|2021-06-30T18:18:09Z|
USER7|12364_CONTACT|123-123-1234||918-000-0000||reginald.ratcliff4@test.com|GSA|SOMECOMPANY|ctldbatch|2021-06-30T17:58:09Z|SOMECOMPANY|ctldbatch|2021-06-30T18:18:09Z|
USER8|12365_CONTACT|123-123-1234||918-000-0000||reginald.ratcliff4@test.com|GSA|SOMECOMPANY|ctldbatch|2021-06-30T17:58:09Z|SOMECOMPANY|ctldbatch|2021-06-30T18:18:09Z|
USER9|12366_CONTACT|123-123-1234||918-000-0000||reginald.ratcliff4@test.com|GSA|SOMECOMPANY|ctldbatch|2021-06-30T17:58:09Z|SOMECOMPANY|ctldbatch|2021-06-30T18:18:09Z|

View file

@ -0,0 +1,6 @@
domainname|domaintypeid|authorityid|orgid|securitycontactemail|dnsseckeymonitor|domainpurpose
Anomaly.gov|1|1|1|ggennrich0@utexas.edu|N|Praesent id massa id nisl venenatis lacinia.
TESTDOMAIN.GOV|2|2|2|lrome1@uol.com.br|Y|In tempor, turpis nec euismod scelerisque, quam turpis adipiscing lorem, vitae mattis nibh ligula nec sem.
FakeWebsite1.gov|3|3|3|ybrommage2@vistaprint.com|Y|In hac habitasse platea dictumst.
FakeWebsite2.gov|4|4|4|plarderot3@t.co|Y|Morbi quis tortor id nulla ultrices aliquet. Maecenas leo odio, condimentum id, luctus nec, molestie sed, justo. Pellentesque viverra pede ac diam.
FakeWebsite3.gov|13|5|5|ybrommage2@vistaprint.com|Y|In hac habitasse platea dictumst.

View file

@@ -1,8 +1,11 @@
Anomaly.gov|ANOMALY|tech
TestDomain.gov|TESTUSER|admin
FakeWebsite1|USER1|admin
FakeWebsite1|USER2|tech
FakeWebsite1|USER3|billing
FakeWebsite2.GOV|USER4|admin
FakeWebsite2.GOV|USER5|billing
FakeWebsite2.GOV|USER6|tech
FakeWebsite1.gov|USER1|admin
FakeWebsite1.gov|USER2|tech
FakeWebsite1.gov|USER3|billing
FakeWebsite2.gov|USER4|admin
FakeWebsite2.gov|USER5|billing
FakeWebsite2.gov|USER6|tech
FakeWebsite3.gov|USER7|admin
FakeWebsite3.gov|USER8|billing
FakeWebsite3.gov|USER9|tech

View file

@@ -1,4 +1,5 @@
Anomaly.gov|muahaha|
TestDomain.gov|ok|
FakeWebsite1.GOV|serverHold|
FakeWebsite2.GOV|Hold|
FakeWebsite1.gov|serverHold|
FakeWebsite2.gov|Hold|
FakeWebsite3.gov|ok|

View file

@@ -0,0 +1,18 @@
domaintypeid|domaintype|code|active
14|Federal - Legislative|FL|Y
15|Federal - Judicial|FJ|Y
16|Interstate|IA|Y
1|Commercial|CO|N
2|Courts|CT|N
3|Federal Government Program|FD|N
4|Federal - Executive|FE|Y
5|Foreign|FO|N
6|Individual|IN|N
7|Military|MI|N
8|Not Top-Level|NT|N
9|State|ST|Y
10|Tribal|TN|Y
11|Domain type 1|1|N
12|County|CY|Y
13|City|CI|Y
17|Independent Intrastate|II|Y

View file

@@ -0,0 +1,5 @@
Anomaly.gov|SOME_STRING||data|data|data|data|2008-03-09T16:12:47Z|DATA2|ctldbatch|2022-06-06T01:33:10Z|2023-03-09T16:12:47Z|2023-02-09T16:12:47Z
TestDomain.gov|SOME_STRING|data|data|data|data|data|2014-03-15T15:45:05Z|DATA2|ctldbatch|2022-02-13T17:33:07Z|2023-03-15T15:45:05Z|2023-02-15T15:45:05Z
FakeWebsite1.gov|SOME_STRING||data|data|data|data|2020-06-14T16:30:06Z|DATA2|ctldbatch|2022-05-16T14:58:10Z|2023-06-14T16:30:06Z|2023-05-14T16:30:06Z
FakeWebsite2.gov|SOME_STRING||data|data|data|data|2004-05-07T04:00:00Z|DATA2|ctldbatch|2022-08-18T15:23:09Z|2023-09-30T18:37:39Z|2023-08-30T18:37:39Z
FakeWebsite3.gov|SOME_STRING||data|data|data|data|2004-05-07T04:00:00Z|DATA2|ctldbatch|2022-08-18T15:23:09Z|2023-09-30T18:37:39Z|2023-08-30T18:37:39Z

View file

@@ -0,0 +1,12 @@
{
"directory": "registrar/tests/data",
"agency_adhoc_filename": "test_agency_adhoc.txt",
"authority_adhoc_filename": "test_authority_adhoc.txt",
"organization_adhoc_filename": "test_organization_adhoc.txt",
"domain_adhoc_filename": "test_domain_types_adhoc.txt",
"domain_additional_filename": "test_domain_additional.txt",
"domain_contacts_filename": "test_domain_contacts.txt",
"domain_escrow_filename": "test_escrow_domains_daily.txt",
"domain_statuses_filename": "test_domain_statuses.txt",
"contacts_filename": "test_contacts.txt"
}
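The fixture above only names files; the loaders presumably join the `directory` value with each `*_filename` entry to build full paths. A rough sketch of that resolution (hypothetical helper, not the management command itself):

```
# Hypothetical helper showing how directory + filenames resolve to paths.
import json
from pathlib import Path

def resolve_migration_paths(json_path):
    with open(json_path) as f:
        config = json.load(f)
    directory = Path(config["directory"])
    return {
        key: directory / name
        for key, name in config.items()
        if key.endswith("_filename")
    }

# resolve_migration_paths("registrar/tests/data/test_migrationFilepaths.json")
# -> {"contacts_filename": Path("registrar/tests/data/test_contacts.txt"), ...}
```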

View file

@@ -0,0 +1,6 @@
orgid|orgname|orgstreet|orgcity|orgstate|orgzip|orgcountrycode
1|Flashdog|298 Monument Hill|Lakeland|Florida|33805|US
2|Gigaclub|782 Mosinee Lane|Alexandria|Louisiana|71307|US
3|corrupt data|376 Joe Pass|Waco | corruption|Texas|76705|US
4|Fanoodle|93001 Arizona Drive|Columbus|Ohio|43268|US
5|Sushi|9999 Sushi Way|Columbus|Ohio|43268|US
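These adhoc fixtures link together by ID, which is where the organization and agency values asserted in the tests below come from. A rough sketch of the join, assuming a `|` delimiter and the filenames listed in test_migrationFilepaths.json:

```
# Illustrative only: trace fakewebsite2.gov through the adhoc fixtures by ID.
import csv
from pathlib import Path

DATA = Path("registrar/tests/data")

def rows(filename):
    """Read a pipe-delimited fixture into dicts keyed by its header row."""
    with open(DATA / filename, newline="") as f:
        return list(csv.DictReader(f, delimiter="|"))

additional = {r["domainname"].lower(): r for r in rows("test_domain_additional.txt")}
orgs = {r["orgid"]: r for r in rows("test_organization_adhoc.txt")}
types = {r["domaintypeid"]: r for r in rows("test_domain_types_adhoc.txt")}
authorities = {r["authorityid"]: r for r in rows("test_authority_adhoc.txt")}
agencies = {r["agencyid"]: r for r in rows("test_agency_adhoc.txt")}

row = additional["fakewebsite2.gov"]
print(orgs[row["orgid"]]["orgname"])             # Fanoodle
print(types[row["domaintypeid"]]["domaintype"])  # Federal - Executive
print(agencies[authorities[row["authorityid"]]["agencyid"]]["agencyname"])  # Department of Commerce
```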

View file

@@ -1,3 +1,5 @@
import datetime
from io import StringIO
from django.test import TestCase
@@ -12,55 +14,72 @@ from registrar.models import (
)
from django.core.management import call_command
from unittest.mock import patch
from .common import less_console_noise
class TestLogins(TestCase):
""" """
class TestMigrations(TestCase):
def setUp(self):
""" """
# self.load_transition_domain_script = "load_transition_domain",
# self.transfer_script = "transfer_transition_domains_to_domains",
# self.master_script = "load_transition_domain",
self.test_data_file_location = "/app/registrar/tests/data"
self.test_data_file_location = "registrar/tests/data"
self.test_domain_contact_filename = "test_domain_contacts.txt"
self.test_contact_filename = "test_contacts.txt"
self.test_domain_status_filename = "test_domain_statuses.txt"
# Files for parsing additional TransitionDomain data
self.test_agency_adhoc_filename = "test_agency_adhoc.txt"
self.test_authority_adhoc_filename = "test_authority_adhoc.txt"
self.test_domain_additional = "test_domain_additional.txt"
self.test_domain_types_adhoc = "test_domain_types_adhoc.txt"
self.test_escrow_domains_daily = "test_escrow_domains_daily"
self.test_organization_adhoc = "test_organization_adhoc.txt"
self.migration_json_filename = "test_migrationFilepaths.json"
def tearDown(self):
super().tearDown()
# Delete domain information
TransitionDomain.objects.all().delete()
Domain.objects.all().delete()
DomainInvitation.objects.all().delete()
DomainInformation.objects.all().delete()
# Delete users
User.objects.all().delete()
UserDomainRole.objects.all().delete()
def run_load_domains(self):
# noqa here because splitting this up makes it confusing.
# E501
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
return_value=True,
):
call_command(
"load_transition_domain",
f"{self.test_data_file_location}/{self.test_domain_contact_filename}",
f"{self.test_data_file_location}/{self.test_contact_filename}",
f"{self.test_data_file_location}/{self.test_domain_status_filename}",
self.migration_json_filename,
directory=self.test_data_file_location,
)
def run_transfer_domains(self):
call_command("transfer_transition_domains_to_domains")
def run_master_script(self):
# noqa here (E501) because splitting this up makes it
# confusing to read.
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
return_value=True,
):
call_command(
"master_domain_migrations",
runMigrations=True,
migrationDirectory=f"{self.test_data_file_location}",
migrationFilenames=(
f"{self.test_domain_contact_filename},"
f"{self.test_contact_filename},"
f"{self.test_domain_status_filename}"
),
migrationDirectory=self.test_data_file_location,
migrationJSON=self.migration_json_filename,
disablePrompts=True,
)
def compare_tables(
@@ -120,7 +139,7 @@ class TestLogins(TestCase):
total_missing_domains = {len(missing_domains)}
total_duplicate_domains = {len(duplicate_domains)}
total_missing_domain_informations = {len(missing_domain_informations)}
total_missing_domain_invitations = {len(missing_domain_invites)}
total_missing_domain_invitations = {total_missing_domain_invitations}
total_transition_domains = {len(TransitionDomain.objects.all())}
total_domains = {len(Domain.objects.all())}
@@ -128,16 +147,15 @@ class TestLogins(TestCase):
total_domain_invitations = {len(DomainInvitation.objects.all())}
"""
)
self.assertEqual(total_missing_domains, expected_missing_domains)
self.assertEqual(total_duplicate_domains, expected_duplicate_domains)
self.assertEqual(total_missing_domain_informations, expected_missing_domain_informations)
self.assertEqual(total_missing_domain_invitations, expected_missing_domain_invitations)
self.assertTrue(total_missing_domains == expected_missing_domains)
self.assertTrue(total_duplicate_domains == expected_duplicate_domains)
self.assertTrue(total_missing_domain_informations == expected_missing_domain_informations)
self.assertTrue(total_missing_domain_invitations == expected_missing_domain_invitations)
self.assertTrue(total_transition_domains == expected_total_transition_domains)
self.assertTrue(total_domains == expected_total_domains)
self.assertTrue(total_domain_informations == expected_total_domain_informations)
self.assertTrue(total_domain_invitations == expected_total_domain_invitations)
self.assertEqual(total_transition_domains, expected_total_transition_domains)
self.assertEqual(total_domains, expected_total_domains)
self.assertEqual(total_domain_informations, expected_total_domain_informations)
self.assertEqual(total_domain_invitations, expected_total_domain_invitations)
def test_master_migration_functions(self):
"""Run the full master migration script using local test data.
@@ -150,16 +168,14 @@ class TestLogins(TestCase):
# STEP 2: (analyze the tables just like the
# migration script does, but add assert statements)
expected_total_transition_domains = 8
expected_total_domains = 4
expected_total_domain_informations = 0
expected_total_domain_invitations = 7
expected_total_transition_domains = 9
expected_total_domains = 5
expected_total_domain_informations = 5
expected_total_domain_invitations = 8
expected_missing_domains = 0
expected_duplicate_domains = 0
# we expect 8 missing domain invites since the
# migration does not auto-login new users
expected_missing_domain_informations = 8
expected_missing_domain_informations = 0
# we expect 1 missing invite from anomaly.gov (an injected error)
expected_missing_domain_invitations = 1
self.compare_tables(
@@ -173,20 +189,21 @@ class TestLogins(TestCase):
expected_missing_domain_invitations,
)
def test_load_transition_domain(self):
def test_load_empty_transition_domain(self):
"""Loads TransitionDomains without additional data"""
self.run_load_domains()
# STEP 2: (analyze the tables just like the migration
# script does, but add assert statements)
expected_total_transition_domains = 8
expected_total_transition_domains = 9
expected_total_domains = 0
expected_total_domain_informations = 0
expected_total_domain_invitations = 0
expected_missing_domains = 8
expected_missing_domains = 9
expected_duplicate_domains = 0
expected_missing_domain_informations = 8
expected_missing_domain_invitations = 8
expected_missing_domain_informations = 9
expected_missing_domain_invitations = 9
self.compare_tables(
expected_total_transition_domains,
expected_total_domains,
@@ -198,20 +215,122 @@ class TestLogins(TestCase):
expected_missing_domain_invitations,
)
def test_transfer_transition_domains_to_domains(self):
# TODO: setup manually instead of calling other script
def test_load_full_domain(self):
self.run_load_domains()
self.run_transfer_domains()
# Analyze the tables
expected_total_transition_domains = 8
expected_total_domains = 4
expected_total_domain_informations = 0
expected_total_domain_invitations = 7
expected_total_transition_domains = 9
expected_total_domains = 5
expected_total_domain_informations = 5
expected_total_domain_invitations = 8
expected_missing_domains = 0
expected_duplicate_domains = 0
expected_missing_domain_informations = 8
expected_missing_domain_informations = 0
expected_missing_domain_invitations = 1
self.compare_tables(
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
expected_missing_domain_invitations,
)
# Test created domains
anomaly_domains = Domain.objects.filter(name="anomaly.gov")
self.assertEqual(anomaly_domains.count(), 1)
anomaly = anomaly_domains.get()
self.assertEqual(anomaly.expiration_date, datetime.date(2023, 3, 9))
self.assertEqual(anomaly.name, "anomaly.gov")
self.assertEqual(anomaly.state, "ready")
testdomain_domains = Domain.objects.filter(name="fakewebsite2.gov")
self.assertEqual(testdomain_domains.count(), 1)
testdomain = testdomain_domains.get()
self.assertEqual(testdomain.expiration_date, datetime.date(2023, 9, 30))
self.assertEqual(testdomain.name, "fakewebsite2.gov")
self.assertEqual(testdomain.state, "on hold")
def test_load_full_domain_information(self):
self.run_load_domains()
self.run_transfer_domains()
# Analyze the tables
expected_total_transition_domains = 9
expected_total_domains = 5
expected_total_domain_informations = 5
expected_total_domain_invitations = 8
expected_missing_domains = 0
expected_duplicate_domains = 0
expected_missing_domain_informations = 0
expected_missing_domain_invitations = 1
self.compare_tables(
expected_total_transition_domains,
expected_total_domains,
expected_total_domain_informations,
expected_total_domain_invitations,
expected_missing_domains,
expected_duplicate_domains,
expected_missing_domain_informations,
expected_missing_domain_invitations,
)
# Test created Domain Information objects
domain = Domain.objects.filter(name="anomaly.gov").get()
anomaly_domain_infos = DomainInformation.objects.filter(domain=domain)
self.assertEqual(anomaly_domain_infos.count(), 1)
# This domain should be pretty barebones. Something isn't
# parsing right if we get a lot of data.
anomaly = anomaly_domain_infos.get()
self.assertEqual(anomaly.organization_name, "Flashdog")
self.assertEqual(anomaly.organization_type, None)
self.assertEqual(anomaly.federal_agency, None)
self.assertEqual(anomaly.federal_type, None)
# Check for the "system" creator user
Users = User.objects.filter(username="System")
self.assertEqual(Users.count(), 1)
self.assertEqual(anomaly.creator, Users.get())
domain = Domain.objects.filter(name="fakewebsite2.gov").get()
fakewebsite_domain_infos = DomainInformation.objects.filter(domain=domain)
self.assertEqual(fakewebsite_domain_infos.count(), 1)
fakewebsite = fakewebsite_domain_infos.get()
self.assertEqual(fakewebsite.organization_name, "Fanoodle")
self.assertEqual(fakewebsite.organization_type, "federal")
self.assertEqual(fakewebsite.federal_agency, "Department of Commerce")
self.assertEqual(fakewebsite.federal_type, "executive")
# Check for the "system" creator user
Users = User.objects.filter(username="System")
self.assertEqual(Users.count(), 1)
self.assertEqual(fakewebsite.creator, Users.get())
def test_transfer_transition_domains_to_domains(self):
self.run_load_domains()
self.run_transfer_domains()
# Analyze the tables
expected_total_transition_domains = 9
expected_total_domains = 5
expected_total_domain_informations = 5
expected_total_domain_invitations = 8
expected_missing_domains = 0
expected_duplicate_domains = 0
expected_missing_domain_informations = 0
expected_missing_domain_invitations = 1
self.compare_tables(
expected_total_transition_domains,
@@ -236,14 +355,14 @@ class TestLogins(TestCase):
user.on_each_login()
# Analyze the tables
expected_total_transition_domains = 8
expected_total_domains = 4
expected_total_domain_informations = 3
expected_total_domain_invitations = 7
expected_total_transition_domains = 9
expected_total_domains = 5
expected_total_domain_informations = 5
expected_total_domain_invitations = 8
expected_missing_domains = 0
expected_duplicate_domains = 0
expected_missing_domain_informations = 1
expected_missing_domain_informations = 0
expected_missing_domain_invitations = 1
self.compare_tables(
expected_total_transition_domains,