mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-05-16 01:27:03 +02:00
Merge remote-tracking branch 'origin/main' into rjm/787-org-short-names
This commit is contained in:
commit
1f55ac8e06
14 changed files with 1121 additions and 122 deletions
|
@ -13,7 +13,8 @@ For more details, refer to the [user group model](../../src/registrar/models/use
|
|||
|
||||
We can edit and deploy new group permissions by:
|
||||
|
||||
1. editing `user_group` then:
|
||||
1. Editing `user_group` then:
|
||||
2. Duplicating migration `0036_create_groups_01`
|
||||
and running migrations (append the name with a version number
|
||||
to help django detect the migration eg 0037_create_groups_02)
|
||||
3. Making sure to update the dependency on the new migration with the previous migration
|
|
@ -342,6 +342,12 @@ class DomainInvitationAdmin(ListHeaderAdmin):
|
|||
]
|
||||
search_help_text = "Search by email or domain."
|
||||
|
||||
# Mark the FSM field 'status' as readonly
|
||||
# to allow admin users to create Domain Invitations
|
||||
# without triggering the FSM Transition Not Allowed
|
||||
# error.
|
||||
readonly_fields = ["status"]
|
||||
|
||||
|
||||
class DomainInformationAdmin(ListHeaderAdmin):
|
||||
"""Customize domain information admin class."""
|
||||
|
|
|
@ -39,6 +39,7 @@ class UserFixture:
|
|||
"username": "70488e0a-e937-4894-a28c-16f5949effd4",
|
||||
"first_name": "Gaby",
|
||||
"last_name": "DiSarli",
|
||||
"email": "gaby@truss.works",
|
||||
},
|
||||
{
|
||||
"username": "83c2b6dd-20a2-4cac-bb40-e22a72d2955c",
|
||||
|
@ -129,7 +130,7 @@ class UserFixture:
|
|||
"username": "0eb6f326-a3d4-410f-a521-aa4c1fad4e47",
|
||||
"first_name": "Gaby-Analyst",
|
||||
"last_name": "DiSarli-Analyst",
|
||||
"email": "gaby@truss.works",
|
||||
"email": "gaby+1@truss.works",
|
||||
},
|
||||
{
|
||||
"username": "cfe7c2fc-e24a-480e-8b78-28645a1459b3",
|
||||
|
|
|
@ -20,7 +20,8 @@ class DomainNameserverForm(forms.Form):
|
|||
|
||||
"""Form for changing nameservers."""
|
||||
|
||||
server = forms.CharField(label="Name server")
|
||||
server = forms.CharField(label="Name server", strip=True)
|
||||
# when adding IPs to this form ensure they are stripped as well
|
||||
|
||||
|
||||
NameserverFormset = formset_factory(
|
||||
|
|
|
@ -2,11 +2,14 @@
|
|||
# It is dependent on 0035 (which populates ContentType and Permissions)
|
||||
# If permissions on the groups need changing, edit CISA_ANALYST_GROUP_PERMISSIONS
|
||||
# in the user_group model then:
|
||||
# [NOT RECOMMENDED]
|
||||
# step 1: docker-compose exec app ./manage.py migrate --fake registrar 0035_contenttypes_permissions
|
||||
# step 2: docker-compose exec app ./manage.py migrate registrar 0036_create_groups
|
||||
# step 3: fake run the latest migration in the migrations list
|
||||
# [RECOMMENDED]
|
||||
# Alternatively:
|
||||
# Only step: duplicate the migtation that loads data and run: docker-compose exec app ./manage.py migrate
|
||||
# step 1: duplicate the migration that loads data
|
||||
# step 2: docker-compose exec app ./manage.py migrate
|
||||
|
||||
from django.db import migrations
|
||||
from registrar.models import UserGroup
|
||||
|
|
37
src/registrar/migrations/0038_create_groups_v02.py
Normal file
37
src/registrar/migrations/0038_create_groups_v02.py
Normal file
|
@ -0,0 +1,37 @@
|
|||
# This migration creates the create_full_access_group and create_cisa_analyst_group groups
|
||||
# It is dependent on 0035 (which populates ContentType and Permissions)
|
||||
# If permissions on the groups need changing, edit CISA_ANALYST_GROUP_PERMISSIONS
|
||||
# in the user_group model then:
|
||||
# [NOT RECOMMENDED]
|
||||
# step 1: docker-compose exec app ./manage.py migrate --fake registrar 0035_contenttypes_permissions
|
||||
# step 2: docker-compose exec app ./manage.py migrate registrar 0036_create_groups
|
||||
# step 3: fake run the latest migration in the migrations list
|
||||
# [RECOMMENDED]
|
||||
# Alternatively:
|
||||
# step 1: duplicate the migration that loads data
|
||||
# step 2: docker-compose exec app ./manage.py migrate
|
||||
|
||||
from django.db import migrations
|
||||
from registrar.models import UserGroup
|
||||
from typing import Any
|
||||
|
||||
|
||||
# For linting: RunPython expects a function reference,
|
||||
# so let's give it one
|
||||
def create_groups(apps, schema_editor) -> Any:
|
||||
UserGroup.create_cisa_analyst_group(apps, schema_editor)
|
||||
UserGroup.create_full_access_group(apps, schema_editor)
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
dependencies = [
|
||||
("registrar", "0037_create_groups_v01"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RunPython(
|
||||
create_groups,
|
||||
reverse_code=migrations.RunPython.noop,
|
||||
atomic=True,
|
||||
),
|
||||
]
|
|
@ -1,11 +1,13 @@
|
|||
from itertools import zip_longest
|
||||
import logging
|
||||
import ipaddress
|
||||
import re
|
||||
from datetime import date
|
||||
from string import digits
|
||||
from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore
|
||||
|
||||
from django.db import models
|
||||
|
||||
from typing import Any
|
||||
from epplibwrapper import (
|
||||
CLIENT as registry,
|
||||
commands,
|
||||
|
@ -15,8 +17,16 @@ from epplibwrapper import (
|
|||
RegistryError,
|
||||
ErrorCode,
|
||||
)
|
||||
|
||||
from registrar.utility.errors import (
|
||||
ActionNotAllowed,
|
||||
NameserverError,
|
||||
NameserverErrorCodes as nsErrorCodes,
|
||||
)
|
||||
|
||||
from registrar.models.utility.contact_error import ContactError, ContactErrorCodes
|
||||
|
||||
|
||||
from .utility.domain_field import DomainField
|
||||
from .utility.domain_helper import DomainHelper
|
||||
from .utility.time_stamped_model import TimeStampedModel
|
||||
|
@ -218,13 +228,13 @@ class Domain(TimeStampedModel, DomainHelper):
|
|||
raise NotImplementedError()
|
||||
|
||||
@Cache
|
||||
def nameservers(self) -> list[tuple[str]]:
|
||||
def nameservers(self) -> list[tuple[str, list]]:
|
||||
"""
|
||||
Get or set a complete list of nameservers for this domain.
|
||||
|
||||
Hosts are provided as a list of tuples, e.g.
|
||||
|
||||
[("ns1.example.com",), ("ns1.example.gov", "0.0.0.0")]
|
||||
[("ns1.example.com",), ("ns1.example.gov", ["0.0.0.0"])]
|
||||
|
||||
Subordinate hosts (something.your-domain.gov) MUST have IP addresses,
|
||||
while non-subordinate hosts MUST NOT.
|
||||
|
@ -232,39 +242,21 @@ class Domain(TimeStampedModel, DomainHelper):
|
|||
try:
|
||||
hosts = self._get_property("hosts")
|
||||
except Exception as err:
|
||||
# Don't throw error as this is normal for a new domain
|
||||
# TODO - 433 error handling ticket should address this
|
||||
# Do not raise error when missing nameservers
|
||||
# this is a standard occurence when a domain
|
||||
# is first created
|
||||
logger.info("Domain is missing nameservers %s" % err)
|
||||
return []
|
||||
|
||||
# TODO-687 fix this return value
|
||||
hostList = []
|
||||
for host in hosts:
|
||||
# TODO - this should actually have a second tuple value with the ip address
|
||||
# ignored because uncertain if we will even have a way to display mult.
|
||||
# and adresses can be a list of mult address
|
||||
hostList.append((host["name"],))
|
||||
hostList.append((host["name"], host["addrs"]))
|
||||
|
||||
return hostList
|
||||
|
||||
def _check_host(self, hostnames: list[str]):
|
||||
"""check if host is available, True if available
|
||||
returns boolean"""
|
||||
checkCommand = commands.CheckHost(hostnames)
|
||||
try:
|
||||
response = registry.send(checkCommand, cleaned=True)
|
||||
return response.res_data[0].avail
|
||||
except RegistryError as err:
|
||||
logger.warning(
|
||||
"Couldn't check hosts %s. Errorcode was %s, error was %s",
|
||||
hostnames,
|
||||
err.code,
|
||||
err,
|
||||
)
|
||||
return False
|
||||
|
||||
def _create_host(self, host, addrs):
|
||||
"""Call _check_host first before using this function,
|
||||
This creates the host object in the registry
|
||||
"""Creates the host object in the registry
|
||||
doesn't add the created host to the domain
|
||||
returns ErrorCode (int)"""
|
||||
logger.info("Creating host")
|
||||
|
@ -282,6 +274,187 @@ class Domain(TimeStampedModel, DomainHelper):
|
|||
logger.error("Error _create_host, code was %s error was %s" % (e.code, e))
|
||||
return e.code
|
||||
|
||||
def _convert_list_to_dict(self, listToConvert: list[tuple[str, list]]):
|
||||
"""converts a list of hosts into a dictionary
|
||||
Args:
|
||||
list[tuple[str, list]]: such as [("123",["1","2","3"])]
|
||||
This is the list of hosts to convert
|
||||
|
||||
returns:
|
||||
convertDict (dict(str,list))- such as{"123":["1","2","3"]}"""
|
||||
newDict: dict[str, Any] = {}
|
||||
|
||||
for tup in listToConvert:
|
||||
if len(tup) == 1:
|
||||
newDict[tup[0]] = None
|
||||
elif len(tup) == 2:
|
||||
newDict[tup[0]] = tup[1]
|
||||
return newDict
|
||||
|
||||
def isSubdomain(self, nameserver: str):
|
||||
"""Returns boolean if the domain name is found in the argument passed"""
|
||||
subdomain_pattern = r"([\w-]+\.)*"
|
||||
full_pattern = subdomain_pattern + self.name
|
||||
regex = re.compile(full_pattern)
|
||||
return bool(regex.match(nameserver))
|
||||
|
||||
def checkHostIPCombo(self, nameserver: str, ip: list[str]):
|
||||
"""Checks the parameters past for a valid combination
|
||||
raises error if:
|
||||
- nameserver is a subdomain but is missing ip
|
||||
- nameserver is not a subdomain but has ip
|
||||
- nameserver is a subdomain but an ip passed is invalid
|
||||
|
||||
Args:
|
||||
hostname (str)- nameserver or subdomain
|
||||
ip (list[str])-list of ip strings
|
||||
Throws:
|
||||
NameserverError (if exception hit)
|
||||
Returns:
|
||||
None"""
|
||||
if self.isSubdomain(nameserver) and (ip is None or ip == []):
|
||||
raise NameserverError(code=nsErrorCodes.MISSING_IP, nameserver=nameserver)
|
||||
|
||||
elif not self.isSubdomain(nameserver) and (ip is not None and ip != []):
|
||||
raise NameserverError(
|
||||
code=nsErrorCodes.GLUE_RECORD_NOT_ALLOWED, nameserver=nameserver, ip=ip
|
||||
)
|
||||
elif ip is not None and ip != []:
|
||||
for addr in ip:
|
||||
if not self._valid_ip_addr(addr):
|
||||
raise NameserverError(
|
||||
code=nsErrorCodes.INVALID_IP, nameserver=nameserver, ip=ip
|
||||
)
|
||||
return None
|
||||
|
||||
def _valid_ip_addr(self, ipToTest: str):
|
||||
"""returns boolean if valid ip address string
|
||||
We currently only accept v4 or v6 ips
|
||||
returns:
|
||||
isValid (boolean)-True for valid ip address"""
|
||||
try:
|
||||
ip = ipaddress.ip_address(ipToTest)
|
||||
return ip.version == 6 or ip.version == 4
|
||||
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
def getNameserverChanges(
|
||||
self, hosts: list[tuple[str, list]]
|
||||
) -> tuple[list, list, dict, dict]:
|
||||
"""
|
||||
calls self.nameserver, it should pull from cache but may result
|
||||
in an epp call
|
||||
Args:
|
||||
hosts: list[tuple[str, list]] such as [("123",["1","2","3"])]
|
||||
Throws:
|
||||
NameserverError (if exception hit)
|
||||
Returns:
|
||||
tuple[list, list, dict, dict]
|
||||
These four tuple values as follows:
|
||||
deleted_values: list[str]
|
||||
updated_values: list[str]
|
||||
new_values: dict(str,list)
|
||||
prevHostDict: dict(str,list)"""
|
||||
|
||||
oldNameservers = self.nameservers
|
||||
|
||||
previousHostDict = self._convert_list_to_dict(oldNameservers)
|
||||
|
||||
newHostDict = self._convert_list_to_dict(hosts)
|
||||
deleted_values = []
|
||||
# TODO-currently a list of tuples, why not dict? for consistency
|
||||
updated_values = []
|
||||
new_values = {}
|
||||
|
||||
for prevHost in previousHostDict:
|
||||
addrs = previousHostDict[prevHost]
|
||||
# get deleted values-which are values in previous nameserver list
|
||||
# but are not in the list of new host values
|
||||
if prevHost not in newHostDict:
|
||||
deleted_values.append(prevHost)
|
||||
# if the host exists in both, check if the addresses changed
|
||||
else:
|
||||
# TODO - host is being updated when previous was None+new is empty list
|
||||
# add check here
|
||||
if newHostDict[prevHost] is not None and set(
|
||||
newHostDict[prevHost]
|
||||
) != set(addrs):
|
||||
self.checkHostIPCombo(nameserver=prevHost, ip=newHostDict[prevHost])
|
||||
updated_values.append((prevHost, newHostDict[prevHost]))
|
||||
|
||||
new_values = {
|
||||
key: newHostDict.get(key)
|
||||
for key in newHostDict
|
||||
if key not in previousHostDict and key.strip() != ""
|
||||
}
|
||||
|
||||
for nameserver, ip in new_values.items():
|
||||
self.checkHostIPCombo(nameserver=nameserver, ip=ip)
|
||||
|
||||
return (deleted_values, updated_values, new_values, previousHostDict)
|
||||
|
||||
def _update_host_values(self, updated_values, oldNameservers):
|
||||
for hostTuple in updated_values:
|
||||
updated_response_code = self._update_host(
|
||||
hostTuple[0], hostTuple[1], oldNameservers.get(hostTuple[0])
|
||||
)
|
||||
if updated_response_code not in [
|
||||
ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
|
||||
ErrorCode.OBJECT_EXISTS,
|
||||
]:
|
||||
logger.warning(
|
||||
"Could not update host %s. Error code was: %s "
|
||||
% (hostTuple[0], updated_response_code)
|
||||
)
|
||||
|
||||
def createNewHostList(self, new_values: dict):
|
||||
"""convert the dictionary of new values to a list of HostObjSet
|
||||
for use in the UpdateDomain epp message
|
||||
Args:
|
||||
new_values: dict(str,list)- dict of {nameserver:ips} to add to domain
|
||||
Returns:
|
||||
tuple [list[epp.HostObjSet], int]
|
||||
list[epp.HostObjSet]-epp object for use in the UpdateDomain epp message
|
||||
defaults to empty list
|
||||
int-number of items being created default 0
|
||||
"""
|
||||
|
||||
hostStringList = []
|
||||
for key, value in new_values.items():
|
||||
createdCode = self._create_host(
|
||||
host=key, addrs=value
|
||||
) # creates in registry
|
||||
if (
|
||||
createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY
|
||||
or createdCode == ErrorCode.OBJECT_EXISTS
|
||||
):
|
||||
hostStringList.append(key)
|
||||
if hostStringList == []:
|
||||
return [], 0
|
||||
|
||||
addToDomainObject = epp.HostObjSet(hosts=hostStringList)
|
||||
return [addToDomainObject], len(hostStringList)
|
||||
|
||||
def createDeleteHostList(self, hostsToDelete: list[str]):
|
||||
"""
|
||||
Args:
|
||||
hostsToDelete (list[str])- list of nameserver/host names to remove
|
||||
Returns:
|
||||
tuple [list[epp.HostObjSet], int]
|
||||
list[epp.HostObjSet]-epp object for use in the UpdateDomain epp message
|
||||
defaults to empty list
|
||||
int-number of items being created default 0
|
||||
"""
|
||||
deleteStrList = []
|
||||
for nameserver in hostsToDelete:
|
||||
deleteStrList.append(nameserver)
|
||||
if deleteStrList == []:
|
||||
return [], 0
|
||||
deleteObj = epp.HostObjSet(hosts=hostsToDelete)
|
||||
|
||||
return [deleteObj], len(deleteStrList)
|
||||
|
||||
@Cache
|
||||
def dnssecdata(self) -> extensions.DNSSECExtension:
|
||||
return self._get_property("dnssecdata")
|
||||
|
@ -304,54 +477,62 @@ class Domain(TimeStampedModel, DomainHelper):
|
|||
raise e
|
||||
|
||||
@nameservers.setter # type: ignore
|
||||
def nameservers(self, hosts: list[tuple[str]]):
|
||||
"""host should be a tuple of type str, str,... where the elements are
|
||||
def nameservers(self, hosts: list[tuple[str, list]]):
|
||||
"""Host should be a tuple of type str, str,... where the elements are
|
||||
Fully qualified host name, addresses associated with the host
|
||||
example: [(ns1.okay.gov, 127.0.0.1, others ips)]"""
|
||||
# TODO: ticket #848 finish this implementation
|
||||
# must delete nameservers as well or update
|
||||
# ip version checking may need to be added in a different ticket
|
||||
example: [(ns1.okay.gov, [127.0.0.1, others ips])]"""
|
||||
|
||||
if len(hosts) > 13:
|
||||
raise ValueError(
|
||||
"Too many hosts provided, you may not have more than 13 nameservers."
|
||||
)
|
||||
raise NameserverError(code=nsErrorCodes.TOO_MANY_HOSTS)
|
||||
|
||||
if self.state not in [self.State.DNS_NEEDED, self.State.READY]:
|
||||
raise ActionNotAllowed("Nameservers can not be " "set in the current state")
|
||||
|
||||
logger.info("Setting nameservers")
|
||||
logger.info(hosts)
|
||||
for hostTuple in hosts:
|
||||
host = hostTuple[0]
|
||||
addrs = None
|
||||
if len(hostTuple) > 1:
|
||||
addrs = hostTuple[1:]
|
||||
avail = self._check_host([host])
|
||||
if avail:
|
||||
createdCode = self._create_host(host=host, addrs=addrs)
|
||||
|
||||
# update the domain obj
|
||||
if createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
|
||||
# add host to domain
|
||||
request = commands.UpdateDomain(
|
||||
name=self.name, add=[epp.HostObjSet([host])]
|
||||
# get the changes made by user and old nameserver values
|
||||
(
|
||||
deleted_values,
|
||||
updated_values,
|
||||
new_values,
|
||||
oldNameservers,
|
||||
) = self.getNameserverChanges(hosts=hosts)
|
||||
|
||||
_ = self._update_host_values(
|
||||
updated_values, oldNameservers
|
||||
) # returns nothing, just need to be run and errors
|
||||
addToDomainList, addToDomainCount = self.createNewHostList(new_values)
|
||||
deleteHostList, deleteCount = self.createDeleteHostList(deleted_values)
|
||||
responseCode = self.addAndRemoveHostsFromDomain(
|
||||
hostsToAdd=addToDomainList, hostsToDelete=deleteHostList
|
||||
)
|
||||
|
||||
# if unable to update domain raise error and stop
|
||||
if responseCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
|
||||
raise NameserverError(code=nsErrorCodes.UNABLE_TO_UPDATE_DOMAIN)
|
||||
|
||||
successTotalNameservers = len(oldNameservers) - deleteCount + addToDomainCount
|
||||
|
||||
self._delete_hosts_if_not_used(hostsToDelete=deleted_values)
|
||||
if successTotalNameservers < 2:
|
||||
try:
|
||||
registry.send(request, cleaned=True)
|
||||
except RegistryError as e:
|
||||
logger.error(
|
||||
"Error adding nameserver, code was %s error was %s"
|
||||
% (e.code, e)
|
||||
self.dns_needed()
|
||||
self.save()
|
||||
except Exception as err:
|
||||
logger.info(
|
||||
"nameserver setter checked for dns_needed state "
|
||||
"and it did not succeed. Warning: %s" % err
|
||||
)
|
||||
|
||||
elif successTotalNameservers >= 2 and successTotalNameservers <= 13:
|
||||
try:
|
||||
self.ready()
|
||||
self.save()
|
||||
except Exception as err:
|
||||
logger.info(
|
||||
"nameserver setter checked for create state "
|
||||
"and it did not succeed. Error: %s" % err
|
||||
"and it did not succeed. Warning: %s" % err
|
||||
)
|
||||
# TODO - handle removed nameservers here will need to change the state
|
||||
# then go back to DNS_NEEDED
|
||||
|
||||
@Cache
|
||||
def statuses(self) -> list[str]:
|
||||
|
@ -963,7 +1144,7 @@ class Domain(TimeStampedModel, DomainHelper):
|
|||
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
|
||||
# avoid infinite loop
|
||||
already_tried_to_create = True
|
||||
self.pendingCreate()
|
||||
self.dns_needed_from_unknown()
|
||||
self.save()
|
||||
else:
|
||||
logger.error(e)
|
||||
|
@ -977,7 +1158,7 @@ class Domain(TimeStampedModel, DomainHelper):
|
|||
return registrant.registry_id
|
||||
|
||||
@transition(field="state", source=State.UNKNOWN, target=State.DNS_NEEDED)
|
||||
def pendingCreate(self):
|
||||
def dns_needed_from_unknown(self):
|
||||
logger.info("Changing to dns_needed")
|
||||
|
||||
registrantID = self.addRegistrant()
|
||||
|
@ -1053,26 +1234,54 @@ class Domain(TimeStampedModel, DomainHelper):
|
|||
else:
|
||||
self._invalidate_cache()
|
||||
|
||||
# def is_dns_needed(self):
|
||||
# """Commented out and kept in the codebase
|
||||
# as this call should be made, but adds
|
||||
# a lot of processing time
|
||||
# when EPP calling is made more efficient
|
||||
# this should be added back in
|
||||
|
||||
# The goal is to double check that
|
||||
# the nameservers we set are in fact
|
||||
# on the registry
|
||||
# """
|
||||
# self._invalidate_cache()
|
||||
# nameserverList = self.nameservers
|
||||
# return len(nameserverList) < 2
|
||||
|
||||
# def dns_not_needed(self):
|
||||
# return not self.is_dns_needed()
|
||||
|
||||
@transition(
|
||||
field="state",
|
||||
source=[State.DNS_NEEDED],
|
||||
target=State.READY,
|
||||
# conditions=[dns_not_needed]
|
||||
)
|
||||
def ready(self):
|
||||
"""Transition to the ready state
|
||||
domain should have nameservers and all contacts
|
||||
and now should be considered live on a domain
|
||||
"""
|
||||
# TODO - in nameservers tickets 848 and 562
|
||||
# check here if updates need to be made
|
||||
# consider adding these checks as constraints
|
||||
# within the transistion itself
|
||||
nameserverList = self.nameservers
|
||||
logger.info("Changing to ready state")
|
||||
if len(nameserverList) < 2 or len(nameserverList) > 13:
|
||||
raise ValueError("Not ready to become created, cannot transition yet")
|
||||
logger.info("able to transition to ready state")
|
||||
|
||||
@transition(
|
||||
field="state",
|
||||
source=[State.READY],
|
||||
target=State.DNS_NEEDED,
|
||||
# conditions=[is_dns_needed]
|
||||
)
|
||||
def dns_needed(self):
|
||||
"""Transition to the DNS_NEEDED state
|
||||
domain should NOT have nameservers but
|
||||
SHOULD have all contacts
|
||||
Going to check nameservers and will
|
||||
result in an EPP call
|
||||
"""
|
||||
logger.info("Changing to DNS_NEEDED state")
|
||||
logger.info("able to transition to DNS_NEEDED state")
|
||||
|
||||
def _disclose_fields(self, contact: PublicContact):
|
||||
"""creates a disclose object that can be added to a contact Create using
|
||||
.disclose= <this function> on the command before sending.
|
||||
|
@ -1198,6 +1407,10 @@ class Domain(TimeStampedModel, DomainHelper):
|
|||
|
||||
raise e
|
||||
|
||||
def is_ipv6(self, ip: str):
|
||||
ip_addr = ipaddress.ip_address(ip)
|
||||
return ip_addr.version == 6
|
||||
|
||||
def _fetch_hosts(self, host_data):
|
||||
"""Fetch host info."""
|
||||
hosts = []
|
||||
|
@ -1215,11 +1428,131 @@ class Domain(TimeStampedModel, DomainHelper):
|
|||
hosts.append({k: v for k, v in host.items() if v is not ...})
|
||||
return hosts
|
||||
|
||||
def _update_or_create_host(self, host):
|
||||
raise NotImplementedError()
|
||||
def _convert_ips(self, ip_list: list[str]):
|
||||
"""Convert Ips to a list of epp.Ip objects
|
||||
use when sending update host command.
|
||||
if there are no ips an empty list will be returned
|
||||
|
||||
def _delete_host(self, host):
|
||||
raise NotImplementedError()
|
||||
Args:
|
||||
ip_list (list[str]): the new list of ips, may be empty
|
||||
Returns:
|
||||
edited_ip_list (list[epp.Ip]): list of epp.ip objects ready to
|
||||
be sent to the registry
|
||||
"""
|
||||
edited_ip_list = []
|
||||
if ip_list is None:
|
||||
return []
|
||||
|
||||
for ip_addr in ip_list:
|
||||
if self.is_ipv6(ip_addr):
|
||||
edited_ip_list.append(epp.Ip(addr=ip_addr, ip="v6"))
|
||||
else: # default ip addr is v4
|
||||
edited_ip_list.append(epp.Ip(addr=ip_addr))
|
||||
|
||||
return edited_ip_list
|
||||
|
||||
def _update_host(self, nameserver: str, ip_list: list[str], old_ip_list: list[str]):
|
||||
"""Update an existing host object in EPP. Sends the update host command
|
||||
can result in a RegistryError
|
||||
Args:
|
||||
nameserver (str): nameserver or subdomain
|
||||
ip_list (list[str]): the new list of ips, may be empty
|
||||
old_ip_list (list[str]): the old ip list, may also be empty
|
||||
|
||||
Returns:
|
||||
errorCode (int): one of ErrorCode enum type values
|
||||
|
||||
"""
|
||||
try:
|
||||
if (
|
||||
ip_list is None
|
||||
or len(ip_list) == 0
|
||||
and isinstance(old_ip_list, list)
|
||||
and len(old_ip_list) != 0
|
||||
):
|
||||
return ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY
|
||||
|
||||
added_ip_list = set(ip_list).difference(old_ip_list)
|
||||
removed_ip_list = set(old_ip_list).difference(ip_list)
|
||||
|
||||
request = commands.UpdateHost(
|
||||
name=nameserver,
|
||||
add=self._convert_ips(list(added_ip_list)),
|
||||
rem=self._convert_ips(list(removed_ip_list)),
|
||||
)
|
||||
response = registry.send(request, cleaned=True)
|
||||
logger.info("_update_host()-> sending req as %s" % request)
|
||||
return response.code
|
||||
except RegistryError as e:
|
||||
logger.error("Error _update_host, code was %s error was %s" % (e.code, e))
|
||||
return e.code
|
||||
|
||||
def addAndRemoveHostsFromDomain(
|
||||
self, hostsToAdd: list[str], hostsToDelete: list[str]
|
||||
):
|
||||
"""sends an UpdateDomain message to the registry with the hosts provided
|
||||
Args:
|
||||
hostsToDelete (list[epp.HostObjSet])- list of host objects to delete
|
||||
hostsToAdd (list[epp.HostObjSet])- list of host objects to add
|
||||
Returns:
|
||||
response code (int)- RegistryErrorCode integer value
|
||||
defaults to return COMMAND_COMPLETED_SUCCESSFULLY
|
||||
if there is nothing to add or delete
|
||||
"""
|
||||
|
||||
if hostsToAdd == [] and hostsToDelete == []:
|
||||
return ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY
|
||||
|
||||
try:
|
||||
updateReq = commands.UpdateDomain(
|
||||
name=self.name, rem=hostsToDelete, add=hostsToAdd
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"addAndRemoveHostsFromDomain()-> sending update domain req as %s"
|
||||
% updateReq
|
||||
)
|
||||
response = registry.send(updateReq, cleaned=True)
|
||||
|
||||
return response.code
|
||||
except RegistryError as e:
|
||||
logger.error(
|
||||
"Error addAndRemoveHostsFromDomain, code was %s error was %s"
|
||||
% (e.code, e)
|
||||
)
|
||||
return e.code
|
||||
|
||||
def _delete_hosts_if_not_used(self, hostsToDelete: list[str]):
|
||||
"""delete the host object in registry,
|
||||
will only delete the host object, if it's not being used by another domain
|
||||
Performs just the DeleteHost epp call
|
||||
Supresses regstry error, as registry can disallow delete for various reasons
|
||||
Args:
|
||||
hostsToDelete (list[str])- list of nameserver/host names to remove
|
||||
Returns:
|
||||
None
|
||||
|
||||
"""
|
||||
try:
|
||||
for nameserver in hostsToDelete:
|
||||
deleteHostReq = commands.DeleteHost(name=nameserver)
|
||||
registry.send(deleteHostReq, cleaned=True)
|
||||
logger.info(
|
||||
"_delete_hosts_if_not_used()-> sending delete host req as %s"
|
||||
% deleteHostReq
|
||||
)
|
||||
|
||||
except RegistryError as e:
|
||||
if e.code == ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION:
|
||||
logger.info(
|
||||
"Did not remove host %s because it is in use on another domain."
|
||||
% nameserver
|
||||
)
|
||||
else:
|
||||
logger.error(
|
||||
"Error _delete_hosts_if_not_used, code was %s error was %s"
|
||||
% (e.code, e)
|
||||
)
|
||||
|
||||
def _fetch_cache(self, fetch_hosts=False, fetch_contacts=False):
|
||||
"""Contact registry for info about a domain."""
|
||||
|
|
|
@ -51,6 +51,11 @@ class UserGroup(Group):
|
|||
"model": "user",
|
||||
"permissions": ["analyst_access_permission", "change_user"],
|
||||
},
|
||||
{
|
||||
"app_label": "registrar",
|
||||
"model": "domaininvitation",
|
||||
"permissions": ["add_domaininvitation", "view_domaininvitation"],
|
||||
},
|
||||
]
|
||||
|
||||
# Avoid error: You can't execute queries until the end
|
||||
|
|
|
@ -43,6 +43,7 @@
|
|||
{% else %}
|
||||
{% include "includes/contact.html" with contact=value %}
|
||||
{% endif %}
|
||||
<!-- TODO #687 add here another elif for nameservers to show item Ips -->
|
||||
{% elif list %}
|
||||
{% if value|length == 1 %}
|
||||
{% if users %}
|
||||
|
|
|
@ -571,6 +571,8 @@ class MockEppLib(TestCase):
|
|||
contacts=...,
|
||||
hosts=...,
|
||||
statuses=...,
|
||||
avail=...,
|
||||
addrs=...,
|
||||
registrant=...,
|
||||
):
|
||||
self.auth_info = auth_info
|
||||
|
@ -578,6 +580,8 @@ class MockEppLib(TestCase):
|
|||
self.contacts = contacts
|
||||
self.hosts = hosts
|
||||
self.statuses = statuses
|
||||
self.avail = avail # use for CheckDomain
|
||||
self.addrs = addrs
|
||||
self.registrant = registrant
|
||||
|
||||
def dummyInfoContactResultData(
|
||||
|
@ -682,22 +686,88 @@ class MockEppLib(TestCase):
|
|||
hosts=["fake.host.com"],
|
||||
)
|
||||
|
||||
infoDomainThreeHosts = fakedEppObject(
|
||||
"my-nameserver.gov",
|
||||
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
|
||||
contacts=[],
|
||||
hosts=[
|
||||
"ns1.my-nameserver-1.com",
|
||||
"ns1.my-nameserver-2.com",
|
||||
"ns1.cats-are-superior3.com",
|
||||
],
|
||||
)
|
||||
infoDomainNoHost = fakedEppObject(
|
||||
"my-nameserver.gov",
|
||||
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
|
||||
contacts=[],
|
||||
hosts=[],
|
||||
)
|
||||
|
||||
infoDomainTwoHosts = fakedEppObject(
|
||||
"my-nameserver.gov",
|
||||
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
|
||||
contacts=[],
|
||||
hosts=["ns1.my-nameserver-1.com", "ns1.my-nameserver-2.com"],
|
||||
)
|
||||
|
||||
mockDataInfoHosts = fakedEppObject(
|
||||
"lastPw",
|
||||
cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35),
|
||||
addrs=["1.2.3.4", "2.3.4.5"],
|
||||
)
|
||||
|
||||
mockDataHostChange = fakedEppObject(
|
||||
"lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35)
|
||||
)
|
||||
|
||||
infoDomainHasIP = fakedEppObject(
|
||||
"nameserverwithip.gov",
|
||||
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
|
||||
contacts=[],
|
||||
hosts=[
|
||||
"ns1.nameserverwithip.gov",
|
||||
"ns2.nameserverwithip.gov",
|
||||
"ns3.nameserverwithip.gov",
|
||||
],
|
||||
addrs=["1.2.3.4", "2.3.4.5"],
|
||||
)
|
||||
|
||||
infoDomainCheckHostIPCombo = fakedEppObject(
|
||||
"nameserversubdomain.gov",
|
||||
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
|
||||
contacts=[],
|
||||
hosts=[
|
||||
"ns1.nameserversubdomain.gov",
|
||||
"ns2.nameserversubdomain.gov",
|
||||
],
|
||||
)
|
||||
|
||||
def _getattrInfoDomain(self, _request):
|
||||
if getattr(_request, "name", None) == "security.gov":
|
||||
return MagicMock(res_data=[self.infoDomainNoContact])
|
||||
elif getattr(_request, "name", None) == "my-nameserver.gov":
|
||||
if self.mockedSendFunction.call_count == 5:
|
||||
return MagicMock(res_data=[self.infoDomainTwoHosts])
|
||||
else:
|
||||
return MagicMock(res_data=[self.infoDomainNoHost])
|
||||
elif getattr(_request, "name", None) == "nameserverwithip.gov":
|
||||
return MagicMock(res_data=[self.infoDomainHasIP])
|
||||
elif getattr(_request, "name", None) == "namerserversubdomain.gov":
|
||||
return MagicMock(res_data=[self.infoDomainCheckHostIPCombo])
|
||||
elif getattr(_request, "name", None) == "freeman.gov":
|
||||
return MagicMock(res_data=[self.InfoDomainWithContacts])
|
||||
elif getattr(_request, "name", None) == "threenameserversDomain.gov":
|
||||
return MagicMock(res_data=[self.infoDomainThreeHosts])
|
||||
return MagicMock(res_data=[self.mockDataInfoDomain])
|
||||
|
||||
def mockSend(self, _request, cleaned):
|
||||
"""Mocks the registry.send function used inside of domain.py
|
||||
registry is imported from epplibwrapper
|
||||
returns objects that simulate what would be in a epp response
|
||||
but only relevant pieces for tests"""
|
||||
if isinstance(_request, commands.InfoDomain):
|
||||
if getattr(_request, "name", None) == "security.gov":
|
||||
return MagicMock(res_data=[self.infoDomainNoContact])
|
||||
elif getattr(_request, "name", None) == "freeman.gov":
|
||||
return MagicMock(res_data=[self.InfoDomainWithContacts])
|
||||
else:
|
||||
return MagicMock(res_data=[self.mockDataInfoDomain])
|
||||
return self._getattrInfoDomain(_request)
|
||||
|
||||
elif isinstance(_request, commands.InfoContact):
|
||||
mocked_result: info.InfoContactResultData
|
||||
|
||||
|
@ -724,6 +794,26 @@ class MockEppLib(TestCase):
|
|||
# use this for when a contact is being updated
|
||||
# sets the second send() to fail
|
||||
raise RegistryError(code=ErrorCode.OBJECT_EXISTS)
|
||||
elif isinstance(_request, commands.CreateHost):
|
||||
return MagicMock(
|
||||
res_data=[self.mockDataHostChange],
|
||||
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
|
||||
)
|
||||
elif isinstance(_request, commands.UpdateHost):
|
||||
return MagicMock(
|
||||
res_data=[self.mockDataHostChange],
|
||||
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
|
||||
)
|
||||
elif isinstance(_request, commands.UpdateDomain):
|
||||
return MagicMock(
|
||||
res_data=[self.mockDataHostChange],
|
||||
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
|
||||
)
|
||||
elif isinstance(_request, commands.DeleteHost):
|
||||
return MagicMock(
|
||||
res_data=[self.mockDataHostChange],
|
||||
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
|
||||
)
|
||||
elif (
|
||||
isinstance(_request, commands.DeleteDomain)
|
||||
and getattr(_request, "name", None) == "failDelete.gov"
|
||||
|
@ -734,6 +824,7 @@ class MockEppLib(TestCase):
|
|||
raise RegistryError(
|
||||
code=ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION
|
||||
)
|
||||
|
||||
return MagicMock(res_data=[self.mockDataInfoHosts])
|
||||
|
||||
def setUp(self):
|
||||
|
|
|
@ -32,6 +32,7 @@ class TestGroups(TestCase):
|
|||
)
|
||||
|
||||
# Test permissions for cisa_analysts_group
|
||||
# Verifies permission data migrations ran as expected.
|
||||
# Define the expected permission codenames
|
||||
expected_permissions = [
|
||||
"view_logentry",
|
||||
|
@ -39,6 +40,8 @@ class TestGroups(TestCase):
|
|||
"view_domain",
|
||||
"change_domainapplication",
|
||||
"change_domaininformation",
|
||||
"add_domaininvitation",
|
||||
"view_domaininvitation",
|
||||
"change_draftdomain",
|
||||
"analyst_access_permission",
|
||||
"change_user",
|
||||
|
|
|
@ -16,7 +16,10 @@ from registrar.models.domain_information import DomainInformation
|
|||
from registrar.models.draft_domain import DraftDomain
|
||||
from registrar.models.public_contact import PublicContact
|
||||
from registrar.models.user import User
|
||||
from registrar.utility.errors import ActionNotAllowed, NameserverError
|
||||
|
||||
from registrar.models.utility.contact_error import ContactError, ContactErrorCodes
|
||||
|
||||
from .common import MockEppLib
|
||||
from django_fsm import TransitionNotAllowed # type: ignore
|
||||
from epplibwrapper import (
|
||||
|
@ -104,6 +107,7 @@ class TestDomainCache(MockEppLib):
|
|||
}
|
||||
expectedHostsDict = {
|
||||
"name": self.mockDataInfoDomain.hosts[0],
|
||||
"addrs": self.mockDataInfoHosts.addrs,
|
||||
"cr_date": self.mockDataInfoHosts.cr_date,
|
||||
}
|
||||
|
||||
|
@ -264,7 +268,7 @@ class TestDomainCreation(MockEppLib):
|
|||
application.status = DomainApplication.SUBMITTED
|
||||
# transition to approve state
|
||||
application.approve()
|
||||
# should hav information present for this domain
|
||||
# should have information present for this domain
|
||||
domain = Domain.objects.get(name="igorville.gov")
|
||||
self.assertTrue(domain)
|
||||
self.mockedSendFunction.assert_not_called()
|
||||
|
@ -534,7 +538,7 @@ class TestRegistrantContacts(MockEppLib):
|
|||
expectedSecContact = PublicContact.get_default_security()
|
||||
expectedSecContact.domain = self.domain
|
||||
|
||||
self.domain.pendingCreate()
|
||||
self.domain.dns_needed_from_unknown()
|
||||
|
||||
self.assertEqual(self.mockedSendFunction.call_count, 8)
|
||||
self.assertEqual(PublicContact.objects.filter(domain=self.domain).count(), 4)
|
||||
|
@ -577,7 +581,8 @@ class TestRegistrantContacts(MockEppLib):
|
|||
created contact of type 'security'
|
||||
"""
|
||||
# make a security contact that is a PublicContact
|
||||
self.domain.pendingCreate() # make sure a security email already exists
|
||||
# make sure a security email already exists
|
||||
self.domain.dns_needed_from_unknown()
|
||||
expectedSecContact = PublicContact.get_default_security()
|
||||
expectedSecContact.domain = self.domain
|
||||
expectedSecContact.email = "newEmail@fake.com"
|
||||
|
@ -893,7 +898,7 @@ class TestRegistrantContacts(MockEppLib):
|
|||
self.assertEqual(self.domain_contact._cache["registrant"], expected_contact_db)
|
||||
|
||||
|
||||
class TestRegistrantNameservers(TestCase):
|
||||
class TestRegistrantNameservers(MockEppLib):
|
||||
"""Rule: Registrants may modify their nameservers"""
|
||||
|
||||
def setUp(self):
|
||||
|
@ -902,9 +907,91 @@ class TestRegistrantNameservers(TestCase):
|
|||
Given the registrant is logged in
|
||||
And the registrant is the admin on a domain
|
||||
"""
|
||||
pass
|
||||
super().setUp()
|
||||
self.nameserver1 = "ns1.my-nameserver-1.com"
|
||||
self.nameserver2 = "ns1.my-nameserver-2.com"
|
||||
self.nameserver3 = "ns1.cats-are-superior3.com"
|
||||
|
||||
self.domain, _ = Domain.objects.get_or_create(
|
||||
name="my-nameserver.gov", state=Domain.State.DNS_NEEDED
|
||||
)
|
||||
self.domainWithThreeNS, _ = Domain.objects.get_or_create(
|
||||
name="threenameserversDomain.gov", state=Domain.State.READY
|
||||
)
|
||||
|
||||
def test_get_nameserver_changes_success_deleted_vals(self):
|
||||
"""Testing only deleting and no other changes"""
|
||||
self.domain._cache["hosts"] = [
|
||||
{"name": "ns1.example.com", "addrs": None},
|
||||
{"name": "ns2.example.com", "addrs": ["1.2.3.4"]},
|
||||
]
|
||||
newChanges = [
|
||||
("ns1.example.com",),
|
||||
]
|
||||
(
|
||||
deleted_values,
|
||||
updated_values,
|
||||
new_values,
|
||||
oldNameservers,
|
||||
) = self.domain.getNameserverChanges(newChanges)
|
||||
|
||||
self.assertEqual(deleted_values, ["ns2.example.com"])
|
||||
self.assertEqual(updated_values, [])
|
||||
self.assertEqual(new_values, {})
|
||||
self.assertEqual(
|
||||
oldNameservers,
|
||||
{"ns1.example.com": None, "ns2.example.com": ["1.2.3.4"]},
|
||||
)
|
||||
|
||||
def test_get_nameserver_changes_success_updated_vals(self):
|
||||
"""Testing only updating no other changes"""
|
||||
self.domain._cache["hosts"] = [
|
||||
{"name": "ns3.my-nameserver.gov", "addrs": ["1.2.3.4"]},
|
||||
]
|
||||
newChanges = [
|
||||
("ns3.my-nameserver.gov", ["1.2.4.5"]),
|
||||
]
|
||||
(
|
||||
deleted_values,
|
||||
updated_values,
|
||||
new_values,
|
||||
oldNameservers,
|
||||
) = self.domain.getNameserverChanges(newChanges)
|
||||
|
||||
self.assertEqual(deleted_values, [])
|
||||
self.assertEqual(updated_values, [("ns3.my-nameserver.gov", ["1.2.4.5"])])
|
||||
self.assertEqual(new_values, {})
|
||||
self.assertEqual(
|
||||
oldNameservers,
|
||||
{"ns3.my-nameserver.gov": ["1.2.3.4"]},
|
||||
)
|
||||
|
||||
def test_get_nameserver_changes_success_new_vals(self):
|
||||
# Testing only creating no other changes
|
||||
self.domain._cache["hosts"] = [
|
||||
{"name": "ns1.example.com", "addrs": None},
|
||||
]
|
||||
newChanges = [
|
||||
("ns1.example.com",),
|
||||
("ns4.example.com",),
|
||||
]
|
||||
(
|
||||
deleted_values,
|
||||
updated_values,
|
||||
new_values,
|
||||
oldNameservers,
|
||||
) = self.domain.getNameserverChanges(newChanges)
|
||||
|
||||
self.assertEqual(deleted_values, [])
|
||||
self.assertEqual(updated_values, [])
|
||||
self.assertEqual(new_values, {"ns4.example.com": None})
|
||||
self.assertEqual(
|
||||
oldNameservers,
|
||||
{
|
||||
"ns1.example.com": None,
|
||||
},
|
||||
)
|
||||
|
||||
@skip("not implemented yet")
|
||||
def test_user_adds_one_nameserver(self):
|
||||
"""
|
||||
Scenario: Registrant adds a single nameserver
|
||||
|
@ -914,9 +1001,31 @@ class TestRegistrantNameservers(TestCase):
|
|||
to the registry
|
||||
And `domain.is_active` returns False
|
||||
"""
|
||||
raise
|
||||
|
||||
@skip("not implemented yet")
|
||||
# set 1 nameserver
|
||||
nameserver = "ns1.my-nameserver.com"
|
||||
self.domain.nameservers = [(nameserver,)]
|
||||
|
||||
# when we create a host, we should've updated at the same time
|
||||
created_host = commands.CreateHost(nameserver)
|
||||
update_domain_with_created = commands.UpdateDomain(
|
||||
name=self.domain.name,
|
||||
add=[common.HostObjSet([created_host.name])],
|
||||
rem=[],
|
||||
)
|
||||
|
||||
# checking if commands were sent (commands have to be sent in order)
|
||||
expectedCalls = [
|
||||
call(created_host, cleaned=True),
|
||||
call(update_domain_with_created, cleaned=True),
|
||||
]
|
||||
|
||||
self.mockedSendFunction.assert_has_calls(expectedCalls)
|
||||
|
||||
# check that status is still NOT READY
|
||||
# as you have less than 2 nameservers
|
||||
self.assertFalse(self.domain.is_active())
|
||||
|
||||
def test_user_adds_two_nameservers(self):
|
||||
"""
|
||||
Scenario: Registrant adds 2 or more nameservers, thereby activating the domain
|
||||
|
@ -926,9 +1035,36 @@ class TestRegistrantNameservers(TestCase):
|
|||
to the registry
|
||||
And `domain.is_active` returns True
|
||||
"""
|
||||
raise
|
||||
|
||||
@skip("not implemented yet")
|
||||
# set 2 nameservers
|
||||
self.domain.nameservers = [(self.nameserver1,), (self.nameserver2,)]
|
||||
|
||||
# when you create a host, you also have to update at same time
|
||||
created_host1 = commands.CreateHost(self.nameserver1)
|
||||
created_host2 = commands.CreateHost(self.nameserver2)
|
||||
|
||||
update_domain_with_created = commands.UpdateDomain(
|
||||
name=self.domain.name,
|
||||
add=[
|
||||
common.HostObjSet([created_host1.name, created_host2.name]),
|
||||
],
|
||||
rem=[],
|
||||
)
|
||||
|
||||
infoDomain = commands.InfoDomain(name="my-nameserver.gov", auth_info=None)
|
||||
# checking if commands were sent (commands have to be sent in order)
|
||||
expectedCalls = [
|
||||
call(infoDomain, cleaned=True),
|
||||
call(created_host1, cleaned=True),
|
||||
call(created_host2, cleaned=True),
|
||||
call(update_domain_with_created, cleaned=True),
|
||||
]
|
||||
|
||||
self.mockedSendFunction.assert_has_calls(expectedCalls, any_order=True)
|
||||
self.assertEqual(4, self.mockedSendFunction.call_count)
|
||||
# check that status is READY
|
||||
self.assertTrue(self.domain.is_active())
|
||||
|
||||
def test_user_adds_too_many_nameservers(self):
|
||||
"""
|
||||
Scenario: Registrant adds 14 or more nameservers
|
||||
|
@ -936,9 +1072,44 @@ class TestRegistrantNameservers(TestCase):
|
|||
When `domain.nameservers` is set to an array of length 14
|
||||
Then Domain raises a user-friendly error
|
||||
"""
|
||||
raise
|
||||
|
||||
@skip("not implemented yet")
|
||||
# set 13+ nameservers
|
||||
nameserver1 = "ns1.cats-are-superior1.com"
|
||||
nameserver2 = "ns1.cats-are-superior2.com"
|
||||
nameserver3 = "ns1.cats-are-superior3.com"
|
||||
nameserver4 = "ns1.cats-are-superior4.com"
|
||||
nameserver5 = "ns1.cats-are-superior5.com"
|
||||
nameserver6 = "ns1.cats-are-superior6.com"
|
||||
nameserver7 = "ns1.cats-are-superior7.com"
|
||||
nameserver8 = "ns1.cats-are-superior8.com"
|
||||
nameserver9 = "ns1.cats-are-superior9.com"
|
||||
nameserver10 = "ns1.cats-are-superior10.com"
|
||||
nameserver11 = "ns1.cats-are-superior11.com"
|
||||
nameserver12 = "ns1.cats-are-superior12.com"
|
||||
nameserver13 = "ns1.cats-are-superior13.com"
|
||||
nameserver14 = "ns1.cats-are-superior14.com"
|
||||
|
||||
def _get_14_nameservers():
|
||||
self.domain.nameservers = [
|
||||
(nameserver1,),
|
||||
(nameserver2,),
|
||||
(nameserver3,),
|
||||
(nameserver4,),
|
||||
(nameserver5,),
|
||||
(nameserver6,),
|
||||
(nameserver7,),
|
||||
(nameserver8,),
|
||||
(nameserver9),
|
||||
(nameserver10,),
|
||||
(nameserver11,),
|
||||
(nameserver12,),
|
||||
(nameserver13,),
|
||||
(nameserver14,),
|
||||
]
|
||||
|
||||
self.assertRaises(NameserverError, _get_14_nameservers)
|
||||
self.assertEqual(self.mockedSendFunction.call_count, 0)
|
||||
|
||||
def test_user_removes_some_nameservers(self):
|
||||
"""
|
||||
Scenario: Registrant removes some nameservers, while keeping at least 2
|
||||
|
@ -948,21 +1119,84 @@ class TestRegistrantNameservers(TestCase):
|
|||
to the registry
|
||||
And `domain.is_active` returns True
|
||||
"""
|
||||
raise
|
||||
|
||||
@skip("not implemented yet")
|
||||
# Mock is set to return 3 nameservers on infodomain
|
||||
self.domainWithThreeNS.nameservers = [(self.nameserver1,), (self.nameserver2,)]
|
||||
expectedCalls = [
|
||||
# calls info domain, and info on all hosts
|
||||
# to get past values
|
||||
# then removes the single host and updates domain
|
||||
call(
|
||||
commands.InfoDomain(name=self.domainWithThreeNS.name, auth_info=None),
|
||||
cleaned=True,
|
||||
),
|
||||
call(commands.InfoHost(name="ns1.my-nameserver-1.com"), cleaned=True),
|
||||
call(commands.InfoHost(name="ns1.my-nameserver-2.com"), cleaned=True),
|
||||
call(commands.InfoHost(name="ns1.cats-are-superior3.com"), cleaned=True),
|
||||
call(
|
||||
commands.UpdateDomain(
|
||||
name=self.domainWithThreeNS.name,
|
||||
add=[],
|
||||
rem=[common.HostObjSet(hosts=["ns1.cats-are-superior3.com"])],
|
||||
nsset=None,
|
||||
keyset=None,
|
||||
registrant=None,
|
||||
auth_info=None,
|
||||
),
|
||||
cleaned=True,
|
||||
),
|
||||
call(commands.DeleteHost(name="ns1.cats-are-superior3.com"), cleaned=True),
|
||||
]
|
||||
|
||||
self.mockedSendFunction.assert_has_calls(expectedCalls, any_order=True)
|
||||
self.assertTrue(self.domainWithThreeNS.is_active())
|
||||
|
||||
def test_user_removes_too_many_nameservers(self):
|
||||
"""
|
||||
Scenario: Registrant removes some nameservers, bringing the total to less than 2
|
||||
Given the domain has 3 nameservers
|
||||
Given the domain has 2 nameservers
|
||||
When `domain.nameservers` is set to an array containing nameserver #1
|
||||
Then `commands.UpdateDomain` and `commands.DeleteHost` is sent
|
||||
to the registry
|
||||
And `domain.is_active` returns False
|
||||
"""
|
||||
raise
|
||||
|
||||
@skip("not implemented yet")
|
||||
"""
|
||||
|
||||
self.domainWithThreeNS.nameservers = [(self.nameserver1,)]
|
||||
expectedCalls = [
|
||||
call(
|
||||
commands.InfoDomain(name=self.domainWithThreeNS.name, auth_info=None),
|
||||
cleaned=True,
|
||||
),
|
||||
call(commands.InfoHost(name="ns1.my-nameserver-1.com"), cleaned=True),
|
||||
call(commands.InfoHost(name="ns1.my-nameserver-2.com"), cleaned=True),
|
||||
call(commands.InfoHost(name="ns1.cats-are-superior3.com"), cleaned=True),
|
||||
call(commands.DeleteHost(name="ns1.my-nameserver-2.com"), cleaned=True),
|
||||
call(
|
||||
commands.UpdateDomain(
|
||||
name=self.domainWithThreeNS.name,
|
||||
add=[],
|
||||
rem=[
|
||||
common.HostObjSet(
|
||||
hosts=[
|
||||
"ns1.my-nameserver-2.com",
|
||||
"ns1.cats-are-superior3.com",
|
||||
]
|
||||
),
|
||||
],
|
||||
nsset=None,
|
||||
keyset=None,
|
||||
registrant=None,
|
||||
auth_info=None,
|
||||
),
|
||||
cleaned=True,
|
||||
),
|
||||
call(commands.DeleteHost(name="ns1.cats-are-superior3.com"), cleaned=True),
|
||||
]
|
||||
|
||||
self.mockedSendFunction.assert_has_calls(expectedCalls, any_order=True)
|
||||
self.assertFalse(self.domainWithThreeNS.is_active())
|
||||
|
||||
def test_user_replaces_nameservers(self):
|
||||
"""
|
||||
Scenario: Registrant simultaneously adds and removes some nameservers
|
||||
|
@ -973,9 +1207,60 @@ class TestRegistrantNameservers(TestCase):
|
|||
And `commands.UpdateDomain` is sent to add #4 and #5 plus remove #2 and #3
|
||||
And `commands.DeleteHost` is sent to delete #2 and #3
|
||||
"""
|
||||
raise
|
||||
self.domainWithThreeNS.nameservers = [
|
||||
(self.nameserver1,),
|
||||
("ns1.cats-are-superior1.com",),
|
||||
("ns1.cats-are-superior2.com",),
|
||||
]
|
||||
|
||||
expectedCalls = [
|
||||
call(
|
||||
commands.InfoDomain(name=self.domainWithThreeNS.name, auth_info=None),
|
||||
cleaned=True,
|
||||
),
|
||||
call(commands.InfoHost(name="ns1.my-nameserver-1.com"), cleaned=True),
|
||||
call(commands.InfoHost(name="ns1.my-nameserver-2.com"), cleaned=True),
|
||||
call(commands.InfoHost(name="ns1.cats-are-superior3.com"), cleaned=True),
|
||||
call(commands.DeleteHost(name="ns1.my-nameserver-2.com"), cleaned=True),
|
||||
call(
|
||||
commands.CreateHost(name="ns1.cats-are-superior1.com", addrs=[]),
|
||||
cleaned=True,
|
||||
),
|
||||
call(
|
||||
commands.CreateHost(name="ns1.cats-are-superior2.com", addrs=[]),
|
||||
cleaned=True,
|
||||
),
|
||||
call(
|
||||
commands.UpdateDomain(
|
||||
name=self.domainWithThreeNS.name,
|
||||
add=[
|
||||
common.HostObjSet(
|
||||
hosts=[
|
||||
"ns1.cats-are-superior1.com",
|
||||
"ns1.cats-are-superior2.com",
|
||||
]
|
||||
),
|
||||
],
|
||||
rem=[
|
||||
common.HostObjSet(
|
||||
hosts=[
|
||||
"ns1.my-nameserver-2.com",
|
||||
"ns1.cats-are-superior3.com",
|
||||
]
|
||||
),
|
||||
],
|
||||
nsset=None,
|
||||
keyset=None,
|
||||
registrant=None,
|
||||
auth_info=None,
|
||||
),
|
||||
cleaned=True,
|
||||
),
|
||||
]
|
||||
|
||||
self.mockedSendFunction.assert_has_calls(expectedCalls, any_order=True)
|
||||
self.assertTrue(self.domainWithThreeNS.is_active())
|
||||
|
||||
@skip("not implemented yet")
|
||||
def test_user_cannot_add_subordinate_without_ip(self):
|
||||
"""
|
||||
Scenario: Registrant adds a nameserver which is a subdomain of their .gov
|
||||
|
@ -984,9 +1269,12 @@ class TestRegistrantNameservers(TestCase):
|
|||
with a subdomain of the domain and no IP addresses
|
||||
Then Domain raises a user-friendly error
|
||||
"""
|
||||
raise
|
||||
|
||||
@skip("not implemented yet")
|
||||
dotgovnameserver = "my-nameserver.gov"
|
||||
|
||||
with self.assertRaises(NameserverError):
|
||||
self.domain.nameservers = [(dotgovnameserver,)]
|
||||
|
||||
def test_user_updates_ips(self):
|
||||
"""
|
||||
Scenario: Registrant changes IP addresses for a nameserver
|
||||
|
@ -996,9 +1284,53 @@ class TestRegistrantNameservers(TestCase):
|
|||
with a different IP address(es)
|
||||
Then `commands.UpdateHost` is sent to the registry
|
||||
"""
|
||||
raise
|
||||
domain, _ = Domain.objects.get_or_create(
|
||||
name="nameserverwithip.gov", state=Domain.State.READY
|
||||
)
|
||||
domain.nameservers = [
|
||||
("ns1.nameserverwithip.gov", ["2.3.4.5", "1.2.3.4"]),
|
||||
(
|
||||
"ns2.nameserverwithip.gov",
|
||||
["1.2.3.4", "2.3.4.5", "2001:0db8:85a3:0000:0000:8a2e:0370:7334"],
|
||||
),
|
||||
("ns3.nameserverwithip.gov", ["2.3.4.5"]),
|
||||
]
|
||||
|
||||
expectedCalls = [
|
||||
call(
|
||||
commands.InfoDomain(name="nameserverwithip.gov", auth_info=None),
|
||||
cleaned=True,
|
||||
),
|
||||
call(commands.InfoHost(name="ns1.nameserverwithip.gov"), cleaned=True),
|
||||
call(commands.InfoHost(name="ns2.nameserverwithip.gov"), cleaned=True),
|
||||
call(commands.InfoHost(name="ns3.nameserverwithip.gov"), cleaned=True),
|
||||
call(
|
||||
commands.UpdateHost(
|
||||
name="ns2.nameserverwithip.gov",
|
||||
add=[
|
||||
common.Ip(
|
||||
addr="2001:0db8:85a3:0000:0000:8a2e:0370:7334", ip="v6"
|
||||
)
|
||||
],
|
||||
rem=[],
|
||||
chg=None,
|
||||
),
|
||||
cleaned=True,
|
||||
),
|
||||
call(
|
||||
commands.UpdateHost(
|
||||
name="ns3.nameserverwithip.gov",
|
||||
add=[],
|
||||
rem=[common.Ip(addr="1.2.3.4", ip=None)],
|
||||
chg=None,
|
||||
),
|
||||
cleaned=True,
|
||||
),
|
||||
]
|
||||
|
||||
self.mockedSendFunction.assert_has_calls(expectedCalls, any_order=True)
|
||||
self.assertTrue(domain.is_active())
|
||||
|
||||
@skip("not implemented yet")
|
||||
def test_user_cannot_add_non_subordinate_with_ip(self):
|
||||
"""
|
||||
Scenario: Registrant adds a nameserver which is NOT a subdomain of their .gov
|
||||
|
@ -1007,9 +1339,11 @@ class TestRegistrantNameservers(TestCase):
|
|||
which is not a subdomain of the domain and has IP addresses
|
||||
Then Domain raises a user-friendly error
|
||||
"""
|
||||
raise
|
||||
dotgovnameserver = "mynameserverdotgov.gov"
|
||||
|
||||
with self.assertRaises(NameserverError):
|
||||
self.domain.nameservers = [(dotgovnameserver, ["1.2.3"])]
|
||||
|
||||
@skip("not implemented yet")
|
||||
def test_nameservers_are_idempotent(self):
|
||||
"""
|
||||
Scenario: Registrant adds a set of nameservers twice, due to a UI glitch
|
||||
|
@ -1017,10 +1351,68 @@ class TestRegistrantNameservers(TestCase):
|
|||
to the registry twice with identical data
|
||||
Then no errors are raised in Domain
|
||||
"""
|
||||
# implementation note: this requires seeing what happens when these are actually
|
||||
# sent like this, and then implementing appropriate mocks for any errors the
|
||||
# registry normally sends in this case
|
||||
raise
|
||||
|
||||
# Checking that it doesn't create or update even if out of order
|
||||
self.domainWithThreeNS.nameservers = [
|
||||
(self.nameserver3,),
|
||||
(self.nameserver1,),
|
||||
(self.nameserver2,),
|
||||
]
|
||||
|
||||
expectedCalls = [
|
||||
call(
|
||||
commands.InfoDomain(name=self.domainWithThreeNS.name, auth_info=None),
|
||||
cleaned=True,
|
||||
),
|
||||
call(commands.InfoHost(name="ns1.my-nameserver-1.com"), cleaned=True),
|
||||
call(commands.InfoHost(name="ns1.my-nameserver-2.com"), cleaned=True),
|
||||
call(commands.InfoHost(name="ns1.cats-are-superior3.com"), cleaned=True),
|
||||
]
|
||||
|
||||
self.mockedSendFunction.assert_has_calls(expectedCalls, any_order=True)
|
||||
self.assertEqual(self.mockedSendFunction.call_count, 4)
|
||||
|
||||
def test_is_subdomain_with_no_ip(self):
|
||||
domain, _ = Domain.objects.get_or_create(
|
||||
name="nameserversubdomain.gov", state=Domain.State.READY
|
||||
)
|
||||
|
||||
with self.assertRaises(NameserverError):
|
||||
domain.nameservers = [
|
||||
("ns1.nameserversubdomain.gov",),
|
||||
("ns2.nameserversubdomain.gov",),
|
||||
]
|
||||
|
||||
def test_not_subdomain_but_has_ip(self):
|
||||
domain, _ = Domain.objects.get_or_create(
|
||||
name="nameserversubdomain.gov", state=Domain.State.READY
|
||||
)
|
||||
|
||||
with self.assertRaises(NameserverError):
|
||||
domain.nameservers = [
|
||||
("ns1.cats-da-best.gov", ["1.2.3.4"]),
|
||||
("ns2.cats-da-best.gov", ["2.3.4.5"]),
|
||||
]
|
||||
|
||||
def test_is_subdomain_but_ip_addr_not_valid(self):
|
||||
domain, _ = Domain.objects.get_or_create(
|
||||
name="nameserversubdomain.gov", state=Domain.State.READY
|
||||
)
|
||||
|
||||
with self.assertRaises(NameserverError):
|
||||
domain.nameservers = [
|
||||
("ns1.nameserversubdomain.gov", ["1.2.3"]),
|
||||
("ns2.nameserversubdomain.gov", ["2.3.4"]),
|
||||
]
|
||||
|
||||
def test_setting_not_allowed(self):
|
||||
"""Scenario: A domain state is not Ready or DNS Needed
|
||||
then setting nameservers is not allowed"""
|
||||
domain, _ = Domain.objects.get_or_create(
|
||||
name="onholdDomain.gov", state=Domain.State.ON_HOLD
|
||||
)
|
||||
with self.assertRaises(ActionNotAllowed):
|
||||
domain.nameservers = [self.nameserver1, self.nameserver2]
|
||||
|
||||
@skip("not implemented yet")
|
||||
def test_update_is_unsuccessful(self):
|
||||
|
@ -1028,8 +1420,22 @@ class TestRegistrantNameservers(TestCase):
|
|||
Scenario: An update to the nameservers is unsuccessful
|
||||
When an error is returned from epplibwrapper
|
||||
Then a user-friendly error message is returned for displaying on the web
|
||||
|
||||
Note: TODO 433 -- we will perform correct error handling and complete
|
||||
this ticket. We want to raise an error for update/create/delete, but
|
||||
don't want to lose user info (and exit out too early)
|
||||
"""
|
||||
raise
|
||||
|
||||
domain, _ = Domain.objects.get_or_create(
|
||||
name="failednameserver.gov", state=Domain.State.READY
|
||||
)
|
||||
|
||||
with self.assertRaises(RegistryError):
|
||||
domain.nameservers = [("ns1.failednameserver.gov", ["4.5.6"])]
|
||||
|
||||
def tearDown(self):
|
||||
Domain.objects.all().delete()
|
||||
return super().tearDown()
|
||||
|
||||
|
||||
class TestRegistrantDNSSEC(MockEppLib):
|
||||
|
|
46
src/registrar/tests/test_nameserver_error.py
Normal file
46
src/registrar/tests/test_nameserver_error.py
Normal file
|
@ -0,0 +1,46 @@
|
|||
from django.test import TestCase
|
||||
|
||||
from registrar.utility.errors import (
|
||||
NameserverError,
|
||||
NameserverErrorCodes as nsErrorCodes,
|
||||
)
|
||||
|
||||
|
||||
class TestNameserverError(TestCase):
|
||||
def test_with_no_ip(self):
|
||||
"""Test NameserverError when no ip address is passed"""
|
||||
nameserver = "nameserver val"
|
||||
expected = (
|
||||
f"Nameserver {nameserver} needs to have an "
|
||||
"IP address because it is a subdomain"
|
||||
)
|
||||
|
||||
nsException = NameserverError(
|
||||
code=nsErrorCodes.MISSING_IP, nameserver=nameserver
|
||||
)
|
||||
self.assertEqual(nsException.message, expected)
|
||||
self.assertEqual(nsException.code, nsErrorCodes.MISSING_IP)
|
||||
|
||||
def test_with_only_code(self):
|
||||
"""Test NameserverError when no ip address
|
||||
and no nameserver is passed"""
|
||||
nameserver = "nameserver val"
|
||||
expected = "Too many hosts provided, you may not have more than 13 nameservers."
|
||||
|
||||
nsException = NameserverError(
|
||||
code=nsErrorCodes.TOO_MANY_HOSTS, nameserver=nameserver
|
||||
)
|
||||
self.assertEqual(nsException.message, expected)
|
||||
self.assertEqual(nsException.code, nsErrorCodes.TOO_MANY_HOSTS)
|
||||
|
||||
def test_with_ip_nameserver(self):
|
||||
"""Test NameserverError when ip and nameserver are passed"""
|
||||
ip = "ip val"
|
||||
nameserver = "nameserver val"
|
||||
|
||||
expected = f"Nameserver {nameserver} has an invalid IP address: {ip}"
|
||||
nsException = NameserverError(
|
||||
code=nsErrorCodes.INVALID_IP, nameserver=nameserver, ip=ip
|
||||
)
|
||||
self.assertEqual(nsException.message, expected)
|
||||
self.assertEqual(nsException.code, nsErrorCodes.INVALID_IP)
|
|
@ -1,3 +1,6 @@
|
|||
from enum import IntEnum
|
||||
|
||||
|
||||
class BlankValueError(ValueError):
|
||||
pass
|
||||
|
||||
|
@ -8,3 +11,65 @@ class ExtraDotsError(ValueError):
|
|||
|
||||
class DomainUnavailableError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class ActionNotAllowed(Exception):
|
||||
"""User accessed an action that is not
|
||||
allowed by the current state"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class NameserverErrorCodes(IntEnum):
|
||||
"""Used in the NameserverError class for
|
||||
error mapping.
|
||||
Overview of nameserver error codes:
|
||||
- 1 MISSING_IP ip address is missing for a nameserver
|
||||
- 2 GLUE_RECORD_NOT_ALLOWED a host has a nameserver
|
||||
value but is not a subdomain
|
||||
- 3 INVALID_IP invalid ip address format or invalid version
|
||||
- 4 TOO_MANY_HOSTS more than the max allowed host values
|
||||
"""
|
||||
|
||||
MISSING_IP = 1
|
||||
GLUE_RECORD_NOT_ALLOWED = 2
|
||||
INVALID_IP = 3
|
||||
TOO_MANY_HOSTS = 4
|
||||
UNABLE_TO_UPDATE_DOMAIN = 5
|
||||
|
||||
|
||||
class NameserverError(Exception):
|
||||
"""
|
||||
NameserverError class used to raise exceptions on
|
||||
the nameserver getter
|
||||
"""
|
||||
|
||||
_error_mapping = {
|
||||
NameserverErrorCodes.MISSING_IP: "Nameserver {} needs to have an "
|
||||
"IP address because it is a subdomain",
|
||||
NameserverErrorCodes.GLUE_RECORD_NOT_ALLOWED: "Nameserver {} cannot be linked "
|
||||
"because it is not a subdomain",
|
||||
NameserverErrorCodes.INVALID_IP: "Nameserver {} has an invalid IP address: {}",
|
||||
NameserverErrorCodes.TOO_MANY_HOSTS: (
|
||||
"Too many hosts provided, you may not have more than 13 nameservers."
|
||||
),
|
||||
NameserverErrorCodes.UNABLE_TO_UPDATE_DOMAIN: (
|
||||
"Unable to update domain, changes were not applied."
|
||||
"Check logs as a Registry Error is the likely cause"
|
||||
),
|
||||
}
|
||||
|
||||
def __init__(self, *args, code=None, nameserver=None, ip=None, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.code = code
|
||||
if self.code in self._error_mapping:
|
||||
self.message = self._error_mapping.get(self.code)
|
||||
if nameserver is not None and ip is not None:
|
||||
self.message = self.message.format(str(nameserver), str(ip))
|
||||
elif nameserver is not None:
|
||||
self.message = self.message.format(str(nameserver))
|
||||
elif ip is not None:
|
||||
self.message = self.message.format(str(ip))
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.message}"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue