diff --git a/docs/django-admin/roles.md b/docs/django-admin/roles.md index 91c2949eb..458029e07 100644 --- a/docs/django-admin/roles.md +++ b/docs/django-admin/roles.md @@ -13,7 +13,8 @@ For more details, refer to the [user group model](../../src/registrar/models/use We can edit and deploy new group permissions by: -1. editing `user_group` then: +1. Editing `user_group` then: 2. Duplicating migration `0036_create_groups_01` and running migrations (append the name with a version number -to help django detect the migration eg 0037_create_groups_02) \ No newline at end of file +to help django detect the migration eg 0037_create_groups_02) +3. Making sure to update the dependency on the new migration with the previous migration \ No newline at end of file diff --git a/docs/operations/data_migration.md b/docs/operations/data_migration.md index c677554de..192db0db8 100644 --- a/docs/operations/data_migration.md +++ b/docs/operations/data_migration.md @@ -1,11 +1,12 @@ # Registrar Data Migration -There is an existing registrar/registry at Verisign. They will provide us with an -export of the data from that system. The goal of our data migration is to take -the provided data and use it to create as much as possible a _matching_ state +The original system has an existing registrar/registry that we will import. +The company of that system will provide us with an export of the data. +The goal of our data migration is to take the provided data and use +it to create as much as possible a _matching_ state in our registrar. -There is no way to make our registrar _identical_ to the Verisign system +There is no way to make our registrar _identical_ to the original system because we have a different data model and workflow model. Instead, we should focus our migration efforts on creating a state in our new registrar that will primarily allow users of the system to perform the tasks that they want to do. 
@@ -18,7 +19,7 @@ Login.gov account can make an account on the new registrar, and the first time that person logs in through Login.gov, we make a corresponding account in our user table. Because we cannot know the Universal Unique ID (UUID) for a person's Login.gov account, we cannot pre-create user accounts for individuals -in our new registrar based on the data from Verisign. +in our new registrar based on the original data. ## Domains @@ -27,7 +28,7 @@ information is the registry, but the registrar needs a copy of that information to make connections between registry users and the domains that they manage. The registrar stores very few fields about a domain except for its name, so it could be straightforward to import the exported list of domains -from Verisign's `escrow_domains.daily.dotgov.GOV.txt`. It doesn't appear that +from `escrow_domains.daily.dotgov.GOV.txt`. It doesn't appear that that table stores a flag for active or inactive. An example Django management command that can load the delimited text file @@ -43,7 +44,7 @@ docker compose run -T app ./manage.py load_domains_data < /tmp/escrow_domains.da ## User access to domains -The Verisign data contains a `escrow_domain_contacts.daily.dotgov.txt` file +The data export contains a `escrow_domain_contacts.daily.dotgov.txt` file that links each domain to three different types of contacts: `billing`, `tech`, and `admin`. The ID of the contact in this linking table corresponds to the ID of a contact in the `escrow_contacts.daily.dotgov.txt` file. In the 
@@ -59,9 +60,9 @@ invitation's domain. For the purposes of migration, we can prime the invitation system by creating an invitation in the system for each email address listed in the `domain_contacts` file. This means that if a person is currently a user in the -Verisign system, and they use the same email address with Login.gov, then they +original system, and they use the same email address with Login.gov, then they will end up with access to the same domains in the new registrar that they -were associated with in the Verisign system. +were associated with in the original system. A management command that does this needs to process two data files, one for the contact information and one for the domain/contact association, so we 
@@ -76,3 +77,56 @@ An example script using this technique is in ```shell docker compose run app ./manage.py load_domain_invitations /app/escrow_domain_contacts.daily.dotgov.GOV.txt /app/escrow_contacts.daily.dotgov.GOV.txt ``` + +## Transition Domains +We are provided with information about Transition Domains in 3 files: +FILE 1: **escrow_domain_contacts.daily.gov.GOV.txt** -> has the map of domain names to contact ID. Domains in this file will usually have 3 contacts each +FILE 2: **escrow_contacts.daily.gov.GOV.txt** -> has the mapping of contact id to contact email address (which is what we care about for sending domain invitations) +FILE 3: **escrow_domain_statuses.daily.gov.GOV.txt** -> has the map of domains and their statuses + +Transferring this data from these files into our domain tables happens in two steps; + +***IMPORTANT: only run the following locally, to avoid publicizing PII in our public repo.*** + +### STEP 1: Load Transition Domain data into TransitionDomain table + +**SETUP** +In order to use the management command, we need to add the files to a folder under `src/`. +This will allow Docker to mount the files to a container (under `/app`) for our use. + + - Create a folder called `tmp` underneath `src/` + - Add the above files to this folder + - Open a terminal and navigate to `src/` + +Then run the following command (This will parse the three files in your `tmp` folder and load the information into the TransitionDomain table); +```shell +docker compose run -T app ./manage.py load_transition_domain /app/tmp/escrow_domain_contacts.daily.gov.GOV.txt /app/tmp/escrow_contacts.daily.gov.GOV.txt /app/tmp/escrow_domain_statuses.daily.gov.GOV.txt +``` + +**OPTIONAL COMMAND LINE ARGUMENTS**: +`--debug` +This will print out additional, detailed logs. + +`--limitParse 100` +Directs the script to load only the first 100 entries into the table. You can adjust this number as needed for testing purposes. 
+ +`--resetTable` +This will delete all the data loaded into transtion_domain. It is helpful if you want to see the entries reload from scratch or for clearing test data. + + +### STEP 2: Transfer Transition Domain data into main Domain tables + +Now that we've loaded all the data into TransitionDomain, we need to update the main Domain and DomainInvitation tables with this information. + +In the same terminal as used in STEP 1, run the command below; +(This will parse the data in TransitionDomain and either create a corresponding Domain object, OR, if a corresponding Domain already exists, it will update that Domain with the incoming status. It will also create DomainInvitation objects for each user associated with the domain): +```shell +docker compose run -T app ./manage.py transfer_transition_domains_to_domains +``` + +**OPTIONAL COMMAND LINE ARGUMENTS**: +`--debug` +This will print out additional, detailed logs. + +`--limitParse 100` +Directs the script to load only the first 100 entries into the table. You can adjust this number as needed for testing purposes. \ No newline at end of file diff --git a/src/registrar/admin.py b/src/registrar/admin.py index eccfa1750..8d0ed8c2e 100644 --- a/src/registrar/admin.py +++ b/src/registrar/admin.py @@ -138,24 +138,12 @@ class MyUserAdmin(BaseUserAdmin): "email", "first_name", "last_name", + # Group is a custom property defined within this file, + # rather than in a model like the other properties "group", "status", ) - list_filter = ( - "is_active", - "groups", - ) - - # Let's define First group - # (which should in theory be the ONLY group) - def group(self, obj): - if obj.groups.filter(name="full_access_group").exists(): - return "Full access" - elif obj.groups.filter(name="cisa_analysts_group").exists(): - return "Analyst" - return "" - fieldsets = ( ( None, 
@@ -222,6 +210,20 @@ class MyUserAdmin(BaseUserAdmin): "date_joined", ] + list_filter = ( + "is_active", + "groups", + ) + + # Let's define First group + # (which should in theory be the ONLY group) + def group(self, obj): + if obj.groups.filter(name="full_access_group").exists(): + return "Full access" + elif obj.groups.filter(name="cisa_analysts_group").exists(): + return "Analyst" + return "" + def get_list_display(self, request): # The full_access_permission perm will load onto the full_access_group # which is equivalent to superuser. The other group we use to manage @@ -340,6 +342,12 @@ class DomainInvitationAdmin(ListHeaderAdmin): ] search_help_text = "Search by email or domain." + # Mark the FSM field 'status' as readonly + # to allow admin users to create Domain Invitations + # without triggering the FSM Transition Not Allowed + # error. + readonly_fields = ["status"] + class DomainInformationAdmin(ListHeaderAdmin): """Customize domain information admin class.""" @@ -811,7 +819,8 @@ class DomainAdmin(ListHeaderAdmin): else: self.message_user( request, - ("Domain statuses are %s" ". Thanks!") % statuses, + f"The registry statuses are {statuses}. " + "These statuses are from the provider of the .gov registry.", ) return HttpResponseRedirect(".") diff --git a/src/registrar/fixtures_users.py b/src/registrar/fixtures_users.py index 6b6e191d8..dfe51785b 100644 --- a/src/registrar/fixtures_users.py +++ b/src/registrar/fixtures_users.py @@ -39,6 +39,7 @@ class UserFixture: "username": "70488e0a-e937-4894-a28c-16f5949effd4", "first_name": "Gaby", "last_name": "DiSarli", + "email": "gaby@truss.works", }, { "username": "83c2b6dd-20a2-4cac-bb40-e22a72d2955c", @@ -129,7 +130,7 @@ class UserFixture: "username": "0eb6f326-a3d4-410f-a521-aa4c1fad4e47", "first_name": "Gaby-Analyst", "last_name": "DiSarli-Analyst", - "email": "gaby@truss.works", + "email": "gaby+1@truss.works", }, { "username": "cfe7c2fc-e24a-480e-8b78-28645a1459b3", 
diff --git a/src/registrar/forms/domain.py b/src/registrar/forms/domain.py index f14448bcf..79fe46add 100644 --- a/src/registrar/forms/domain.py +++ b/src/registrar/forms/domain.py @@ -20,7 +20,8 @@ class DomainNameserverForm(forms.Form): """Form for changing nameservers.""" - server = forms.CharField(label="Name server") + server = forms.CharField(label="Name server", strip=True) + # when adding IPs to this form ensure they are stripped as well NameserverFormset = formset_factory( @@ -64,7 +65,7 @@ class DomainSecurityEmailForm(forms.Form): """Form for adding or editing a security email to a domain.""" - security_email = forms.EmailField(label="Security email") + security_email = forms.EmailField(label="Security email", required=False) class DomainOrgNameAddressForm(forms.ModelForm): diff --git a/src/registrar/management/commands/load_transition_domain.py b/src/registrar/management/commands/load_transition_domain.py new file mode 100644 index 000000000..206589c33 --- /dev/null +++ b/src/registrar/management/commands/load_transition_domain.py 
@@ -0,0 +1,524 @@ +import sys +import csv +import logging +import argparse + +from collections import defaultdict + +from django.core.management import BaseCommand + +from registrar.models import TransitionDomain + +logger = logging.getLogger(__name__) + + +class termColors: + """Colors for terminal outputs + (makes reading the logs WAY easier)""" + + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + YELLOW = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + BackgroundLightYellow = "\033[103m" + + +def query_yes_no(question: str, default="yes") -> bool: + """Ask a yes/no question via raw_input() and return their answer. + + "question" is a string that is presented to the user. + "default" is the presumed answer if the user just hits . + It must be "yes" (the default), "no" or None (meaning + an answer is required of the user). + + The "answer" return value is True for "yes" or False for "no". + """ + valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} + if default is None: + prompt = " [y/n] " + elif default == "yes": + prompt = " [Y/n] " + elif default == "no": + prompt = " [y/N] " + else: + raise ValueError("invalid default answer: '%s'" % default) + + while True: + logger.info(question + prompt) + choice = input().lower() + if default is not None and choice == "": + return valid[default] + elif choice in valid: + return valid[choice] + else: + logger.info("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n") + + +class Command(BaseCommand): + help = """Loads data for domains that are in transition + (populates transition_domain model objects).""" + + def add_arguments(self, parser): + """Add our three filename arguments (in order: domain contacts, + contacts, and domain statuses) + OPTIONAL ARGUMENTS: + --sep + The default delimiter is set to "|", but may be changed using --sep + --debug + A boolean (default to true), which activates additional print 
statements + --limitParse + Used to set a limit for the number of data entries to insert. Set to 0 + (or just don't use this argument) to parse every entry. + --resetTable + Use this to trigger a prompt for deleting all table entries. Useful + for testing purposes, but USE WITH CAUTION + """ + parser.add_argument( + "domain_contacts_filename", help="Data file with domain contact information" + ) + parser.add_argument( + "contacts_filename", + help="Data file with contact information", + ) + parser.add_argument( + "domain_statuses_filename", help="Data file with domain status information" + ) + + parser.add_argument("--sep", default="|", help="Delimiter character") + + parser.add_argument("--debug", action=argparse.BooleanOptionalAction) + + parser.add_argument( + "--limitParse", default=0, help="Sets max number of entries to load" + ) + + parser.add_argument( + "--resetTable", + help="Deletes all data in the TransitionDomain table", + action=argparse.BooleanOptionalAction, + ) + + def print_debug_mode_statements( + self, debug_on: bool, debug_max_entries_to_parse: int + ): + """Prints additional terminal statements to indicate if --debug + or --limitParse are in use""" + if debug_on: + logger.info( + f"""{termColors.OKCYAN} + ----------DEBUG MODE ON---------- + Detailed print statements activated. + {termColors.ENDC} + """ + ) + if debug_max_entries_to_parse > 0: + logger.info( + f"""{termColors.OKCYAN} + ----------LIMITER ON---------- + Parsing of entries will be limited to + {debug_max_entries_to_parse} lines per file.") + Detailed print statements activated. 
+ {termColors.ENDC} + """ + ) + + def get_domain_user_dict( + self, domain_statuses_filename: str, sep: str + ) -> defaultdict[str, str]: + """Creates a mapping of domain name -> status""" + domain_status_dictionary = defaultdict(str) + logger.info("Reading domain statuses data file %s", domain_statuses_filename) + with open(domain_statuses_filename, "r") as domain_statuses_file: # noqa + for row in csv.reader(domain_statuses_file, delimiter=sep): + domainName = row[0].lower() + domainStatus = row[1].lower() + domain_status_dictionary[domainName] = domainStatus + logger.info("Loaded statuses for %d domains", len(domain_status_dictionary)) + return domain_status_dictionary + + def get_user_emails_dict( + self, contacts_filename: str, sep + ) -> defaultdict[str, str]: + """Creates mapping of userId -> emails""" + user_emails_dictionary = defaultdict(str) + logger.info("Reading domain-contacts data file %s", contacts_filename) + with open(contacts_filename, "r") as contacts_file: + for row in csv.reader(contacts_file, delimiter=sep): + user_id = row[0] + user_email = row[6] + user_emails_dictionary[user_id] = user_email + logger.info("Loaded emails for %d users", len(user_emails_dictionary)) + return user_emails_dictionary + + def get_mapped_status(self, status_to_map: str): + """ + Given a verisign domain status, return a corresponding + status defined for our domains. + + We map statuses as follows; + "serverHold” fields will map to hold, clientHold to hold + and any ok state should map to Ready. 
+ """ + status_maps = { + "hold": TransitionDomain.StatusChoices.ON_HOLD, + "serverhold": TransitionDomain.StatusChoices.ON_HOLD, + "clienthold": TransitionDomain.StatusChoices.ON_HOLD, + "created": TransitionDomain.StatusChoices.READY, + "ok": TransitionDomain.StatusChoices.READY, + } + mapped_status = status_maps.get(status_to_map) + return mapped_status + + def print_summary_duplications( + self, + duplicate_domain_user_combos: list[TransitionDomain], + duplicate_domains: list[TransitionDomain], + users_without_email: list[str], + ): + """Called at the end of the script execution to print out a summary of + data anomalies in the imported Verisign data. Currently, we check for: + - duplicate domains + - duplicate domain - user pairs + - any users without e-mails (this would likely only happen if the contacts + file is missing a user found in the domain_contacts file) + """ + total_duplicate_pairs = len(duplicate_domain_user_combos) + total_duplicate_domains = len(duplicate_domains) + total_users_without_email = len(users_without_email) + if total_users_without_email > 0: + users_without_email_as_string = "{}".format( + ", ".join(map(str, duplicate_domain_user_combos)) + ) + logger.warning( + f"{termColors.YELLOW} No e-mails found for users: {users_without_email_as_string}" # noqa + ) + if total_duplicate_pairs > 0 or total_duplicate_domains > 0: + duplicate_pairs_as_string = "{}".format( + ", ".join(map(str, duplicate_domain_user_combos)) + ) + duplicate_domains_as_string = "{}".format( + ", ".join(map(str, duplicate_domains)) + ) + logger.warning( + f"""{termColors.YELLOW} + + ----DUPLICATES FOUND----- + + {total_duplicate_pairs} DOMAIN - USER pairs + were NOT unique in the supplied data files; + + {duplicate_pairs_as_string} + + {total_duplicate_domains} DOMAINS were NOT unique in + the supplied data files; + + {duplicate_domains_as_string} + {termColors.ENDC}""" + ) + + def print_summary_status_findings( + self, domains_without_status: list[str], 
outlier_statuses: list[str] + ): + """Called at the end of the script execution to print out a summary of + status anomolies in the imported Verisign data. Currently, we check for: + - domains without a status + - any statuses not accounted for in our status mappings (see + get_mapped_status() function) + """ + total_domains_without_status = len(domains_without_status) + total_outlier_statuses = len(outlier_statuses) + if total_domains_without_status > 0: + domains_without_status_as_string = "{}".format( + ", ".join(map(str, domains_without_status)) + ) + logger.warning( + f"""{termColors.YELLOW} + + -------------------------------------------- + Found {total_domains_without_status} domains + without a status (defaulted to READY) + --------------------------------------------- + + {domains_without_status_as_string} + {termColors.ENDC}""" + ) + + if total_outlier_statuses > 0: + domains_without_status_as_string = "{}".format( + ", ".join(map(str, outlier_statuses)) + ) # noqa + logger.warning( + f"""{termColors.YELLOW} + + -------------------------------------------- + Found {total_outlier_statuses} unaccounted + for statuses- + -------------------------------------------- + + No mappings found for the following statuses + (defaulted to Ready): + + {domains_without_status_as_string} + {termColors.ENDC}""" + ) + + def print_debug(self, print_condition: bool, print_statement: str): + """This function reduces complexity of debug statements + in other functions. + It uses the logger to write the given print_statement to the + terminal if print_condition is TRUE""" + # DEBUG: + if print_condition: + logger.info(print_statement) + + def prompt_table_reset(self): + """Brings up a prompt in the terminal asking + if the user wishes to delete data in the + TransitionDomain table. 
If the user confirms, + deletes all the data in the TransitionDomain table""" + confirm_reset = query_yes_no( + f""" + {termColors.FAIL} + WARNING: Resetting the table will permanently delete all + the data! + Are you sure you want to continue?{termColors.ENDC}""" + ) + if confirm_reset: + logger.info( + f"""{termColors.YELLOW} + ----------Clearing Table Data---------- + (please wait) + {termColors.ENDC}""" + ) + TransitionDomain.objects.all().delete() + + def handle( # noqa: C901 + self, + domain_contacts_filename, + contacts_filename, + domain_statuses_filename, + **options, + ): + """Parse the data files and create TransitionDomains.""" + sep = options.get("sep") + + # If --resetTable was used, prompt user to confirm + # deletion of table data + if options.get("resetTable"): + self.prompt_table_reset() + + # Get --debug argument + debug_on = options.get("debug") + + # Get --LimitParse argument + debug_max_entries_to_parse = int( + options.get("limitParse") + ) # set to 0 to parse all entries + + # print message to terminal about which args are in use + self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse) + + # STEP 1: + # Create mapping of domain name -> status + domain_status_dictionary = self.get_domain_user_dict( + domain_statuses_filename, sep + ) + + # STEP 2: + # Create mapping of userId -> email + user_emails_dictionary = self.get_user_emails_dict(contacts_filename, sep) + + # STEP 3: + # Parse the domain_contacts file and create TransitionDomain objects, + # using the dictionaries from steps 1 & 2 to lookup needed information. + + to_create = [] + + # keep track of statuses that don't match our available + # status values + outlier_statuses = [] + + # keep track of domains that have no known status + domains_without_status = [] + + # keep track of users that have no e-mails + users_without_email = [] + + # keep track of duplications.. 
+ duplicate_domains = [] + duplicate_domain_user_combos = [] + + # keep track of domains we ADD or UPDATE + total_updated_domain_entries = 0 + total_new_entries = 0 + + # if we are limiting our parse (for testing purposes, keep + # track of total rows parsed) + total_rows_parsed = 0 + + # Start parsing the main file and create TransitionDomain objects + logger.info("Reading domain-contacts data file %s", domain_contacts_filename) + with open(domain_contacts_filename, "r") as domain_contacts_file: + for row in csv.reader(domain_contacts_file, delimiter=sep): + total_rows_parsed += 1 + + # fields are just domain, userid, role + # lowercase the domain names + new_entry_domain_name = row[0].lower() + user_id = row[1] + + new_entry_status = TransitionDomain.StatusChoices.READY + new_entry_email = "" + new_entry_emailSent = False # set to False by default + + # PART 1: Get the status + if new_entry_domain_name not in domain_status_dictionary: + # This domain has no status...default to "Create" + # (For data analysis purposes, add domain name + # to list of all domains without status + # (avoid duplicate entries)) + if new_entry_domain_name not in domains_without_status: + domains_without_status.append(new_entry_domain_name) + else: + # Map the status + original_status = domain_status_dictionary[new_entry_domain_name] + mapped_status = self.get_mapped_status(original_status) + if mapped_status is None: + # (For data analysis purposes, check for any statuses + # that don't have a mapping and add to list + # of "outlier statuses") + logger.info("Unknown status: " + original_status) + outlier_statuses.append(original_status) + else: + new_entry_status = mapped_status + + # PART 2: Get the e-mail + if user_id not in user_emails_dictionary: + # this user has no e-mail...this should never happen + if user_id not in users_without_email: + users_without_email.append(user_id) + else: + new_entry_email = user_emails_dictionary[user_id] + + # PART 3: Create the transition domain 
object + # Check for duplicate data in the file we are + # parsing so we do not add duplicates + # NOTE: Currently, we allow duplicate domains, + # but not duplicate domain-user pairs. + # However, track duplicate domains for now, + # since we are still deciding on whether + # to make this field unique or not. ~10/25/2023 + existing_domain = next( + (x for x in to_create if x.domain_name == new_entry_domain_name), + None, + ) + existing_domain_user_pair = next( + ( + x + for x in to_create + if x.username == new_entry_email + and x.domain_name == new_entry_domain_name + ), + None, + ) + if existing_domain is not None: + # DEBUG: + self.print_debug( + debug_on, + f"{termColors.YELLOW} DUPLICATE file entries found for domain: {new_entry_domain_name} {termColors.ENDC}", # noqa + ) + if new_entry_domain_name not in duplicate_domains: + duplicate_domains.append(new_entry_domain_name) + if existing_domain_user_pair is not None: + # DEBUG: + self.print_debug( + debug_on, + f"""{termColors.YELLOW} DUPLICATE file entries found for domain - user {termColors.BackgroundLightYellow} PAIR {termColors.ENDC}{termColors.YELLOW}: + {new_entry_domain_name} - {new_entry_email} {termColors.ENDC}""", # noqa + ) + if existing_domain_user_pair not in duplicate_domain_user_combos: + duplicate_domain_user_combos.append(existing_domain_user_pair) + else: + entry_exists = TransitionDomain.objects.filter( + username=new_entry_email, domain_name=new_entry_domain_name + ).exists() + if entry_exists: + try: + existing_entry = TransitionDomain.objects.get( + username=new_entry_email, + domain_name=new_entry_domain_name, + ) + + if existing_entry.status != new_entry_status: + # DEBUG: + self.print_debug( + debug_on, + f"{termColors.OKCYAN}" + f"Updating entry: {existing_entry}" + f"Status: {existing_entry.status} > {new_entry_status}" # noqa + f"Email Sent: {existing_entry.email_sent} > {new_entry_emailSent}" # noqa + f"{termColors.ENDC}", + ) + existing_entry.status = new_entry_status + 
existing_entry.email_sent = new_entry_emailSent + existing_entry.save() + except TransitionDomain.MultipleObjectsReturned: + logger.info( + f"{termColors.FAIL}" + f"!!! ERROR: duplicate entries exist in the" + f"transtion_domain table for domain:" + f"{new_entry_domain_name}" + f"----------TERMINATING----------" + ) + sys.exit() + else: + # no matching entry, make one + new_entry = TransitionDomain( + username=new_entry_email, + domain_name=new_entry_domain_name, + status=new_entry_status, + email_sent=new_entry_emailSent, + ) + to_create.append(new_entry) + total_new_entries += 1 + + # DEBUG: + self.print_debug( + debug_on, + f"{termColors.OKCYAN} Adding entry {total_new_entries}: {new_entry} {termColors.ENDC}", # noqa + ) + + # Check Parse limit and exit loop if needed + if ( + total_rows_parsed >= debug_max_entries_to_parse + and debug_max_entries_to_parse != 0 + ): + logger.info( + f"{termColors.YELLOW}" + f"----PARSE LIMIT REACHED. HALTING PARSER.----" + f"{termColors.ENDC}" + ) + break + + TransitionDomain.objects.bulk_create(to_create) + + logger.info( + f"""{termColors.OKGREEN} + ============= FINISHED =============== + Created {total_new_entries} transition domain entries, + updated {total_updated_domain_entries} transition domain entries + {termColors.ENDC} + """ + ) + + # Print a summary of findings (duplicate entries, + # missing data..etc.) + self.print_summary_duplications( + duplicate_domain_user_combos, duplicate_domains, users_without_email + ) + self.print_summary_status_findings(domains_without_status, outlier_statuses) diff --git a/src/registrar/management/commands/transfer_transition_domains_to_domains.py b/src/registrar/management/commands/transfer_transition_domains_to_domains.py new file mode 100644 index 000000000..b98e8e2a9 --- /dev/null +++ b/src/registrar/management/commands/transfer_transition_domains_to_domains.py 
@@ -0,0 +1,409 @@ +import logging +import argparse +import sys + +from django_fsm import TransitionNotAllowed # type: ignore + +from django.core.management import BaseCommand + +from registrar.models import TransitionDomain +from registrar.models import Domain +from registrar.models import DomainInvitation + +logger = logging.getLogger(__name__) + + +class termColors: + """Colors for terminal outputs + (makes reading the logs WAY easier)""" + + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + YELLOW = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + BackgroundLightYellow = "\033[103m" + + +class Command(BaseCommand): + help = """Load data from transition domain tables + into main domain tables. Also create domain invitation + entries for every domain we ADD (but not for domains + we UPDATE)""" + + def add_arguments(self, parser): + parser.add_argument("--debug", action=argparse.BooleanOptionalAction) + + parser.add_argument( + "--limitParse", + default=0, + help="Sets max number of entries to load, set to 0 to load all entries", + ) + + def print_debug_mode_statements( + self, debug_on: bool, debug_max_entries_to_parse: int + ): + """Prints additional terminal statements to indicate if --debug + or --limitParse are in use""" + self.print_debug( + debug_on, + f"""{termColors.OKCYAN} + ----------DEBUG MODE ON---------- + Detailed print statements activated. + {termColors.ENDC} + """, + ) + self.print_debug( + debug_max_entries_to_parse > 0, + f"""{termColors.OKCYAN} + ----------LIMITER ON---------- + Parsing of entries will be limited to + {debug_max_entries_to_parse} lines per file.") + Detailed print statements activated. + {termColors.ENDC} + """, + ) + + def print_debug(self, print_condition: bool, print_statement: str): + """This function reduces complexity of debug statements + in other functions. 
+ It uses the logger to write the given print_statement to the + terminal if print_condition is TRUE""" + # DEBUG: + if print_condition: + logger.info(print_statement) + + def update_domain_status( + self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool + ) -> bool: + """Given a transition domain that matches an existing domain, + updates the existing domain object with that status of + the transition domain. + Returns TRUE if an update was made. FALSE if the states + matched and no update was made""" + + transition_domain_status = transition_domain.status + existing_status = target_domain.state + if transition_domain_status != existing_status: + if transition_domain_status == TransitionDomain.StatusChoices.ON_HOLD: + target_domain.place_client_hold(ignoreEPP=True) + else: + target_domain.revert_client_hold(ignoreEPP=True) + target_domain.save() + + # DEBUG: + self.print_debug( + debug_on, + f"""{termColors.YELLOW} + >> Updated {target_domain.name} state from + '{existing_status}' to '{target_domain.state}' + (no domain invitation entry added) + {termColors.ENDC}""", + ) + return True + return False + + def print_summary_of_findings( + self, + domains_to_create, + updated_domain_entries, + domain_invitations_to_create, + skipped_domain_entries, + debug_on, + ): + """Prints to terminal a summary of findings from + transferring transition domains to domains""" + + total_new_entries = len(domains_to_create) + total_updated_domain_entries = len(updated_domain_entries) + total_domain_invitation_entries = len(domain_invitations_to_create) + + logger.info( + f"""{termColors.OKGREEN} + ============= FINISHED =============== + Created {total_new_entries} transition domain entries, + Updated {total_updated_domain_entries} transition domain entries + + Created {total_domain_invitation_entries} domain invitation entries + (NOTE: no invitations are SENT in this script) + {termColors.ENDC} + """ + ) + if len(skipped_domain_entries) > 0: + logger.info( 
+ f"""{termColors.FAIL} + ============= SKIPPED DOMAINS (ERRORS) =============== + {skipped_domain_entries} + {termColors.ENDC} + """ + ) + + # determine domainInvitations we SKIPPED + skipped_domain_invitations = [] + for domain in domains_to_create: + skipped_domain_invitations.append(domain) + for domain_invite in domain_invitations_to_create: + if domain_invite.domain in skipped_domain_invitations: + skipped_domain_invitations.remove(domain_invite.domain) + if len(skipped_domain_invitations) > 0: + logger.info( + f"""{termColors.FAIL} + ============= SKIPPED DOMAIN INVITATIONS (ERRORS) =============== + {skipped_domain_invitations} + {termColors.ENDC} + """ + ) + + # DEBUG: + self.print_debug( + debug_on, + f"""{termColors.YELLOW} + + Created Domains: + {domains_to_create} + + Updated Domains: + {updated_domain_entries} + + {termColors.ENDC} + """, + ) + + def try_add_domain_invitation( + self, domain_email: str, associated_domain: Domain + ) -> DomainInvitation | None: + """If no domain invitation exists for the given domain and + e-mail, create and return a new domain invitation object. + If one already exists, or if the email is invalid, return NONE""" + + # this should never happen, but adding it just in case + if associated_domain is None: + logger.warning( + f""" + {termColors.FAIL} + !!! ERROR: Domain cannot be null for a + Domain Invitation object! + + RECOMMENDATION: + Somehow, an empty domain object is + being passed to the subroutine in charge + of making domain invitations. Walk through + the code to see what is amiss. 
+ + ----------TERMINATING----------""" + ) + sys.exit() + + # check that the given e-mail is valid + if domain_email is not None and domain_email != "": + # check that a domain invitation doesn't already + # exist for this e-mail / Domain pair + domain_email_already_in_domain_invites = DomainInvitation.objects.filter( + email=domain_email.lower(), domain=associated_domain + ).exists() + if not domain_email_already_in_domain_invites: + # Create new domain invitation + new_domain_invitation = DomainInvitation( + email=domain_email.lower(), domain=associated_domain + ) + return new_domain_invitation + return None + + def handle( + self, + **options, + ): + """Parse entries in TransitionDomain table + and create (or update) corresponding entries in the + Domain and DomainInvitation tables.""" + + # grab command line arguments and store locally... + debug_on = options.get("debug") + debug_max_entries_to_parse = int( + options.get("limitParse") + ) # set to 0 to parse all entries + + self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse) + + # domains to ADD + domains_to_create = [] + domain_invitations_to_create = [] + # domains we UPDATED + updated_domain_entries = [] + # domains we SKIPPED + skipped_domain_entries = [] + # if we are limiting our parse (for testing purposes, keep + # track of total rows parsed) + total_rows_parsed = 0 + + logger.info( + f"""{termColors.OKGREEN} + ========================== + Beginning Data Transfer + ========================== + {termColors.ENDC}""" + ) + + for transition_domain in TransitionDomain.objects.all(): + transition_domain_name = transition_domain.domain_name + transition_domain_status = transition_domain.status + transition_domain_email = transition_domain.username + + # DEBUG: + self.print_debug( + debug_on, + f"""{termColors.OKCYAN} + Processing Transition Domain: {transition_domain_name}, {transition_domain_status}, {transition_domain_email} + {termColors.ENDC}""", # noqa + ) + + new_domain_invitation = 
None + # Check for existing domain entry + domain_exists = Domain.objects.filter(name=transition_domain_name).exists() + if domain_exists: + try: + # get the existing domain + domain_to_update = Domain.objects.get(name=transition_domain_name) + # DEBUG: + self.print_debug( + debug_on, + f"""{termColors.YELLOW} + > Found existing entry in Domain table for: {transition_domain_name}, {domain_to_update.state} + {termColors.ENDC}""", # noqa + ) + + # for existing entry, update the status to + # the transition domain status + update_made = self.update_domain_status( + transition_domain, domain_to_update, debug_on + ) + if update_made: + # keep track of updated domains for data analysis purposes + updated_domain_entries.append(transition_domain.domain_name) + + # check if we need to add a domain invitation + # (eg. for a new user) + new_domain_invitation = self.try_add_domain_invitation( + transition_domain_email, domain_to_update + ) + + except Domain.MultipleObjectsReturned: + # This exception was thrown once before during testing. + # While the circumstances that led to corrupt data in + # the domain table was a freak accident, and the possibility of it + # happening again is safe-guarded by a key constraint, + # better to keep an eye out for it since it would require + # immediate attention. + logger.warning( + f""" + {termColors.FAIL} + !!! ERROR: duplicate entries already exist in the + Domain table for the following domain: + {transition_domain_name} + + RECOMMENDATION: + This means the Domain table is corrupt. Please + check the Domain table data as there should be a key + constraint which prevents duplicate entries. 
+ + ----------TERMINATING----------""" + ) + sys.exit() + except TransitionNotAllowed as err: + skipped_domain_entries.append(transition_domain_name) + logger.warning( + f"""{termColors.FAIL} + Unable to change state for {transition_domain_name} + + RECOMMENDATION: + This indicates there might have been changes to the + Domain model which were not accounted for in this + migration script. Please check state change rules + in the Domain model and ensure we are following the + correct state transition pathways. + + INTERNAL ERROR MESSAGE: + 'TRANSITION NOT ALLOWED' exception + {err} + ----------SKIPPING----------""" + ) + else: + # no entry was found in the domain table + # for the given domain. Create a new entry. + + # first see if we are already adding an entry for this domain. + # The unique key constraint does not allow duplicate domain entries + # even if there are different users. + existing_domain_in_to_create = next( + (x for x in domains_to_create if x.name == transition_domain_name), + None, + ) + if existing_domain_in_to_create is not None: + self.print_debug( + debug_on, + f"""{termColors.YELLOW} + Duplicate Detected: {transition_domain_name}. + Cannot add duplicate entry for another username. + Violates Unique Key constraint. + + Checking for unique user e-mail for Domain Invitations... + {termColors.ENDC}""", + ) + new_domain_invitation = self.try_add_domain_invitation( + transition_domain_email, existing_domain_in_to_create + ) + else: + # no matching entry, make one + new_domain = Domain( + name=transition_domain_name, state=transition_domain_status + ) + domains_to_create.append(new_domain) + # DEBUG: + self.print_debug( + debug_on, + f"{termColors.OKCYAN} Adding domain: {new_domain} {termColors.ENDC}", # noqa + ) + new_domain_invitation = self.try_add_domain_invitation( + transition_domain_email, new_domain + ) + + if new_domain_invitation is None: + logger.info( + f"{termColors.YELLOW} ! No new e-mail detected !" 
# noqa + f"(SKIPPED ADDING DOMAIN INVITATION){termColors.ENDC}" + ) + else: + # DEBUG: + self.print_debug( + debug_on, + f"{termColors.OKCYAN} Adding domain invitation: {new_domain_invitation} {termColors.ENDC}", # noqa + ) + domain_invitations_to_create.append(new_domain_invitation) + + # Check parse limit and exit loop if parse limit has been reached + if ( + debug_max_entries_to_parse > 0 + and total_rows_parsed >= debug_max_entries_to_parse + ): + logger.info( + f"""{termColors.YELLOW} + ----PARSE LIMIT REACHED. HALTING PARSER.---- + {termColors.ENDC} + """ + ) + break + + Domain.objects.bulk_create(domains_to_create) + DomainInvitation.objects.bulk_create(domain_invitations_to_create) + + self.print_summary_of_findings( + domains_to_create, + updated_domain_entries, + domain_invitations_to_create, + skipped_domain_entries, + debug_on, + ) diff --git a/src/registrar/migrations/0037_create_groups_v01.py b/src/registrar/migrations/0037_create_groups_v01.py index 27a14f8b9..3540ea2f3 100644 --- a/src/registrar/migrations/0037_create_groups_v01.py +++ b/src/registrar/migrations/0037_create_groups_v01.py @@ -2,11 +2,14 @@ # It is dependent on 0035 (which populates ContentType and Permissions) # If permissions on the groups need changing, edit CISA_ANALYST_GROUP_PERMISSIONS # in the user_group model then: +# [NOT RECOMMENDED] # step 1: docker-compose exec app ./manage.py migrate --fake registrar 0035_contenttypes_permissions # step 2: docker-compose exec app ./manage.py migrate registrar 0036_create_groups # step 3: fake run the latest migration in the migrations list +# [RECOMMENDED] # Alternatively: -# Only step: duplicate the migtation that loads data and run: docker-compose exec app ./manage.py migrate +# step 1: duplicate the migration that loads data +# step 2: docker-compose exec app ./manage.py migrate from django.db import migrations from registrar.models import UserGroup 
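Review bot: In the `handle()` loop of the transfer command earlier in this patch, `total_rows_parsed` is initialised to 0 and compared against `debug_max_entries_to_parse`, but nothing in the lines shown increments it, so the `limitParse` break can never fire and a limited test run still walks the whole `TransitionDomain` table. Increment the counter once per row before the limit check. The summary text also labels the created and updated `Domain` rows as "transition domain entries".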
diff --git a/src/registrar/migrations/0038_create_groups_v02.py b/src/registrar/migrations/0038_create_groups_v02.py new file mode 100644 index 000000000..fc61db3c0 --- /dev/null +++ b/src/registrar/migrations/0038_create_groups_v02.py @@ -0,0 +1,37 @@ +# This migration creates the create_full_access_group and create_cisa_analyst_group groups +# It is dependent on 0035 (which populates ContentType and Permissions) +# If permissions on the groups need changing, edit CISA_ANALYST_GROUP_PERMISSIONS +# in the user_group model then: +# [NOT RECOMMENDED] +# step 1: docker-compose exec app ./manage.py migrate --fake registrar 0035_contenttypes_permissions +# step 2: docker-compose exec app ./manage.py migrate registrar 0036_create_groups +# step 3: fake run the latest migration in the migrations list +# [RECOMMENDED] +# Alternatively: +# step 1: duplicate the migration that loads data +# step 2: docker-compose exec app ./manage.py migrate + +from django.db import migrations +from registrar.models import UserGroup +from typing import Any + + +# For linting: RunPython expects a function reference, +# so let's give it one +def create_groups(apps, schema_editor) -> Any: + UserGroup.create_cisa_analyst_group(apps, schema_editor) + UserGroup.create_full_access_group(apps, schema_editor) + + +class Migration(migrations.Migration): + dependencies = [ + ("registrar", "0037_create_groups_v01"), + ] + + operations = [ + migrations.RunPython( + create_groups, + reverse_code=migrations.RunPython.noop, + atomic=True, + ), + ] diff --git a/src/registrar/migrations/0039_alter_transitiondomain_status.py b/src/registrar/migrations/0039_alter_transitiondomain_status.py new file mode 100644 index 000000000..b6ac08770 --- /dev/null +++ b/src/registrar/migrations/0039_alter_transitiondomain_status.py 
@@ -0,0 +1,22 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("registrar", "0038_create_groups_v02"), + ] + + operations = [ + migrations.AlterField( + model_name="transitiondomain", + name="status", + field=models.CharField( + blank=True, + choices=[("ready", "Ready"), ("on hold", "On Hold")], + default="ready", + help_text="domain status during the transfer", + max_length=255, + verbose_name="Status", + ), + ), + ] diff --git a/src/registrar/models/domain.py b/src/registrar/models/domain.py index 1980bd087..ae9d80c25 100644 --- a/src/registrar/models/domain.py +++ b/src/registrar/models/domain.py @@ -1,12 +1,14 @@ from itertools import zip_longest import logging import inspect +import ipaddress +import re from datetime import date from string import digits from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore from django.db import models - +from typing import Any from epplibwrapper import ( CLIENT as registry, commands, @@ -16,7 +18,15 @@ from epplibwrapper import ( RegistryError, ErrorCode, ) -from registrar.models.utility.contact_error import ContactError + +from registrar.utility.errors import ( + ActionNotAllowed, + NameserverError, + NameserverErrorCodes as nsErrorCodes, +) + +from registrar.models.utility.contact_error import ContactError, ContactErrorCodes + from .utility.domain_field import DomainField from .utility.domain_helper import DomainHelper 
@@ -246,13 +256,13 @@ class Domain(TimeStampedModel, DomainHelper): raise NotImplementedError() @Cache - def nameservers(self) -> list[tuple[str]]: + def nameservers(self) -> list[tuple[str, list]]: """ Get or set a complete list of nameservers for this domain. Hosts are provided as a list of tuples, e.g. - [("ns1.example.com",), ("ns1.example.gov", "0.0.0.0")] + [("ns1.example.com",), ("ns1.example.gov", ["0.0.0.0"])] Subordinate hosts (something.your-domain.gov) MUST have IP addresses, while non-subordinate hosts MUST NOT. 
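Review bot: The new return annotation `list[tuple[str, list]]` disagrees with the docstring example kept in this hunk, which still lists the one-element tuple `("ns1.example.com",)`. Either document that the address list is optional and annotate it that way, or make the example a two-element tuple with an empty list, so callers know which shape the getter and setter accept.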
@@ -260,39 +270,21 @@ class Domain(TimeStampedModel, DomainHelper): try: hosts = self._get_property("hosts") except Exception as err: - # Don't throw error as this is normal for a new domain - # TODO - 433 error handling ticket should address this + # Do not raise error when missing nameservers + # this is a standard occurence when a domain + # is first created logger.info("Domain is missing nameservers %s" % err) return [] + # TODO-687 fix this return value hostList = [] for host in hosts: - # TODO - this should actually have a second tuple value with the ip address - # ignored because uncertain if we will even have a way to display mult. - # and adresses can be a list of mult address - hostList.append((host["name"],)) + hostList.append((host["name"], host["addrs"])) return hostList - def _check_host(self, hostnames: list[str]): - """check if host is available, True if available - returns boolean""" - checkCommand = commands.CheckHost(hostnames) - try: - response = registry.send(checkCommand, cleaned=True) - return response.res_data[0].avail - except RegistryError as err: - logger.warning( - "Couldn't check hosts %s. Errorcode was %s, error was %s", - hostnames, - err.code, - err, - ) - return False - def _create_host(self, host, addrs): - """Call _check_host first before using this function, - This creates the host object in the registry + """Creates the host object in the registry doesn't add the created host to the domain returns ErrorCode (int)""" logger.info("Creating host") 
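Review bot: `host["addrs"]` in the rewritten loop raises `KeyError` for any host dict without an `addrs` key, and this sits outside the `try` that guards `_get_property("hosts")`. `_fetch_hosts` elsewhere in this patch builds those dicts with `{k: v for k, v in host.items() if v is not ...}`, which drops unset keys, so the case is reachable. Use `host.get("addrs")` or handle the missing key explicitly, and resolve the `TODO-687` comment or link it to a ticket before merge.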
@@ -310,6 +302,187 @@ class Domain(TimeStampedModel, DomainHelper): logger.error("Error _create_host, code was %s error was %s" % (e.code, e)) return e.code + def _convert_list_to_dict(self, listToConvert: list[tuple[str, list]]): + """converts a list of hosts into a dictionary + Args: + list[tuple[str, list]]: such as [("123",["1","2","3"])] + This is the list of hosts to convert + + returns: + convertDict (dict(str,list))- such as{"123":["1","2","3"]}""" + newDict: dict[str, Any] = {} + + for tup in listToConvert: + if len(tup) == 1: + newDict[tup[0]] = None + elif len(tup) == 2: + newDict[tup[0]] = tup[1] + return newDict + + def isSubdomain(self, nameserver: str): + """Returns boolean if the domain name is found in the argument passed""" + subdomain_pattern = r"([\w-]+\.)*" + full_pattern = subdomain_pattern + self.name + regex = re.compile(full_pattern) + return bool(regex.match(nameserver)) + + def checkHostIPCombo(self, nameserver: str, ip: list[str]): + """Checks the parameters past for a valid combination + raises error if: + - nameserver is a subdomain but is missing ip + - nameserver is not a subdomain but has ip + - nameserver is a subdomain but an ip passed is invalid + + Args: + hostname (str)- nameserver or subdomain + ip (list[str])-list of ip strings + Throws: + NameserverError (if exception hit) + Returns: + None""" + if self.isSubdomain(nameserver) and (ip is None or ip == []): + raise NameserverError(code=nsErrorCodes.MISSING_IP, nameserver=nameserver) + + elif not self.isSubdomain(nameserver) and (ip is not None and ip != []): + raise NameserverError( + code=nsErrorCodes.GLUE_RECORD_NOT_ALLOWED, nameserver=nameserver, ip=ip + ) + elif ip is not None and ip != []: + for addr in ip: + if not self._valid_ip_addr(addr): + raise NameserverError( + code=nsErrorCodes.INVALID_IP, nameserver=nameserver, ip=ip + ) + return None + + def _valid_ip_addr(self, ipToTest: str): + """returns boolean if valid ip address string + We currently only accept v4 or v6 
ips + returns: + isValid (boolean)-True for valid ip address""" + try: + ip = ipaddress.ip_address(ipToTest) + return ip.version == 6 or ip.version == 4 + + except ValueError: + return False + + def getNameserverChanges( + self, hosts: list[tuple[str, list]] + ) -> tuple[list, list, dict, dict]: + """ + calls self.nameserver, it should pull from cache but may result + in an epp call + Args: + hosts: list[tuple[str, list]] such as [("123",["1","2","3"])] + Throws: + NameserverError (if exception hit) + Returns: + tuple[list, list, dict, dict] + These four tuple values as follows: + deleted_values: list[str] + updated_values: list[str] + new_values: dict(str,list) + prevHostDict: dict(str,list)""" + + oldNameservers = self.nameservers + + previousHostDict = self._convert_list_to_dict(oldNameservers) + + newHostDict = self._convert_list_to_dict(hosts) + deleted_values = [] + # TODO-currently a list of tuples, why not dict? for consistency + updated_values = [] + new_values = {} + + for prevHost in previousHostDict: + addrs = previousHostDict[prevHost] + # get deleted values-which are values in previous nameserver list + # but are not in the list of new host values + if prevHost not in newHostDict: + deleted_values.append(prevHost) + # if the host exists in both, check if the addresses changed + else: + # TODO - host is being updated when previous was None+new is empty list + # add check here + if newHostDict[prevHost] is not None and set( + newHostDict[prevHost] + ) != set(addrs): + self.checkHostIPCombo(nameserver=prevHost, ip=newHostDict[prevHost]) + updated_values.append((prevHost, newHostDict[prevHost])) + + new_values = { + key: newHostDict.get(key) + for key in newHostDict + if key not in previousHostDict and key.strip() != "" + } + + for nameserver, ip in new_values.items(): + self.checkHostIPCombo(nameserver=nameserver, ip=ip) + + return (deleted_values, updated_values, new_values, previousHostDict) + + def _update_host_values(self, updated_values, 
oldNameservers): + for hostTuple in updated_values: + updated_response_code = self._update_host( + hostTuple[0], hostTuple[1], oldNameservers.get(hostTuple[0]) + ) + if updated_response_code not in [ + ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY, + ErrorCode.OBJECT_EXISTS, + ]: + logger.warning( + "Could not update host %s. Error code was: %s " + % (hostTuple[0], updated_response_code) + ) + + def createNewHostList(self, new_values: dict): + """convert the dictionary of new values to a list of HostObjSet + for use in the UpdateDomain epp message + Args: + new_values: dict(str,list)- dict of {nameserver:ips} to add to domain + Returns: + tuple [list[epp.HostObjSet], int] + list[epp.HostObjSet]-epp object for use in the UpdateDomain epp message + defaults to empty list + int-number of items being created default 0 + """ + + hostStringList = [] + for key, value in new_values.items(): + createdCode = self._create_host( + host=key, addrs=value + ) # creates in registry + if ( + createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY + or createdCode == ErrorCode.OBJECT_EXISTS + ): + hostStringList.append(key) + if hostStringList == []: + return [], 0 + + addToDomainObject = epp.HostObjSet(hosts=hostStringList) + return [addToDomainObject], len(hostStringList) + + def createDeleteHostList(self, hostsToDelete: list[str]): + """ + Args: + hostsToDelete (list[str])- list of nameserver/host names to remove + Returns: + tuple [list[epp.HostObjSet], int] + list[epp.HostObjSet]-epp object for use in the UpdateDomain epp message + defaults to empty list + int-number of items being created default 0 + """ + deleteStrList = [] + for nameserver in hostsToDelete: + deleteStrList.append(nameserver) + if deleteStrList == []: + return [], 0 + deleteObj = epp.HostObjSet(hosts=hostsToDelete) + + return [deleteObj], len(deleteStrList) + @Cache def dnssecdata(self) -> extensions.DNSSECExtension: return self._get_property("dnssecdata") 
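Review bot: `isSubdomain` concatenates `self.name` into the pattern unescaped and tests it with `regex.match`, which anchors only at the start. The dots in the domain name act as wildcards and trailing text is accepted, so for a domain `example.gov` a nameserver such as `ns1.example.gov.other.test` is classified as subordinate, and `checkHostIPCombo` then applies the glue-record rules to a host outside the domain. Build the pattern with `re.escape(self.name)` and use `fullmatch`, or compare lower-cased names with an exact match or an `endswith("." + self.name)` check.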
@@ -332,54 +505,62 @@ class Domain(TimeStampedModel, DomainHelper): raise e @nameservers.setter # type: ignore - def nameservers(self, hosts: list[tuple[str]]): - """host should be a tuple of type str, str,... where the elements are + def nameservers(self, hosts: list[tuple[str, list]]): + """Host should be a tuple of type str, str,... where the elements are Fully qualified host name, addresses associated with the host - example: [(ns1.okay.gov, 127.0.0.1, others ips)]""" - # TODO: ticket #848 finish this implementation - # must delete nameservers as well or update - # ip version checking may need to be added in a different ticket + example: [(ns1.okay.gov, [127.0.0.1, others ips])]""" if len(hosts) > 13: - raise ValueError( - "Too many hosts provided, you may not have more than 13 nameservers." - ) + raise NameserverError(code=nsErrorCodes.TOO_MANY_HOSTS) + + if self.state not in [self.State.DNS_NEEDED, self.State.READY]: + raise ActionNotAllowed("Nameservers can not be " "set in the current state") + logger.info("Setting nameservers") logger.info(hosts) - for hostTuple in hosts: - host = hostTuple[0] - addrs = None - if len(hostTuple) > 1: - addrs = hostTuple[1:] - avail = self._check_host([host]) - if avail: - createdCode = self._create_host(host=host, addrs=addrs) - # update the domain obj - if createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY: - # add host to domain - request = commands.UpdateDomain( - name=self.name, add=[epp.HostObjSet([host])] - ) + # get the changes made by user and old nameserver values + ( + deleted_values, + updated_values, + new_values, + oldNameservers, + ) = self.getNameserverChanges(hosts=hosts) - try: - registry.send(request, cleaned=True) - except RegistryError as e: - logger.error( - "Error adding nameserver, code was %s error was %s" - % (e.code, e) - ) + _ = self._update_host_values( + updated_values, oldNameservers + ) # returns nothing, just need to be run and errors + addToDomainList, addToDomainCount = 
self.createNewHostList(new_values) + deleteHostList, deleteCount = self.createDeleteHostList(deleted_values) + responseCode = self.addAndRemoveHostsFromDomain( + hostsToAdd=addToDomainList, hostsToDelete=deleteHostList + ) - try: - self.ready() - self.save() - except Exception as err: - logger.info( - "nameserver setter checked for create state " - "and it did not succeed. Error: %s" % err - ) - # TODO - handle removed nameservers here will need to change the state - # then go back to DNS_NEEDED + # if unable to update domain raise error and stop + if responseCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY: + raise NameserverError(code=nsErrorCodes.UNABLE_TO_UPDATE_DOMAIN) + + successTotalNameservers = len(oldNameservers) - deleteCount + addToDomainCount + + self._delete_hosts_if_not_used(hostsToDelete=deleted_values) + if successTotalNameservers < 2: + try: + self.dns_needed() + self.save() + except Exception as err: + logger.info( + "nameserver setter checked for dns_needed state " + "and it did not succeed. Warning: %s" % err + ) + elif successTotalNameservers >= 2 and successTotalNameservers <= 13: + try: + self.ready() + self.save() + except Exception as err: + logger.info( + "nameserver setter checked for create state " + "and it did not succeed. Warning: %s" % err + ) @Cache def statuses(self) -> list[str]: @@ -653,7 +834,10 @@ class Domain(TimeStampedModel, DomainHelper): def get_security_email(self): logger.info("get_security_email-> getting the contact ") secContact = self.security_contact - return secContact.email + if secContact is not None: + return secContact.email + else: + return None def clientHoldStatus(self): return epp.Status(state=self.Status.CLIENT_HOLD, description="", lang="en") 
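Review bot: In the `nameservers` setter hunk above, both state transitions are wrapped in `except Exception` and logged at info level, and `_update_host_values` only logs a warning when an update fails, so the caller gets no signal for a failed host update or a `TransitionNotAllowed`. The latter is reachable: the setter accepts `DNS_NEEDED`, and `dns_needed()` elsewhere in this patch has `source=[State.READY]`. `successTotalNameservers` is also derived from local counts rather than from what the registry accepted. Catch `TransitionNotAllowed` specifically, skip the transition when the state already matches, and surface update failures to the caller.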
@@ -726,10 +910,10 @@ class Domain(TimeStampedModel, DomainHelper): return None if contact_type is None: - raise ContactError("contact_type is None") + raise ContactError(code=ContactErrorCodes.CONTACT_TYPE_NONE) if contact_id is None: - raise ContactError("contact_id is None") + raise ContactError(code=ContactErrorCodes.CONTACT_ID_NONE) # Since contact_id is registry_id, # check that its the right length @@ -738,14 +922,10 @@ class Domain(TimeStampedModel, DomainHelper): contact_id_length > PublicContact.get_max_id_length() or contact_id_length < 1 ): - raise ContactError( - "contact_id is of invalid length. " - "Cannot exceed 16 characters, " - f"got {contact_id} with a length of {contact_id_length}" - ) + raise ContactError(code=ContactErrorCodes.CONTACT_ID_INVALID_LENGTH) if not isinstance(contact, eppInfo.InfoContactResultData): - raise ContactError("Contact must be of type InfoContactResultData") + raise ContactError(code=ContactErrorCodes.CONTACT_INVALID_TYPE) auth_info = contact.auth_info postal_info = contact.postal_info @@ -909,8 +1089,8 @@ class Domain(TimeStampedModel, DomainHelper): return self._handle_registrant_contact(desired_contact) - _registry_id: str - if contact_type in contacts: + _registry_id: str = "" + if contacts is not None and contact_type in contacts: _registry_id = contacts.get(contact_type) desired = PublicContact.objects.filter( @@ -992,7 +1172,7 @@ class Domain(TimeStampedModel, DomainHelper): if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST: # avoid infinite loop already_tried_to_create = True - self.pendingCreate() + self.dns_needed_from_unknown() self.save() else: logger.error(e) @@ -1006,7 +1186,7 @@ class Domain(TimeStampedModel, DomainHelper): return registrant.registry_id @transition(field="state", source=State.UNKNOWN, target=State.DNS_NEEDED) - def pendingCreate(self): + def dns_needed_from_unknown(self): logger.info("Changing to dns_needed") registrantID = self.addRegistrant() 
@@ -1039,20 +1219,29 @@ class Domain(TimeStampedModel, DomainHelper): @transition( field="state", source=[State.READY, State.ON_HOLD], target=State.ON_HOLD ) - def place_client_hold(self): - """place a clienthold on a domain (no longer should resolve)""" + def place_client_hold(self, ignoreEPP=False): + """place a clienthold on a domain (no longer should resolve) + ignoreEPP (boolean) - set to true to by-pass EPP (used for transition domains) + """ # TODO - ensure all requirements for client hold are made here # (check prohibited statuses) logger.info("clientHold()-> inside clientHold") - self._place_client_hold() + + # In order to allow transition domains to by-pass EPP calls, + # include this ignoreEPP flag + if not ignoreEPP: + self._place_client_hold() # TODO -on the client hold ticket any additional error handling here @transition(field="state", source=[State.READY, State.ON_HOLD], target=State.READY) - def revert_client_hold(self): - """undo a clienthold placed on a domain""" + def revert_client_hold(self, ignoreEPP=False): + """undo a clienthold placed on a domain + ignoreEPP (boolean) - set to true to by-pass EPP (used for transition domains) + """ logger.info("clientHold()-> inside clientHold") - self._remove_client_hold() + if not ignoreEPP: + self._remove_client_hold() # TODO -on the client hold ticket any additional error handling here @transition( 
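Review bot: The `ignoreEPP` flag on `place_client_hold` and `revert_client_hold` lets any caller move the local state to `ON_HOLD` or `READY` without the registry call, so the database can report a hold that the registry never applied or lifted. The docstring says the flag exists only for transition domains, so consider a separate private helper used by the migration command, or at least log whenever the EPP call is skipped, so the bypass leaves a trace. The log line in `revert_client_hold` still reads `clientHold()-> inside clientHold`.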
@@ -1082,26 +1271,54 @@ class Domain(TimeStampedModel, DomainHelper): else: self._invalidate_cache() + # def is_dns_needed(self): + # """Commented out and kept in the codebase + # as this call should be made, but adds + # a lot of processing time + # when EPP calling is made more efficient + # this should be added back in + + # The goal is to double check that + # the nameservers we set are in fact + # on the registry + # """ + # self._invalidate_cache() + # nameserverList = self.nameservers + # return len(nameserverList) < 2 + + # def dns_not_needed(self): + # return not self.is_dns_needed() + @transition( field="state", source=[State.DNS_NEEDED], target=State.READY, + # conditions=[dns_not_needed] ) def ready(self): """Transition to the ready state domain should have nameservers and all contacts and now should be considered live on a domain """ - # TODO - in nameservers tickets 848 and 562 - # check here if updates need to be made - # consider adding these checks as constraints - # within the transistion itself - nameserverList = self.nameservers logger.info("Changing to ready state") - if len(nameserverList) < 2 or len(nameserverList) > 13: - raise ValueError("Not ready to become created, cannot transition yet") logger.info("able to transition to ready state") + @transition( + field="state", + source=[State.READY], + target=State.DNS_NEEDED, + # conditions=[is_dns_needed] + ) + def dns_needed(self): + """Transition to the DNS_NEEDED state + domain should NOT have nameservers but + SHOULD have all contacts + Going to check nameservers and will + result in an EPP call + """ + logger.info("Changing to DNS_NEEDED state") + logger.info("able to transition to DNS_NEEDED state") + def _disclose_fields(self, contact: PublicContact): """creates a disclose object that can be added to a contact Create using .disclose= on the command before sending. 
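Review bot: `ready()` no longer checks the nameserver count, and the replacement guards are commented out (`# conditions=[dns_not_needed]`, `# conditions=[is_dns_needed]`), so the state machine itself now permits `DNS_NEEDED` to `READY` with no nameservers for any caller other than the setter. If EPP cost is the blocker, enforce the 2 to 13 bound from the cached list or pass the count in, and track the commented-out block in a ticket rather than keeping it in the model. The `dns_needed` docstring also says it will result in an EPP call, which the body does not make.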
@@ -1227,6 +1444,10 @@ class Domain(TimeStampedModel, DomainHelper): raise e + def is_ipv6(self, ip: str): + ip_addr = ipaddress.ip_address(ip) + return ip_addr.version == 6 + def _fetch_hosts(self, host_data): """Fetch host info.""" hosts = [] 
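Review bot: `is_ipv6` has no docstring and lets `ipaddress.ip_address` raise `ValueError` on malformed input, unlike `_valid_ip_addr` added earlier in this patch, which catches it. `_convert_ips` calls it on every address, so either document that callers must validate first or handle the `ValueError` here. A `-> bool` return annotation would match the other new helpers.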
@@ -1244,11 +1465,131 @@ class Domain(TimeStampedModel, DomainHelper): hosts.append({k: v for k, v in host.items() if v is not ...}) return hosts - def _update_or_create_host(self, host): - raise NotImplementedError() + def _convert_ips(self, ip_list: list[str]): + """Convert Ips to a list of epp.Ip objects + use when sending update host command. + if there are no ips an empty list will be returned - def _delete_host(self, host): - raise NotImplementedError() + Args: + ip_list (list[str]): the new list of ips, may be empty + Returns: + edited_ip_list (list[epp.Ip]): list of epp.ip objects ready to + be sent to the registry + """ + edited_ip_list = [] + if ip_list is None: + return [] + + for ip_addr in ip_list: + if self.is_ipv6(ip_addr): + edited_ip_list.append(epp.Ip(addr=ip_addr, ip="v6")) + else: # default ip addr is v4 + edited_ip_list.append(epp.Ip(addr=ip_addr)) + + return edited_ip_list + + def _update_host(self, nameserver: str, ip_list: list[str], old_ip_list: list[str]): + """Update an existing host object in EPP. 
Sends the update host command + can result in a RegistryError + Args: + nameserver (str): nameserver or subdomain + ip_list (list[str]): the new list of ips, may be empty + old_ip_list (list[str]): the old ip list, may also be empty + + Returns: + errorCode (int): one of ErrorCode enum type values + + """ + try: + if ( + ip_list is None + or len(ip_list) == 0 + and isinstance(old_ip_list, list) + and len(old_ip_list) != 0 + ): + return ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY + + added_ip_list = set(ip_list).difference(old_ip_list) + removed_ip_list = set(old_ip_list).difference(ip_list) + + request = commands.UpdateHost( + name=nameserver, + add=self._convert_ips(list(added_ip_list)), + rem=self._convert_ips(list(removed_ip_list)), + ) + response = registry.send(request, cleaned=True) + logger.info("_update_host()-> sending req as %s" % request) + return response.code + except RegistryError as e: + logger.error("Error _update_host, code was %s error was %s" % (e.code, e)) + return e.code + + def addAndRemoveHostsFromDomain( + self, hostsToAdd: list[str], hostsToDelete: list[str] + ): + """sends an UpdateDomain message to the registry with the hosts provided + Args: + hostsToDelete (list[epp.HostObjSet])- list of host objects to delete + hostsToAdd (list[epp.HostObjSet])- list of host objects to add + Returns: + response code (int)- RegistryErrorCode integer value + defaults to return COMMAND_COMPLETED_SUCCESSFULLY + if there is nothing to add or delete + """ + + if hostsToAdd == [] and hostsToDelete == []: + return ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY + + try: + updateReq = commands.UpdateDomain( + name=self.name, rem=hostsToDelete, add=hostsToAdd + ) + + logger.info( + "addAndRemoveHostsFromDomain()-> sending update domain req as %s" + % updateReq + ) + response = registry.send(updateReq, cleaned=True) + + return response.code + except RegistryError as e: + logger.error( + "Error addAndRemoveHostsFromDomain, code was %s error was %s" + % (e.code, e) + ) + 
return e.code + + def _delete_hosts_if_not_used(self, hostsToDelete: list[str]): + """delete the host object in registry, + will only delete the host object, if it's not being used by another domain + Performs just the DeleteHost epp call + Supresses regstry error, as registry can disallow delete for various reasons + Args: + hostsToDelete (list[str])- list of nameserver/host names to remove + Returns: + None + + """ + try: + for nameserver in hostsToDelete: + deleteHostReq = commands.DeleteHost(name=nameserver) + registry.send(deleteHostReq, cleaned=True) + logger.info( + "_delete_hosts_if_not_used()-> sending delete host req as %s" + % deleteHostReq + ) + + except RegistryError as e: + if e.code == ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION: + logger.info( + "Did not remove host %s because it is in use on another domain." + % nameserver + ) + else: + logger.error( + "Error _delete_hosts_if_not_used, code was %s error was %s" + % (e.code, e) + ) def _fetch_cache(self, fetch_hosts=False, fetch_contacts=False): logger.info("fetch_cache called") diff --git a/src/registrar/models/transition_domain.py b/src/registrar/models/transition_domain.py index 203795925..232fd9033 100644 --- a/src/registrar/models/transition_domain.py +++ b/src/registrar/models/transition_domain.py @@ -5,7 +5,7 @@ from .utility.time_stamped_model import TimeStampedModel class StatusChoices(models.TextChoices): READY = "ready", "Ready" - HOLD = "hold", "Hold" + ON_HOLD = "on hold", "On Hold" class TransitionDomain(TimeStampedModel): @@ -13,6 +13,10 @@ class TransitionDomain(TimeStampedModel): state of a domain upon transition between registry providers""" + # This is necessary to expose the enum to external + # classes that import TransitionDomain + StatusChoices = StatusChoices + username = models.TextField( null=False, blank=False, diff --git a/src/registrar/models/user_group.py b/src/registrar/models/user_group.py index b6f5b41b2..5cdb1f2ec 100644 --- a/src/registrar/models/user_group.py 
+++ b/src/registrar/models/user_group.py @@ -51,6 +51,11 @@ class UserGroup(Group): "model": "user", "permissions": ["analyst_access_permission", "change_user"], }, + { + "app_label": "registrar", + "model": "domaininvitation", + "permissions": ["add_domaininvitation", "view_domaininvitation"], + }, ] # Avoid error: You can't execute queries until the end diff --git a/src/registrar/models/utility/contact_error.py b/src/registrar/models/utility/contact_error.py index 93084eca2..cf392cb6e 100644 --- a/src/registrar/models/utility/contact_error.py +++ b/src/registrar/models/utility/contact_error.py 
@@ -1,2 +1,51 @@ +from enum import IntEnum + + +class ContactErrorCodes(IntEnum): + """Used in the ContactError class for + error mapping. + + Overview of contact error codes: + - 2000 CONTACT_TYPE_NONE + - 2001 CONTACT_ID_NONE + - 2002 CONTACT_ID_INVALID_LENGTH + - 2003 CONTACT_INVALID_TYPE + """ + + CONTACT_TYPE_NONE = 2000 + CONTACT_ID_NONE = 2001 + CONTACT_ID_INVALID_LENGTH = 2002 + CONTACT_INVALID_TYPE = 2003 + CONTACT_NOT_FOUND = 2004 + + class ContactError(Exception): - ... + """ + Overview of contact error codes: + - 2000 CONTACT_TYPE_NONE + - 2001 CONTACT_ID_NONE + - 2002 CONTACT_ID_INVALID_LENGTH + - 2003 CONTACT_INVALID_TYPE + - 2004 CONTACT_NOT_FOUND + """ + + # For linter + _contact_id_error = "contact_id has an invalid length. Cannot exceed 16 characters." + _contact_invalid_error = "Contact must be of type InfoContactResultData" + _contact_not_found_error = "No contact was found in cache or the registry" + _error_mapping = { + ContactErrorCodes.CONTACT_TYPE_NONE: "contact_type is None", + ContactErrorCodes.CONTACT_ID_NONE: "contact_id is None", + ContactErrorCodes.CONTACT_ID_INVALID_LENGTH: _contact_id_error, + ContactErrorCodes.CONTACT_INVALID_TYPE: _contact_invalid_error, + ContactErrorCodes.CONTACT_NOT_FOUND: _contact_not_found_error, + } + + def __init__(self, *args, code=None, **kwargs): + super().__init__(*args, **kwargs) + self.code = code + if self.code in self._error_mapping: + self.message = self._error_mapping.get(self.code) + + def __str__(self): + return f"{self.message}" diff --git a/src/registrar/templates/django/admin/domain_change_form.html b/src/registrar/templates/django/admin/domain_change_form.html index ac26fc922..2ed3d7532 100644 --- a/src/registrar/templates/django/admin/domain_change_form.html +++ b/src/registrar/templates/django/admin/domain_change_form.html 
@@ -13,10 +13,10 @@ {% elif original.state == original.State.ON_HOLD %} {% endif %} - - + + {% if original.state != original.State.DELETED %} - + {% endif %} {{ block.super }} diff --git a/src/registrar/templates/domain_detail.html b/src/registrar/templates/domain_detail.html index 6a700b393..bcf775fe5 100644 --- a/src/registrar/templates/domain_detail.html +++ b/src/registrar/templates/domain_detail.html @@ -46,8 +46,11 @@ {% include "includes/summary_item.html" with title='Your contact information' value=request.user.contact contact='true' edit_link=url %} {% url 'domain-security-email' pk=domain.id as url %} - {% include "includes/summary_item.html" with title='Security email' value=domain.security_email edit_link=url %} - + {% if security_email is not None and security_email != default_security_email%} + {% include "includes/summary_item.html" with title='Security email' value=security_email edit_link=url %} + {% else %} + {% include "includes/summary_item.html" with title='Security email' value='None provided' edit_link=url %} + {% endif %} {% url 'domain-users' pk=domain.id as url %} {% include "includes/summary_item.html" with title='User management' users='true' list=True value=domain.permissions.all edit_link=url %} diff --git a/src/registrar/templates/domain_nameservers.html b/src/registrar/templates/domain_nameservers.html index 2dabac1af..a7371ee0b 100644 --- a/src/registrar/templates/domain_nameservers.html +++ b/src/registrar/templates/domain_nameservers.html @@ -1,7 +1,7 @@ {% extends "domain_base.html" %} {% load static field_helpers%} -{% block title %}Domain name servers | {{ domain.name }} | {% endblock %} +{% block title %}DNS name servers | {{ domain.name }} | {% endblock %} {% block domain_content %} {# this is right after the messages block in the parent template #} @@ -9,7 +9,7 @@ {% include "includes/form_errors.html" with form=form %} {% endfor %} -

Domain name servers

+

DNS name servers

Before your domain can be used we'll need information about your domain name servers.

diff --git a/src/registrar/templates/domain_security_email.html b/src/registrar/templates/domain_security_email.html index 8175fa394..8fb0ccfb0 100644 --- a/src/registrar/templates/domain_security_email.html +++ b/src/registrar/templates/domain_security_email.html @@ -1,18 +1,16 @@ {% extends "domain_base.html" %} {% load static field_helpers url_helpers %} -{% block title %}Domain security email | {{ domain.name }} | {% endblock %} +{% block title %}Security email | {{ domain.name }} | {% endblock %} {% block domain_content %} -

Domain security email

+

Security email

We strongly recommend that you provide a security email. This email will allow the public to report observed or suspected security issues on your domain. Security emails are made public and included in the .gov domain data we provide.

A security contact should be capable of evaluating or triaging security reports for your entire domain. Use a team email address, not an individual’s email. We recommend using an alias, like security@domain.gov.

- {% include "includes/required_fields.html" %} -
{% csrf_token %} diff --git a/src/registrar/templates/domain_your_contact_information.html b/src/registrar/templates/domain_your_contact_information.html index 81c62584c..e2cad735f 100644 --- a/src/registrar/templates/domain_your_contact_information.html +++ b/src/registrar/templates/domain_your_contact_information.html @@ -1,11 +1,11 @@ {% extends "domain_base.html" %} {% load static field_helpers %} -{% block title %}Domain contact information | {{ domain.name }} | {% endblock %} +{% block title %}Your contact information | {{ domain.name }} | {% endblock %} {% block domain_content %} -

Domain contact information

+

Your contact information

If you’d like us to use a different name, email, or phone number you can make those changes below. Updating your contact information here will update the contact information for all domains in your account. However, it won’t affect your Login.gov account information.

diff --git a/src/registrar/templates/includes/summary_item.html b/src/registrar/templates/includes/summary_item.html index a2035b227..6fcad0650 100644 --- a/src/registrar/templates/includes/summary_item.html +++ b/src/registrar/templates/includes/summary_item.html @@ -43,6 +43,7 @@ {% else %} {% include "includes/contact.html" with contact=value %} {% endif %} + {% elif list %} {% if value|length == 1 %} {% if users %} diff --git a/src/registrar/tests/common.py b/src/registrar/tests/common.py index 3eddfbbcd..b8fea7f93 100644 --- a/src/registrar/tests/common.py +++ b/src/registrar/tests/common.py @@ -571,6 +571,8 @@ class MockEppLib(TestCase): contacts=..., hosts=..., statuses=..., + avail=..., + addrs=..., registrant=..., ): self.auth_info = auth_info @@ -578,6 +580,8 @@ class MockEppLib(TestCase): self.contacts = contacts self.hosts = hosts self.statuses = statuses + self.avail = avail # use for CheckDomain + self.addrs = addrs self.registrant = registrant def dummyInfoContactResultData( 
@@ -682,22 +686,88 @@ class MockEppLib(TestCase): hosts=["fake.host.com"], ) + infoDomainThreeHosts = fakedEppObject( + "my-nameserver.gov", + cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35), + contacts=[], + hosts=[ + "ns1.my-nameserver-1.com", + "ns1.my-nameserver-2.com", + "ns1.cats-are-superior3.com", + ], + ) + infoDomainNoHost = fakedEppObject( + "my-nameserver.gov", + cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35), + contacts=[], + hosts=[], + ) + + infoDomainTwoHosts = fakedEppObject( + "my-nameserver.gov", + cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35), + contacts=[], + hosts=["ns1.my-nameserver-1.com", "ns1.my-nameserver-2.com"], + ) + mockDataInfoHosts = fakedEppObject( + "lastPw", + cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35), + addrs=["1.2.3.4", "2.3.4.5"], + ) + + mockDataHostChange = fakedEppObject( "lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35) ) + infoDomainHasIP = fakedEppObject( + "nameserverwithip.gov", + cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35), + contacts=[], + hosts=[ + "ns1.nameserverwithip.gov", + "ns2.nameserverwithip.gov", + "ns3.nameserverwithip.gov", + ], + addrs=["1.2.3.4", "2.3.4.5"], + ) + + infoDomainCheckHostIPCombo = fakedEppObject( + "nameserversubdomain.gov", + cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35), + contacts=[], + hosts=[ + "ns1.nameserversubdomain.gov", + "ns2.nameserversubdomain.gov", + ], + ) + + def _getattrInfoDomain(self, _request): + if getattr(_request, "name", None) == "security.gov": + return MagicMock(res_data=[self.infoDomainNoContact]) + elif getattr(_request, "name", None) == "my-nameserver.gov": + if self.mockedSendFunction.call_count == 5: + return MagicMock(res_data=[self.infoDomainTwoHosts]) + else: + return MagicMock(res_data=[self.infoDomainNoHost]) + elif getattr(_request, "name", None) == "nameserverwithip.gov": + return MagicMock(res_data=[self.infoDomainHasIP]) + elif getattr(_request, "name", None) == "namerserversubdomain.gov": + return 
MagicMock(res_data=[self.infoDomainCheckHostIPCombo]) + elif getattr(_request, "name", None) == "freeman.gov": + return MagicMock(res_data=[self.InfoDomainWithContacts]) + elif getattr(_request, "name", None) == "threenameserversDomain.gov": + return MagicMock(res_data=[self.infoDomainThreeHosts]) + return MagicMock(res_data=[self.mockDataInfoDomain]) + def mockSend(self, _request, cleaned): """Mocks the registry.send function used inside of domain.py registry is imported from epplibwrapper returns objects that simulate what would be in a epp response but only relevant pieces for tests""" if isinstance(_request, commands.InfoDomain): - if getattr(_request, "name", None) == "security.gov": - return MagicMock(res_data=[self.infoDomainNoContact]) - elif getattr(_request, "name", None) == "freeman.gov": - return MagicMock(res_data=[self.InfoDomainWithContacts]) - else: - return MagicMock(res_data=[self.mockDataInfoDomain]) + return self._getattrInfoDomain(_request) + elif isinstance(_request, commands.InfoContact): mocked_result: info.InfoContactResultData 
@@ -724,6 +794,26 @@ class MockEppLib(TestCase): # use this for when a contact is being updated # sets the second send() to fail raise RegistryError(code=ErrorCode.OBJECT_EXISTS) + elif isinstance(_request, commands.CreateHost): + return MagicMock( + res_data=[self.mockDataHostChange], + code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY, + ) + elif isinstance(_request, commands.UpdateHost): + return MagicMock( + res_data=[self.mockDataHostChange], + code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY, + ) + elif isinstance(_request, commands.UpdateDomain): + return MagicMock( + res_data=[self.mockDataHostChange], + code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY, + ) + elif isinstance(_request, commands.DeleteHost): + return MagicMock( + res_data=[self.mockDataHostChange], + code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY, + ) elif ( isinstance(_request, commands.DeleteDomain) and getattr(_request, "name", None) == "failDelete.gov" @@ -734,6 +824,7 @@ class MockEppLib(TestCase): raise RegistryError( code=ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION ) + return MagicMock(res_data=[self.mockDataInfoHosts]) def setUp(self): diff --git a/src/registrar/tests/test_admin.py b/src/registrar/tests/test_admin.py index dd87a003a..51ace34f7 100644 --- a/src/registrar/tests/test_admin.py +++ b/src/registrar/tests/test_admin.py @@ -109,12 +109,12 @@ class TestDomainAdmin(MockEppLib): ) self.assertEqual(response.status_code, 200) self.assertContains(response, domain.name) - self.assertContains(response, "Delete Domain in Registry") + self.assertContains(response, "Delete domain in registry") # Test the info dialog request = self.factory.post( "/admin/registrar/domain/{}/change/".format(domain.pk), - {"_delete_domain": "Delete Domain in Registry", "name": domain.name}, + {"_delete_domain": "Delete domain in registry", "name": domain.name}, follow=True, ) request.user = self.client 
@@ -149,12 +149,12 @@ class TestDomainAdmin(MockEppLib): ) self.assertEqual(response.status_code, 200) self.assertContains(response, domain.name) - self.assertContains(response, "Delete Domain in Registry") + self.assertContains(response, "Delete domain in registry") # Test the error request = self.factory.post( "/admin/registrar/domain/{}/change/".format(domain.pk), - {"_delete_domain": "Delete Domain in Registry", "name": domain.name}, + {"_delete_domain": "Delete domain in registry", "name": domain.name}, follow=True, ) request.user = self.client @@ -194,12 +194,12 @@ class TestDomainAdmin(MockEppLib): ) self.assertEqual(response.status_code, 200) self.assertContains(response, domain.name) - self.assertContains(response, "Delete Domain in Registry") + self.assertContains(response, "Delete domain in registry") # Test the info dialog request = self.factory.post( "/admin/registrar/domain/{}/change/".format(domain.pk), - {"_delete_domain": "Delete Domain in Registry", "name": domain.name}, + {"_delete_domain": "Delete domain in registry", "name": domain.name}, follow=True, ) request.user = self.client @@ -221,7 +221,7 @@ class TestDomainAdmin(MockEppLib): # Test the info dialog request = self.factory.post( "/admin/registrar/domain/{}/change/".format(domain.pk), - {"_delete_domain": "Delete Domain in Registry", "name": domain.name}, + {"_delete_domain": "Delete domain in registry", "name": domain.name}, follow=True, ) request.user = self.client diff --git a/src/registrar/tests/test_migrations.py b/src/registrar/tests/test_migrations.py index f98e876d7..95e5853ff 100644 --- a/src/registrar/tests/test_migrations.py +++ b/src/registrar/tests/test_migrations.py @@ -32,6 +32,7 @@ class TestGroups(TestCase): ) # Test permissions for cisa_analysts_group + # Verifies permission data migrations ran as expected. # Define the expected permission codenames expected_permissions = [ "view_logentry", 
@@ -39,6 +40,8 @@ class TestGroups(TestCase): "view_domain", "change_domainapplication", "change_domaininformation", + "add_domaininvitation", + "view_domaininvitation", "change_draftdomain", "analyst_access_permission", "change_user", diff --git a/src/registrar/tests/test_models_domain.py b/src/registrar/tests/test_models_domain.py index 50456c2d5..e3e55faba 100644 --- a/src/registrar/tests/test_models_domain.py +++ b/src/registrar/tests/test_models_domain.py @@ -16,6 +16,10 @@ from registrar.models.domain_information import DomainInformation from registrar.models.draft_domain import DraftDomain from registrar.models.public_contact import PublicContact from registrar.models.user import User +from registrar.utility.errors import ActionNotAllowed, NameserverError + +from registrar.models.utility.contact_error import ContactError, ContactErrorCodes + from .common import MockEppLib from django_fsm import TransitionNotAllowed # type: ignore from epplibwrapper import ( @@ -103,6 +107,7 @@ class TestDomainCache(MockEppLib): } expectedHostsDict = { "name": self.mockDataInfoDomain.hosts[0], + "addrs": self.mockDataInfoHosts.addrs, "cr_date": self.mockDataInfoHosts.cr_date, } 
@@ -193,6 +198,56 @@ class TestDomainCache(MockEppLib): self.assertEqual(cached_contact, in_db.registry_id) self.assertEqual(domain.security_contact.email, "123test@mail.gov") + def test_errors_map_epp_contact_to_public_contact(self): + """ + Scenario: Registrant gets invalid data from EPPLib + When the `map_epp_contact_to_public_contact` function + gets invalid data from EPPLib + Then the function throws the expected ContactErrors + """ + domain, _ = Domain.objects.get_or_create(name="registry.gov") + fakedEpp = self.fakedEppObject() + invalid_length = fakedEpp.dummyInfoContactResultData( + "Cymaticsisasubsetofmodalvibrationalphenomena", "lengthInvalid@mail.gov" + ) + valid_object = fakedEpp.dummyInfoContactResultData("valid", "valid@mail.gov") + + desired_error = ContactErrorCodes.CONTACT_ID_INVALID_LENGTH + with self.assertRaises(ContactError) as context: + domain.map_epp_contact_to_public_contact( + invalid_length, + invalid_length.id, + PublicContact.ContactTypeChoices.SECURITY, + ) + self.assertEqual(context.exception.code, desired_error) + + desired_error = ContactErrorCodes.CONTACT_ID_NONE + with self.assertRaises(ContactError) as context: + domain.map_epp_contact_to_public_contact( + valid_object, + None, + PublicContact.ContactTypeChoices.SECURITY, + ) + self.assertEqual(context.exception.code, desired_error) + + desired_error = ContactErrorCodes.CONTACT_INVALID_TYPE + with self.assertRaises(ContactError) as context: + domain.map_epp_contact_to_public_contact( + "bad_object", + valid_object.id, + PublicContact.ContactTypeChoices.SECURITY, + ) + self.assertEqual(context.exception.code, desired_error) + + desired_error = ContactErrorCodes.CONTACT_TYPE_NONE + with self.assertRaises(ContactError) as context: + domain.map_epp_contact_to_public_contact( + valid_object, + valid_object.id, + None, + ) + self.assertEqual(context.exception.code, desired_error) + class TestDomainCreation(MockEppLib): """Rule: An approved domain application must result in a domain""" 
@@ -213,7 +268,7 @@ class TestDomainCreation(MockEppLib): application.status = DomainApplication.SUBMITTED # transition to approve state application.approve() - # should hav information present for this domain + # should have information present for this domain domain = Domain.objects.get(name="igorville.gov") self.assertTrue(domain) self.mockedSendFunction.assert_not_called() @@ -483,7 +538,7 @@ class TestRegistrantContacts(MockEppLib): expectedSecContact = PublicContact.get_default_security() expectedSecContact.domain = self.domain - self.domain.pendingCreate() + self.domain.dns_needed_from_unknown() self.assertEqual(self.mockedSendFunction.call_count, 8) self.assertEqual(PublicContact.objects.filter(domain=self.domain).count(), 4) @@ -526,7 +581,8 @@ class TestRegistrantContacts(MockEppLib): created contact of type 'security' """ # make a security contact that is a PublicContact - self.domain.pendingCreate() # make sure a security email already exists + # make sure a security email already exists + self.domain.dns_needed_from_unknown() expectedSecContact = PublicContact.get_default_security() expectedSecContact.domain = self.domain expectedSecContact.email = "newEmail@fake.com" @@ -842,7 +898,7 @@ class TestRegistrantContacts(MockEppLib): self.assertEqual(self.domain_contact._cache["registrant"], expected_contact_db) -class TestRegistrantNameservers(TestCase): +class TestRegistrantNameservers(MockEppLib): """Rule: Registrants may modify their nameservers""" def setUp(self): 
@@ -851,9 +907,91 @@ class TestRegistrantNameservers(TestCase): Given the registrant is logged in And the registrant is the admin on a domain """ - pass + super().setUp() + self.nameserver1 = "ns1.my-nameserver-1.com" + self.nameserver2 = "ns1.my-nameserver-2.com" + self.nameserver3 = "ns1.cats-are-superior3.com" + + self.domain, _ = Domain.objects.get_or_create( + name="my-nameserver.gov", state=Domain.State.DNS_NEEDED + ) + self.domainWithThreeNS, _ = Domain.objects.get_or_create( + name="threenameserversDomain.gov", state=Domain.State.READY + ) + + def test_get_nameserver_changes_success_deleted_vals(self): + """Testing only deleting and no other changes""" + self.domain._cache["hosts"] = [ + {"name": "ns1.example.com", "addrs": None}, + {"name": "ns2.example.com", "addrs": ["1.2.3.4"]}, + ] + newChanges = [ + ("ns1.example.com",), + ] + ( + deleted_values, + updated_values, + new_values, + oldNameservers, + ) = self.domain.getNameserverChanges(newChanges) + + self.assertEqual(deleted_values, ["ns2.example.com"]) + self.assertEqual(updated_values, []) + self.assertEqual(new_values, {}) + self.assertEqual( + oldNameservers, + {"ns1.example.com": None, "ns2.example.com": ["1.2.3.4"]}, + ) + + def test_get_nameserver_changes_success_updated_vals(self): + """Testing only updating no other changes""" + self.domain._cache["hosts"] = [ + {"name": "ns3.my-nameserver.gov", "addrs": ["1.2.3.4"]}, + ] + newChanges = [ + ("ns3.my-nameserver.gov", ["1.2.4.5"]), + ] + ( + deleted_values, + updated_values, + new_values, + oldNameservers, + ) = self.domain.getNameserverChanges(newChanges) + + self.assertEqual(deleted_values, []) + self.assertEqual(updated_values, [("ns3.my-nameserver.gov", ["1.2.4.5"])]) + self.assertEqual(new_values, {}) + self.assertEqual( + oldNameservers, + {"ns3.my-nameserver.gov": ["1.2.3.4"]}, + ) + + def test_get_nameserver_changes_success_new_vals(self): + # Testing only creating no other changes + self.domain._cache["hosts"] = [ + {"name": 
"ns1.example.com", "addrs": None}, + ] + newChanges = [ + ("ns1.example.com",), + ("ns4.example.com",), + ] + ( + deleted_values, + updated_values, + new_values, + oldNameservers, + ) = self.domain.getNameserverChanges(newChanges) + + self.assertEqual(deleted_values, []) + self.assertEqual(updated_values, []) + self.assertEqual(new_values, {"ns4.example.com": None}) + self.assertEqual( + oldNameservers, + { + "ns1.example.com": None, + }, + ) - @skip("not implemented yet") def test_user_adds_one_nameserver(self): """ Scenario: Registrant adds a single nameserver @@ -863,9 +1001,31 @@ class TestRegistrantNameservers(TestCase): to the registry And `domain.is_active` returns False """ - raise - @skip("not implemented yet") + # set 1 nameserver + nameserver = "ns1.my-nameserver.com" + self.domain.nameservers = [(nameserver,)] + + # when we create a host, we should've updated at the same time + created_host = commands.CreateHost(nameserver) + update_domain_with_created = commands.UpdateDomain( + name=self.domain.name, + add=[common.HostObjSet([created_host.name])], + rem=[], + ) + + # checking if commands were sent (commands have to be sent in order) + expectedCalls = [ + call(created_host, cleaned=True), + call(update_domain_with_created, cleaned=True), + ] + + self.mockedSendFunction.assert_has_calls(expectedCalls) + + # check that status is still NOT READY + # as you have less than 2 nameservers + self.assertFalse(self.domain.is_active()) + def test_user_adds_two_nameservers(self): """ Scenario: Registrant adds 2 or more nameservers, thereby activating the domain 
@@ -875,9 +1035,36 @@ class TestRegistrantNameservers(TestCase): to the registry And `domain.is_active` returns True """ - raise - @skip("not implemented yet") + # set 2 nameservers + self.domain.nameservers = [(self.nameserver1,), (self.nameserver2,)] + + # when you create a host, you also have to update at same time + created_host1 = commands.CreateHost(self.nameserver1) + created_host2 = commands.CreateHost(self.nameserver2) + + update_domain_with_created = commands.UpdateDomain( + name=self.domain.name, + add=[ + common.HostObjSet([created_host1.name, created_host2.name]), + ], + rem=[], + ) + + infoDomain = commands.InfoDomain(name="my-nameserver.gov", auth_info=None) + # checking if commands were sent (commands have to be sent in order) + expectedCalls = [ + call(infoDomain, cleaned=True), + call(created_host1, cleaned=True), + call(created_host2, cleaned=True), + call(update_domain_with_created, cleaned=True), + ] + + self.mockedSendFunction.assert_has_calls(expectedCalls, any_order=True) + self.assertEqual(4, self.mockedSendFunction.call_count) + # check that status is READY + self.assertTrue(self.domain.is_active()) + def test_user_adds_too_many_nameservers(self): """ Scenario: Registrant adds 14 or more nameservers 
@@ -885,9 +1072,44 @@ class TestRegistrantNameservers(TestCase): When `domain.nameservers` is set to an array of length 14 Then Domain raises a user-friendly error """ - raise - @skip("not implemented yet") + # set 13+ nameservers + nameserver1 = "ns1.cats-are-superior1.com" + nameserver2 = "ns1.cats-are-superior2.com" + nameserver3 = "ns1.cats-are-superior3.com" + nameserver4 = "ns1.cats-are-superior4.com" + nameserver5 = "ns1.cats-are-superior5.com" + nameserver6 = "ns1.cats-are-superior6.com" + nameserver7 = "ns1.cats-are-superior7.com" + nameserver8 = "ns1.cats-are-superior8.com" + nameserver9 = "ns1.cats-are-superior9.com" + nameserver10 = "ns1.cats-are-superior10.com" + nameserver11 = "ns1.cats-are-superior11.com" + nameserver12 = "ns1.cats-are-superior12.com" + nameserver13 = "ns1.cats-are-superior13.com" + nameserver14 = "ns1.cats-are-superior14.com" + + def _get_14_nameservers(): + self.domain.nameservers = [ + (nameserver1,), + (nameserver2,), + (nameserver3,), + (nameserver4,), + (nameserver5,), + (nameserver6,), + (nameserver7,), + (nameserver8,), + (nameserver9), + (nameserver10,), + (nameserver11,), + (nameserver12,), + (nameserver13,), + (nameserver14,), + ] + + self.assertRaises(NameserverError, _get_14_nameservers) + self.assertEqual(self.mockedSendFunction.call_count, 0) + def test_user_removes_some_nameservers(self): """ Scenario: Registrant removes some nameservers, while keeping at least 2 
@@ -897,21 +1119,84 @@ class TestRegistrantNameservers(TestCase): to the registry And `domain.is_active` returns True """ - raise - @skip("not implemented yet") + # Mock is set to return 3 nameservers on infodomain + self.domainWithThreeNS.nameservers = [(self.nameserver1,), (self.nameserver2,)] + expectedCalls = [ + # calls info domain, and info on all hosts + # to get past values + # then removes the single host and updates domain + call( + commands.InfoDomain(name=self.domainWithThreeNS.name, auth_info=None), + cleaned=True, + ), + call(commands.InfoHost(name="ns1.my-nameserver-1.com"), cleaned=True), + call(commands.InfoHost(name="ns1.my-nameserver-2.com"), cleaned=True), + call(commands.InfoHost(name="ns1.cats-are-superior3.com"), cleaned=True), + call( + commands.UpdateDomain( + name=self.domainWithThreeNS.name, + add=[], + rem=[common.HostObjSet(hosts=["ns1.cats-are-superior3.com"])], + nsset=None, + keyset=None, + registrant=None, + auth_info=None, + ), + cleaned=True, + ), + call(commands.DeleteHost(name="ns1.cats-are-superior3.com"), cleaned=True), + ] + + self.mockedSendFunction.assert_has_calls(expectedCalls, any_order=True) + self.assertTrue(self.domainWithThreeNS.is_active()) + def test_user_removes_too_many_nameservers(self): """ Scenario: Registrant removes some nameservers, bringing the total to less than 2 - Given the domain has 3 nameservers + Given the domain has 2 nameservers When `domain.nameservers` is set to an array containing nameserver #1 Then `commands.UpdateDomain` and `commands.DeleteHost` is sent to the registry And `domain.is_active` returns False - """ - raise - @skip("not implemented yet") + """ + + self.domainWithThreeNS.nameservers = [(self.nameserver1,)] + expectedCalls = [ + call( + commands.InfoDomain(name=self.domainWithThreeNS.name, auth_info=None), + cleaned=True, + ), + call(commands.InfoHost(name="ns1.my-nameserver-1.com"), cleaned=True), + call(commands.InfoHost(name="ns1.my-nameserver-2.com"), cleaned=True), + 
call(commands.InfoHost(name="ns1.cats-are-superior3.com"), cleaned=True), + call(commands.DeleteHost(name="ns1.my-nameserver-2.com"), cleaned=True), + call( + commands.UpdateDomain( + name=self.domainWithThreeNS.name, + add=[], + rem=[ + common.HostObjSet( + hosts=[ + "ns1.my-nameserver-2.com", + "ns1.cats-are-superior3.com", + ] + ), + ], + nsset=None, + keyset=None, + registrant=None, + auth_info=None, + ), + cleaned=True, + ), + call(commands.DeleteHost(name="ns1.cats-are-superior3.com"), cleaned=True), + ] + + self.mockedSendFunction.assert_has_calls(expectedCalls, any_order=True) + self.assertFalse(self.domainWithThreeNS.is_active()) + def test_user_replaces_nameservers(self): """ Scenario: Registrant simultaneously adds and removes some nameservers 
@@ -922,9 +1207,60 @@ class TestRegistrantNameservers(TestCase): And `commands.UpdateDomain` is sent to add #4 and #5 plus remove #2 and #3 And `commands.DeleteHost` is sent to delete #2 and #3 """ - raise + self.domainWithThreeNS.nameservers = [ + (self.nameserver1,), + ("ns1.cats-are-superior1.com",), + ("ns1.cats-are-superior2.com",), + ] + + expectedCalls = [ + call( + commands.InfoDomain(name=self.domainWithThreeNS.name, auth_info=None), + cleaned=True, + ), + call(commands.InfoHost(name="ns1.my-nameserver-1.com"), cleaned=True), + call(commands.InfoHost(name="ns1.my-nameserver-2.com"), cleaned=True), + call(commands.InfoHost(name="ns1.cats-are-superior3.com"), cleaned=True), + call(commands.DeleteHost(name="ns1.my-nameserver-2.com"), cleaned=True), + call( + commands.CreateHost(name="ns1.cats-are-superior1.com", addrs=[]), + cleaned=True, + ), + call( + commands.CreateHost(name="ns1.cats-are-superior2.com", addrs=[]), + cleaned=True, + ), + call( + commands.UpdateDomain( + name=self.domainWithThreeNS.name, + add=[ + common.HostObjSet( + hosts=[ + "ns1.cats-are-superior1.com", + "ns1.cats-are-superior2.com", + ] + ), + ], + rem=[ + common.HostObjSet( + hosts=[ + "ns1.my-nameserver-2.com", + "ns1.cats-are-superior3.com", + ] + ), + ], + nsset=None, + keyset=None, + registrant=None, + auth_info=None, + ), + cleaned=True, + ), + ] + + self.mockedSendFunction.assert_has_calls(expectedCalls, any_order=True) + self.assertTrue(self.domainWithThreeNS.is_active()) - @skip("not implemented yet") def test_user_cannot_add_subordinate_without_ip(self): """ Scenario: Registrant adds a nameserver which is a subdomain of their .gov 
@@ -933,9 +1269,12 @@ class TestRegistrantNameservers(TestCase): with a subdomain of the domain and no IP addresses Then Domain raises a user-friendly error """ - raise - @skip("not implemented yet") + dotgovnameserver = "my-nameserver.gov" + + with self.assertRaises(NameserverError): + self.domain.nameservers = [(dotgovnameserver,)] + def test_user_updates_ips(self): """ Scenario: Registrant changes IP addresses for a nameserver @@ -945,9 +1284,53 @@ class TestRegistrantNameservers(TestCase): with a different IP address(es) Then `commands.UpdateHost` is sent to the registry """ - raise + domain, _ = Domain.objects.get_or_create( + name="nameserverwithip.gov", state=Domain.State.READY + ) + domain.nameservers = [ + ("ns1.nameserverwithip.gov", ["2.3.4.5", "1.2.3.4"]), + ( + "ns2.nameserverwithip.gov", + ["1.2.3.4", "2.3.4.5", "2001:0db8:85a3:0000:0000:8a2e:0370:7334"], + ), + ("ns3.nameserverwithip.gov", ["2.3.4.5"]), + ] + + expectedCalls = [ + call( + commands.InfoDomain(name="nameserverwithip.gov", auth_info=None), + cleaned=True, + ), + call(commands.InfoHost(name="ns1.nameserverwithip.gov"), cleaned=True), + call(commands.InfoHost(name="ns2.nameserverwithip.gov"), cleaned=True), + call(commands.InfoHost(name="ns3.nameserverwithip.gov"), cleaned=True), + call( + commands.UpdateHost( + name="ns2.nameserverwithip.gov", + add=[ + common.Ip( + addr="2001:0db8:85a3:0000:0000:8a2e:0370:7334", ip="v6" + ) + ], + rem=[], + chg=None, + ), + cleaned=True, + ), + call( + commands.UpdateHost( + name="ns3.nameserverwithip.gov", + add=[], + rem=[common.Ip(addr="1.2.3.4", ip=None)], + chg=None, + ), + cleaned=True, + ), + ] + + self.mockedSendFunction.assert_has_calls(expectedCalls, any_order=True) + self.assertTrue(domain.is_active()) - @skip("not implemented yet") def test_user_cannot_add_non_subordinate_with_ip(self): """ Scenario: Registrant adds a nameserver which is NOT a subdomain of their .gov 
@@ -956,9 +1339,11 @@ class TestRegistrantNameservers(TestCase): which is not a subdomain of the domain and has IP addresses Then Domain raises a user-friendly error """ - raise + dotgovnameserver = "mynameserverdotgov.gov" + + with self.assertRaises(NameserverError): + self.domain.nameservers = [(dotgovnameserver, ["1.2.3"])] - @skip("not implemented yet") def test_nameservers_are_idempotent(self): """ Scenario: Registrant adds a set of nameservers twice, due to a UI glitch 
@@ -966,10 +1351,68 @@ class TestRegistrantNameservers(TestCase): to the registry twice with identical data Then no errors are raised in Domain """ - # implementation note: this requires seeing what happens when these are actually - # sent like this, and then implementing appropriate mocks for any errors the - # registry normally sends in this case - raise + + # Checking that it doesn't create or update even if out of order + self.domainWithThreeNS.nameservers = [ + (self.nameserver3,), + (self.nameserver1,), + (self.nameserver2,), + ] + + expectedCalls = [ + call( + commands.InfoDomain(name=self.domainWithThreeNS.name, auth_info=None), + cleaned=True, + ), + call(commands.InfoHost(name="ns1.my-nameserver-1.com"), cleaned=True), + call(commands.InfoHost(name="ns1.my-nameserver-2.com"), cleaned=True), + call(commands.InfoHost(name="ns1.cats-are-superior3.com"), cleaned=True), + ] + + self.mockedSendFunction.assert_has_calls(expectedCalls, any_order=True) + self.assertEqual(self.mockedSendFunction.call_count, 4) + + def test_is_subdomain_with_no_ip(self): + domain, _ = Domain.objects.get_or_create( + name="nameserversubdomain.gov", state=Domain.State.READY + ) + + with self.assertRaises(NameserverError): + domain.nameservers = [ + ("ns1.nameserversubdomain.gov",), + ("ns2.nameserversubdomain.gov",), + ] + + def test_not_subdomain_but_has_ip(self): + domain, _ = Domain.objects.get_or_create( + name="nameserversubdomain.gov", state=Domain.State.READY + ) + + with self.assertRaises(NameserverError): + domain.nameservers = [ + ("ns1.cats-da-best.gov", ["1.2.3.4"]), + ("ns2.cats-da-best.gov", ["2.3.4.5"]), + ] + + def test_is_subdomain_but_ip_addr_not_valid(self): + domain, _ = Domain.objects.get_or_create( + name="nameserversubdomain.gov", state=Domain.State.READY + ) + + with self.assertRaises(NameserverError): + domain.nameservers = [ + ("ns1.nameserversubdomain.gov", ["1.2.3"]), + ("ns2.nameserversubdomain.gov", ["2.3.4"]), + ] + + def test_setting_not_allowed(self): 
+ """Scenario: A domain state is not Ready or DNS Needed + then setting nameservers is not allowed""" + domain, _ = Domain.objects.get_or_create( + name="onholdDomain.gov", state=Domain.State.ON_HOLD + ) + with self.assertRaises(ActionNotAllowed): + domain.nameservers = [self.nameserver1, self.nameserver2] @skip("not implemented yet") def test_update_is_unsuccessful(self): @@ -977,8 +1420,22 @@ class TestRegistrantNameservers(TestCase): Scenario: An update to the nameservers is unsuccessful When an error is returned from epplibwrapper Then a user-friendly error message is returned for displaying on the web + + Note: TODO 433 -- we will perform correct error handling and complete + this ticket. We want to raise an error for update/create/delete, but + don't want to lose user info (and exit out too early) """ - raise + + domain, _ = Domain.objects.get_or_create( + name="failednameserver.gov", state=Domain.State.READY + ) + + with self.assertRaises(RegistryError): + domain.nameservers = [("ns1.failednameserver.gov", ["4.5.6"])] + + def tearDown(self): + Domain.objects.all().delete() + return super().tearDown() class TestRegistrantDNSSEC(MockEppLib): diff --git a/src/registrar/tests/test_nameserver_error.py b/src/registrar/tests/test_nameserver_error.py new file mode 100644 index 000000000..c64717eb5 --- /dev/null +++ b/src/registrar/tests/test_nameserver_error.py 
@@ -0,0 +1,46 @@ +from django.test import TestCase + +from registrar.utility.errors import ( + NameserverError, + NameserverErrorCodes as nsErrorCodes, +) + + +class TestNameserverError(TestCase): + def test_with_no_ip(self): + """Test NameserverError when no ip address is passed""" + nameserver = "nameserver val" + expected = ( + f"Nameserver {nameserver} needs to have an " + "IP address because it is a subdomain" + ) + + nsException = NameserverError( + code=nsErrorCodes.MISSING_IP, nameserver=nameserver + ) + self.assertEqual(nsException.message, expected) + self.assertEqual(nsException.code, nsErrorCodes.MISSING_IP) + + def test_with_only_code(self): + """Test NameserverError when no ip address + and no nameserver is passed""" + nameserver = "nameserver val" + expected = "Too many hosts provided, you may not have more than 13 nameservers." + + nsException = NameserverError( + code=nsErrorCodes.TOO_MANY_HOSTS, nameserver=nameserver + ) + self.assertEqual(nsException.message, expected) + self.assertEqual(nsException.code, nsErrorCodes.TOO_MANY_HOSTS) + + def test_with_ip_nameserver(self): + """Test NameserverError when ip and nameserver are passed""" + ip = "ip val" + nameserver = "nameserver val" + + expected = f"Nameserver {nameserver} has an invalid IP address: {ip}" + nsException = NameserverError( + code=nsErrorCodes.INVALID_IP, nameserver=nameserver, ip=ip + ) + self.assertEqual(nsException.message, expected) + self.assertEqual(nsException.code, nsErrorCodes.INVALID_IP) diff --git a/src/registrar/tests/test_views.py b/src/registrar/tests/test_views.py index 68aaf0ed8..2194b42db 100644 --- a/src/registrar/tests/test_views.py +++ b/src/registrar/tests/test_views.py 
@@ -1309,7 +1309,7 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest, MockEppLib): page = self.client.get( reverse("domain-nameservers", kwargs={"pk": self.domain.id}) ) - self.assertContains(page, "Domain name servers") + self.assertContains(page, "DNS name servers") @skip("Broken by adding registry connection fix in ticket 848") def test_domain_nameservers_form(self): @@ -1414,7 +1414,7 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest, MockEppLib): page = self.client.get( reverse("domain-your-contact-information", kwargs={"pk": self.domain.id}) ) - self.assertContains(page, "Domain contact information") + self.assertContains(page, "Your contact information") def test_domain_your_contact_information_content(self): """Logged-in user's contact information appears on the page.""" @@ -1439,7 +1439,7 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest, MockEppLib): ) # Loads correctly - self.assertContains(page, "Domain security email") + self.assertContains(page, "Security email") self.assertContains(page, "security@mail.gov") self.mockSendPatch.stop() @@ -1455,7 +1455,7 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest, MockEppLib): ) # Loads correctly - self.assertContains(page, "Domain security email") + self.assertContains(page, "Security email") self.assertNotContains(page, "dotgov@cisa.dhs.gov") self.mockSendPatch.stop() @@ -1464,7 +1464,7 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest, MockEppLib): page = self.client.get( reverse("domain-security-email", kwargs={"pk": self.domain.id}) ) - self.assertContains(page, "Domain security email") + self.assertContains(page, "Security email") def test_domain_security_email_form(self): """Adding a security email works. diff --git a/src/registrar/utility/errors.py b/src/registrar/utility/errors.py index 3b17a17c7..f7bc743d6 100644 --- a/src/registrar/utility/errors.py +++ b/src/registrar/utility/errors.py 
@@ -1,3 +1,6 @@ +from enum import IntEnum + + class BlankValueError(ValueError): pass 
@@ -8,3 +11,65 @@ class ExtraDotsError(ValueError): class DomainUnavailableError(ValueError): pass + + +class ActionNotAllowed(Exception): + """User accessed an action that is not + allowed by the current state""" + + pass + + +class NameserverErrorCodes(IntEnum): + """Used in the NameserverError class for + error mapping. + Overview of nameserver error codes: + - 1 MISSING_IP ip address is missing for a nameserver + - 2 GLUE_RECORD_NOT_ALLOWED a host has a nameserver + value but is not a subdomain + - 3 INVALID_IP invalid ip address format or invalid version + - 4 TOO_MANY_HOSTS more than the max allowed host values + """ + + MISSING_IP = 1 + GLUE_RECORD_NOT_ALLOWED = 2 + INVALID_IP = 3 + TOO_MANY_HOSTS = 4 + UNABLE_TO_UPDATE_DOMAIN = 5 + + +class NameserverError(Exception): + """ + NameserverError class used to raise exceptions on + the nameserver getter + """ + + _error_mapping = { + NameserverErrorCodes.MISSING_IP: "Nameserver {} needs to have an " + "IP address because it is a subdomain", + NameserverErrorCodes.GLUE_RECORD_NOT_ALLOWED: "Nameserver {} cannot be linked " + "because it is not a subdomain", + NameserverErrorCodes.INVALID_IP: "Nameserver {} has an invalid IP address: {}", + NameserverErrorCodes.TOO_MANY_HOSTS: ( + "Too many hosts provided, you may not have more than 13 nameservers." + ), + NameserverErrorCodes.UNABLE_TO_UPDATE_DOMAIN: ( + "Unable to update domain, changes were not applied." 
+ "Check logs as a Registry Error is the likely cause" + ), + } + + def __init__(self, *args, code=None, nameserver=None, ip=None, **kwargs): + super().__init__(*args, **kwargs) + self.code = code + if self.code in self._error_mapping: + self.message = self._error_mapping.get(self.code) + if nameserver is not None and ip is not None: + self.message = self.message.format(str(nameserver), str(ip)) + elif nameserver is not None: + self.message = self.message.format(str(nameserver)) + elif ip is not None: + self.message = self.message.format(str(ip)) + + def __str__(self): + return f"{self.message}" diff --git a/src/registrar/views/domain.py b/src/registrar/views/domain.py index 8838407f4..bc1f42b88 100644 --- a/src/registrar/views/domain.py +++ b/src/registrar/views/domain.py @@ -21,6 +21,7 @@ from registrar.models import ( User, UserDomainRole, ) +from registrar.models.public_contact import PublicContact from ..forms import ( ContactForm, @@ -42,6 +43,19 @@ class DomainView(DomainPermissionView): template_name = "domain_detail.html" + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + + default_email = Domain().get_default_security_contact().email + context["default_security_email"] = default_email + + security_email = self.get_object().get_security_email() + if security_email is None or security_email == default_email: + context["security_email"] = None + return context + context["security_email"] = security_email + return context + class DomainOrgNameAddressView(DomainPermissionView, FormMixin): """Organization name and mailing address view""" 
@@ -287,10 +301,21 @@ class DomainSecurityEmailView(DomainPermissionView, FormMixin): """The form is valid, call setter in model.""" # Set the security email from the form - new_email = form.cleaned_data.get("security_email", "") + new_email: str = form.cleaned_data.get("security_email", "") + + # If we pass nothing for the sec email, set to the default + if new_email is None or new_email.strip() == "": + new_email = PublicContact.get_default_security().email domain = self.object contact = domain.security_contact + + # If no default is created for security_contact, + # then we cannot connect to the registry. + if contact is None: + messages.error(self.request, "Update failed. Cannot contact the registry.") + return redirect(self.get_success_url()) + contact.email = new_email contact.save()
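Review bot: in `test_user_cannot_add_non_subordinate_with_ip` (hunk `@@ -956,9 +1339,11 @@`), the address passed is `["1.2.3"]`, which is itself malformed, so `assertRaises(NameserverError)` can be satisfied by the invalid-IP path without the not-a-subdomain rule from the docstring ever being exercised. Pass a well-formed address such as `1.2.3.4` and also assert that the raised exception's `code` is `NameserverErrorCodes.GLUE_RECORD_NOT_ALLOWED`. The same applies to the other `assertRaises(NameserverError)` tests in this class: none of them check `code`, so a regression that raises the wrong error would still pass.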
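Review bot: `test_update_is_unsuccessful` (hunk `@@ -977,8 +1420,22 @@`) still carries `@skip("not implemented yet")` as an unchanged context line above the `def`, so the new body is never executed. When it is enabled under ticket 433, note that `["4.5.6"]` is the same kind of malformed address that `test_is_subdomain_but_ip_addr_not_valid` expects to produce `NameserverError`, so the `RegistryError` assertion would probably not be reached through a registry failure. Use a valid IP and have the mocked send function return the registry error instead. Separately, in `test_setting_not_allowed` the list is `[self.nameserver1, self.nameserver2]` (bare values), while every other test in the class passes tuples such as `(self.nameserver1,)`; keep the input shape consistent so the test cannot fail for the wrong reason.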
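Review bot: in the new `test_nameserver_error.py`, `test_with_only_code` says in its docstring that no nameserver is passed, but it calls `NameserverError(code=nsErrorCodes.TOO_MANY_HOSTS, nameserver=nameserver)`. Drop the `nameserver` argument so the test matches its name and covers the code-only branch of `__init__`. The file also has no case for `GLUE_RECORD_NOT_ALLOWED`, `UNABLE_TO_UPDATE_DOMAIN`, or a `code` that is missing from the mapping, which are the branches most likely to break.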
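Review bot: in `NameserverError.__init__` (src/registrar/utility/errors.py), `self.message` is only assigned inside `if self.code in self._error_mapping:`, so an instance created with the default `code=None` or an unmapped code has no `message` attribute and `__str__` raises `AttributeError` while the error is being reported. Initialise `self.message` to a fallback before the `if`. Two smaller points in the same hunk: the `UNABLE_TO_UPDATE_DOMAIN` text is built from two adjacent literals with no separating space, so it renders as "applied.Check logs", and "Check logs as a Registry Error is the likely cause" is operator guidance that does not belong in a message meant for display to the registrant. The `NameserverErrorCodes` docstring also lists codes 1 to 4 but not `UNABLE_TO_UPDATE_DOMAIN = 5`, and the class docstring says "getter" where the tests raise it on assignment to `nameservers`.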
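Review bot: in `DomainSecurityEmailView.form_valid`, the default address comes from `PublicContact.get_default_security().email`, while the `DomainView.get_context_data` hunk in this same file reads it from `Domain().get_default_security_contact().email`. If the two ever diverge, the `security_email == default_email` comparison stops hiding the default address on the detail page, which is what the `assertNotContains(page, "dotgov@cisa.dhs.gov")` test relies on. Read the default from one place in both views. Also, the `contact is None` branch reports "Update failed" but then returns `redirect(self.get_success_url())`; consider re-rendering the form (or at least logging the failure) so that a failed update is distinguishable from a successful one outside the flash message, and move the `contact is None` check above the `new_email` defaulting since nothing is done with `new_email` on that path.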