Merge branch 'main' into za/1102-epp-contact-disclose-update

This commit is contained in:
zandercymatics 2023-10-19 10:49:59 -06:00
commit c30428b6f8
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
70 changed files with 5603 additions and 1331 deletions

View file

@ -1,11 +1,14 @@
from itertools import zip_longest
import logging
import ipaddress
import re
from datetime import date
from string import digits
from typing import Optional
from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore
from django.db import models
from typing import Any
from epplibwrapper import (
CLIENT as registry,
commands,
@ -15,7 +18,15 @@ from epplibwrapper import (
RegistryError,
ErrorCode,
)
from registrar.models.utility.contact_error import ContactError
from registrar.utility.errors import (
ActionNotAllowed,
NameserverError,
NameserverErrorCodes as nsErrorCodes,
)
from registrar.models.utility.contact_error import ContactError, ContactErrorCodes
from .utility.domain_field import DomainField
from .utility.domain_helper import DomainHelper
@ -218,13 +229,13 @@ class Domain(TimeStampedModel, DomainHelper):
raise NotImplementedError()
@Cache
def nameservers(self) -> list[tuple[str]]:
def nameservers(self) -> list[tuple[str, list]]:
"""
Get or set a complete list of nameservers for this domain.
Hosts are provided as a list of tuples, e.g.
[("ns1.example.com",), ("ns1.example.gov", "0.0.0.0")]
[("ns1.example.com",), ("ns1.example.gov", ["0.0.0.0"])]
Subordinate hosts (something.your-domain.gov) MUST have IP addresses,
while non-subordinate hosts MUST NOT.
@ -232,42 +243,23 @@ class Domain(TimeStampedModel, DomainHelper):
try:
hosts = self._get_property("hosts")
except Exception as err:
# Don't throw error as this is normal for a new domain
# TODO - 433 error handling ticket should address this
# Do not raise error when missing nameservers
# this is a standard occurence when a domain
# is first created
logger.info("Domain is missing nameservers %s" % err)
return []
# TODO-687 fix this return value
hostList = []
for host in hosts:
# TODO - this should actually have a second tuple value with the ip address
# ignored because uncertain if we will even have a way to display mult.
# and adresses can be a list of mult address
hostList.append((host["name"],))
hostList.append((host["name"], host["addrs"]))
return hostList
def _check_host(self, hostnames: list[str]):
"""check if host is available, True if available
returns boolean"""
checkCommand = commands.CheckHost(hostnames)
try:
response = registry.send(checkCommand, cleaned=True)
return response.res_data[0].avail
except RegistryError as err:
logger.warning(
"Couldn't check hosts %s. Errorcode was %s, error was %s",
hostnames,
err.code,
err,
)
return False
def _create_host(self, host, addrs):
"""Call _check_host first before using this function,
This creates the host object in the registry
"""Creates the host object in the registry
doesn't add the created host to the domain
returns ErrorCode (int)"""
logger.info("Creating host")
if addrs is not None:
addresses = [epp.Ip(addr=addr) for addr in addrs]
request = commands.CreateHost(name=host, addrs=addresses)
@ -282,76 +274,381 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error("Error _create_host, code was %s error was %s" % (e.code, e))
return e.code
def _convert_list_to_dict(self, listToConvert: list[tuple[str, list]]):
"""converts a list of hosts into a dictionary
Args:
list[tuple[str, list]]: such as [("123",["1","2","3"])]
This is the list of hosts to convert
returns:
convertDict (dict(str,list))- such as{"123":["1","2","3"]}"""
newDict: dict[str, Any] = {}
for tup in listToConvert:
if len(tup) == 1:
newDict[tup[0]] = None
elif len(tup) == 2:
newDict[tup[0]] = tup[1]
return newDict
def isSubdomain(self, nameserver: str):
"""Returns boolean if the domain name is found in the argument passed"""
subdomain_pattern = r"([\w-]+\.)*"
full_pattern = subdomain_pattern + self.name
regex = re.compile(full_pattern)
return bool(regex.match(nameserver))
def checkHostIPCombo(self, nameserver: str, ip: list[str]):
"""Checks the parameters past for a valid combination
raises error if:
- nameserver is a subdomain but is missing ip
- nameserver is not a subdomain but has ip
- nameserver is a subdomain but an ip passed is invalid
Args:
hostname (str)- nameserver or subdomain
ip (list[str])-list of ip strings
Throws:
NameserverError (if exception hit)
Returns:
None"""
if self.isSubdomain(nameserver) and (ip is None or ip == []):
raise NameserverError(code=nsErrorCodes.MISSING_IP, nameserver=nameserver)
elif not self.isSubdomain(nameserver) and (ip is not None and ip != []):
raise NameserverError(
code=nsErrorCodes.GLUE_RECORD_NOT_ALLOWED, nameserver=nameserver, ip=ip
)
elif ip is not None and ip != []:
for addr in ip:
if not self._valid_ip_addr(addr):
raise NameserverError(
code=nsErrorCodes.INVALID_IP, nameserver=nameserver, ip=ip
)
return None
def _valid_ip_addr(self, ipToTest: str):
"""returns boolean if valid ip address string
We currently only accept v4 or v6 ips
returns:
isValid (boolean)-True for valid ip address"""
try:
ip = ipaddress.ip_address(ipToTest)
return ip.version == 6 or ip.version == 4
except ValueError:
return False
def getNameserverChanges(
self, hosts: list[tuple[str, list]]
) -> tuple[list, list, dict, dict]:
"""
calls self.nameserver, it should pull from cache but may result
in an epp call
Args:
hosts: list[tuple[str, list]] such as [("123",["1","2","3"])]
Throws:
NameserverError (if exception hit)
Returns:
tuple[list, list, dict, dict]
These four tuple values as follows:
deleted_values: list[str]
updated_values: list[str]
new_values: dict(str,list)
prevHostDict: dict(str,list)"""
oldNameservers = self.nameservers
previousHostDict = self._convert_list_to_dict(oldNameservers)
newHostDict = self._convert_list_to_dict(hosts)
deleted_values = []
# TODO-currently a list of tuples, why not dict? for consistency
updated_values = []
new_values = {}
for prevHost in previousHostDict:
addrs = previousHostDict[prevHost]
# get deleted values-which are values in previous nameserver list
# but are not in the list of new host values
if prevHost not in newHostDict:
deleted_values.append(prevHost)
# if the host exists in both, check if the addresses changed
else:
# TODO - host is being updated when previous was None+new is empty list
# add check here
if newHostDict[prevHost] is not None and set(
newHostDict[prevHost]
) != set(addrs):
self.checkHostIPCombo(nameserver=prevHost, ip=newHostDict[prevHost])
updated_values.append((prevHost, newHostDict[prevHost]))
new_values = {
key: newHostDict.get(key)
for key in newHostDict
if key not in previousHostDict and key.strip() != ""
}
for nameserver, ip in new_values.items():
self.checkHostIPCombo(nameserver=nameserver, ip=ip)
return (deleted_values, updated_values, new_values, previousHostDict)
def _update_host_values(self, updated_values, oldNameservers):
for hostTuple in updated_values:
updated_response_code = self._update_host(
hostTuple[0], hostTuple[1], oldNameservers.get(hostTuple[0])
)
if updated_response_code not in [
ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
ErrorCode.OBJECT_EXISTS,
]:
logger.warning(
"Could not update host %s. Error code was: %s "
% (hostTuple[0], updated_response_code)
)
def createNewHostList(self, new_values: dict):
"""convert the dictionary of new values to a list of HostObjSet
for use in the UpdateDomain epp message
Args:
new_values: dict(str,list)- dict of {nameserver:ips} to add to domain
Returns:
tuple [list[epp.HostObjSet], int]
list[epp.HostObjSet]-epp object for use in the UpdateDomain epp message
defaults to empty list
int-number of items being created default 0
"""
hostStringList = []
for key, value in new_values.items():
createdCode = self._create_host(
host=key, addrs=value
) # creates in registry
if (
createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY
or createdCode == ErrorCode.OBJECT_EXISTS
):
hostStringList.append(key)
if hostStringList == []:
return [], 0
addToDomainObject = epp.HostObjSet(hosts=hostStringList)
return [addToDomainObject], len(hostStringList)
def createDeleteHostList(self, hostsToDelete: list[str]):
"""
Args:
hostsToDelete (list[str])- list of nameserver/host names to remove
Returns:
tuple [list[epp.HostObjSet], int]
list[epp.HostObjSet]-epp object for use in the UpdateDomain epp message
defaults to empty list
int-number of items being created default 0
"""
deleteStrList = []
for nameserver in hostsToDelete:
deleteStrList.append(nameserver)
if deleteStrList == []:
return [], 0
deleteObj = epp.HostObjSet(hosts=hostsToDelete)
return [deleteObj], len(deleteStrList)
@Cache
def dnssecdata(self) -> extensions.DNSSECExtension:
return self._get_property("dnssecdata")
def dnssecdata(self) -> Optional[extensions.DNSSECExtension]:
"""
Get a complete list of dnssecdata extensions for this domain.
dnssecdata are provided as a list of DNSSECExtension objects.
A DNSSECExtension object includes:
maxSigLife: Optional[int]
dsData: Optional[Sequence[DSData]]
keyData: Optional[Sequence[DNSSECKeyData]]
"""
try:
return self._get_property("dnssecdata")
except Exception as err:
# Don't throw error as this is normal for a new domain
logger.info("Domain does not have dnssec data defined %s" % err)
return None
def getDnssecdataChanges(
self, _dnssecdata: Optional[extensions.DNSSECExtension]
) -> tuple[dict, dict]:
"""
calls self.dnssecdata, it should pull from cache but may result
in an epp call
returns tuple of 2 values as follows:
addExtension: dict
remExtension: dict
addExtension includes all dsData or keyData to be added
remExtension includes all dsData or keyData to be removed
method operates on dsData OR keyData, never a mix of the two;
operates based on which is present in _dnssecdata;
if neither is present, addExtension will be empty dict, and
remExtension will be all existing dnssecdata to be deleted
"""
oldDnssecdata = self.dnssecdata
addDnssecdata: dict = {}
remDnssecdata: dict = {}
if _dnssecdata and _dnssecdata.dsData is not None:
# initialize addDnssecdata and remDnssecdata for dsData
addDnssecdata["dsData"] = _dnssecdata.dsData
if oldDnssecdata and len(oldDnssecdata.dsData) > 0:
# if existing dsData not in new dsData, mark for removal
dsDataForRemoval = [
dsData
for dsData in oldDnssecdata.dsData
if dsData not in _dnssecdata.dsData
]
if len(dsDataForRemoval) > 0:
remDnssecdata["dsData"] = dsDataForRemoval
# if new dsData not in existing dsData, mark for add
dsDataForAdd = [
dsData
for dsData in _dnssecdata.dsData
if dsData not in oldDnssecdata.dsData
]
if len(dsDataForAdd) > 0:
addDnssecdata["dsData"] = dsDataForAdd
else:
addDnssecdata["dsData"] = None
elif _dnssecdata and _dnssecdata.keyData is not None:
# initialize addDnssecdata and remDnssecdata for keyData
addDnssecdata["keyData"] = _dnssecdata.keyData
if oldDnssecdata and len(oldDnssecdata.keyData) > 0:
# if existing keyData not in new keyData, mark for removal
keyDataForRemoval = [
keyData
for keyData in oldDnssecdata.keyData
if keyData not in _dnssecdata.keyData
]
if len(keyDataForRemoval) > 0:
remDnssecdata["keyData"] = keyDataForRemoval
# if new keyData not in existing keyData, mark for add
keyDataForAdd = [
keyData
for keyData in _dnssecdata.keyData
if keyData not in oldDnssecdata.keyData
]
if len(keyDataForAdd) > 0:
addDnssecdata["keyData"] = keyDataForAdd
else:
# there are no new dsData or keyData, remove all
remDnssecdata["dsData"] = getattr(oldDnssecdata, "dsData", None)
remDnssecdata["keyData"] = getattr(oldDnssecdata, "keyData", None)
return addDnssecdata, remDnssecdata
@dnssecdata.setter # type: ignore
def dnssecdata(self, _dnssecdata: extensions.DNSSECExtension):
updateParams = {
"maxSigLife": _dnssecdata.get("maxSigLife", None),
"dsData": _dnssecdata.get("dsData", None),
"keyData": _dnssecdata.get("keyData", None),
"remAllDsKeyData": True,
def dnssecdata(self, _dnssecdata: Optional[extensions.DNSSECExtension]):
_addDnssecdata, _remDnssecdata = self.getDnssecdataChanges(_dnssecdata)
addParams = {
"maxSigLife": _addDnssecdata.get("maxSigLife", None),
"dsData": _addDnssecdata.get("dsData", None),
"keyData": _addDnssecdata.get("keyData", None),
}
request = commands.UpdateDomain(name=self.name)
extension = commands.UpdateDomainDNSSECExtension(**updateParams)
request.add_extension(extension)
remParams = {
"maxSigLife": _remDnssecdata.get("maxSigLife", None),
"remDsData": _remDnssecdata.get("dsData", None),
"remKeyData": _remDnssecdata.get("keyData", None),
}
addRequest = commands.UpdateDomain(name=self.name)
addExtension = commands.UpdateDomainDNSSECExtension(**addParams)
addRequest.add_extension(addExtension)
remRequest = commands.UpdateDomain(name=self.name)
remExtension = commands.UpdateDomainDNSSECExtension(**remParams)
remRequest.add_extension(remExtension)
try:
registry.send(request, cleaned=True)
if (
"dsData" in _addDnssecdata
and _addDnssecdata["dsData"] is not None
or "keyData" in _addDnssecdata
and _addDnssecdata["keyData"] is not None
):
registry.send(addRequest, cleaned=True)
if (
"dsData" in _remDnssecdata
and _remDnssecdata["dsData"] is not None
or "keyData" in _remDnssecdata
and _remDnssecdata["keyData"] is not None
):
registry.send(remRequest, cleaned=True)
except RegistryError as e:
logger.error("Error adding DNSSEC, code was %s error was %s" % (e.code, e))
logger.error(
"Error updating DNSSEC, code was %s error was %s" % (e.code, e)
)
raise e
@nameservers.setter # type: ignore
def nameservers(self, hosts: list[tuple[str]]):
"""host should be a tuple of type str, str,... where the elements are
def nameservers(self, hosts: list[tuple[str, list]]):
"""Host should be a tuple of type str, str,... where the elements are
Fully qualified host name, addresses associated with the host
example: [(ns1.okay.gov, 127.0.0.1, others ips)]"""
# TODO: ticket #848 finish this implementation
# must delete nameservers as well or update
# ip version checking may need to be added in a different ticket
example: [(ns1.okay.gov, [127.0.0.1, others ips])]"""
if len(hosts) > 13:
raise ValueError(
"Too many hosts provided, you may not have more than 13 nameservers."
)
raise NameserverError(code=nsErrorCodes.TOO_MANY_HOSTS)
if self.state not in [self.State.DNS_NEEDED, self.State.READY]:
raise ActionNotAllowed("Nameservers can not be " "set in the current state")
logger.info("Setting nameservers")
logger.info(hosts)
for hostTuple in hosts:
host = hostTuple[0]
addrs = None
if len(hostTuple) > 1:
addrs = hostTuple[1:]
avail = self._check_host([host])
if avail:
createdCode = self._create_host(host=host, addrs=addrs)
# update the domain obj
if createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
# add host to domain
request = commands.UpdateDomain(
name=self.name, add=[epp.HostObjSet([host])]
)
# get the changes made by user and old nameserver values
(
deleted_values,
updated_values,
new_values,
oldNameservers,
) = self.getNameserverChanges(hosts=hosts)
try:
registry.send(request, cleaned=True)
except RegistryError as e:
logger.error(
"Error adding nameserver, code was %s error was %s"
% (e.code, e)
)
_ = self._update_host_values(
updated_values, oldNameservers
) # returns nothing, just need to be run and errors
addToDomainList, addToDomainCount = self.createNewHostList(new_values)
deleteHostList, deleteCount = self.createDeleteHostList(deleted_values)
responseCode = self.addAndRemoveHostsFromDomain(
hostsToAdd=addToDomainList, hostsToDelete=deleteHostList
)
try:
self.ready()
self.save()
except Exception as err:
logger.info(
"nameserver setter checked for create state "
"and it did not succeed. Error: %s" % err
)
# TODO - handle removed nameservers here will need to change the state
# then go back to DNS_NEEDED
# if unable to update domain raise error and stop
if responseCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
raise NameserverError(code=nsErrorCodes.UNABLE_TO_UPDATE_DOMAIN)
successTotalNameservers = len(oldNameservers) - deleteCount + addToDomainCount
self._delete_hosts_if_not_used(hostsToDelete=deleted_values)
if successTotalNameservers < 2:
try:
self.dns_needed()
self.save()
except Exception as err:
logger.info(
"nameserver setter checked for dns_needed state "
"and it did not succeed. Warning: %s" % err
)
elif successTotalNameservers >= 2 and successTotalNameservers <= 13:
try:
self.ready()
self.save()
except Exception as err:
logger.info(
"nameserver setter checked for create state "
"and it did not succeed. Warning: %s" % err
)
@Cache
def statuses(self) -> list[str]:
@ -520,7 +817,7 @@ class Domain(TimeStampedModel, DomainHelper):
and errorCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY
):
# TODO- ticket #433 look here for error handling
raise Exception("Unable to add contact to registry")
raise RegistryError(code=errorCode)
# contact doesn't exist on the domain yet
logger.info("_set_singleton_contact()-> contact has been added to the registry")
@ -625,7 +922,10 @@ class Domain(TimeStampedModel, DomainHelper):
def get_security_email(self):
logger.info("get_security_email-> getting the contact ")
secContact = self.security_contact
return secContact.email
if secContact is not None:
return secContact.email
else:
return None
def clientHoldStatus(self):
return epp.Status(state=self.Status.CLIENT_HOLD, description="", lang="en")
@ -698,10 +998,10 @@ class Domain(TimeStampedModel, DomainHelper):
return None
if contact_type is None:
raise ContactError("contact_type is None")
raise ContactError(code=ContactErrorCodes.CONTACT_TYPE_NONE)
if contact_id is None:
raise ContactError("contact_id is None")
raise ContactError(code=ContactErrorCodes.CONTACT_ID_NONE)
# Since contact_id is registry_id,
# check that its the right length
@ -710,14 +1010,10 @@ class Domain(TimeStampedModel, DomainHelper):
contact_id_length > PublicContact.get_max_id_length()
or contact_id_length < 1
):
raise ContactError(
"contact_id is of invalid length. "
"Cannot exceed 16 characters, "
f"got {contact_id} with a length of {contact_id_length}"
)
raise ContactError(code=ContactErrorCodes.CONTACT_ID_INVALID_LENGTH)
if not isinstance(contact, eppInfo.InfoContactResultData):
raise ContactError("Contact must be of type InfoContactResultData")
raise ContactError(code=ContactErrorCodes.CONTACT_INVALID_TYPE)
auth_info = contact.auth_info
postal_info = contact.postal_info
@ -881,8 +1177,8 @@ class Domain(TimeStampedModel, DomainHelper):
return self._handle_registrant_contact(desired_contact)
_registry_id: str
if contact_type in contacts:
_registry_id: str = ""
if contacts is not None and contact_type in contacts:
_registry_id = contacts.get(contact_type)
desired = PublicContact.objects.filter(
@ -948,7 +1244,6 @@ class Domain(TimeStampedModel, DomainHelper):
count = 0
while not exitEarly and count < 3:
try:
logger.info("Getting domain info from epp")
req = commands.InfoDomain(name=self.name)
domainInfoResponse = registry.send(req, cleaned=True)
exitEarly = True
@ -964,7 +1259,7 @@ class Domain(TimeStampedModel, DomainHelper):
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
# avoid infinite loop
already_tried_to_create = True
self.pendingCreate()
self.dns_needed_from_unknown()
self.save()
else:
logger.error(e)
@ -978,7 +1273,7 @@ class Domain(TimeStampedModel, DomainHelper):
return registrant.registry_id
@transition(field="state", source=State.UNKNOWN, target=State.DNS_NEEDED)
def pendingCreate(self):
def dns_needed_from_unknown(self):
logger.info("Changing to dns_needed")
registrantID = self.addRegistrant()
@ -1011,20 +1306,29 @@ class Domain(TimeStampedModel, DomainHelper):
@transition(
field="state", source=[State.READY, State.ON_HOLD], target=State.ON_HOLD
)
def place_client_hold(self):
"""place a clienthold on a domain (no longer should resolve)"""
def place_client_hold(self, ignoreEPP=False):
"""place a clienthold on a domain (no longer should resolve)
ignoreEPP (boolean) - set to true to by-pass EPP (used for transition domains)
"""
# TODO - ensure all requirements for client hold are made here
# (check prohibited statuses)
logger.info("clientHold()-> inside clientHold")
self._place_client_hold()
# In order to allow transition domains to by-pass EPP calls,
# include this ignoreEPP flag
if not ignoreEPP:
self._place_client_hold()
# TODO -on the client hold ticket any additional error handling here
@transition(field="state", source=[State.READY, State.ON_HOLD], target=State.READY)
def revert_client_hold(self):
"""undo a clienthold placed on a domain"""
def revert_client_hold(self, ignoreEPP=False):
"""undo a clienthold placed on a domain
ignoreEPP (boolean) - set to true to by-pass EPP (used for transition domains)
"""
logger.info("clientHold()-> inside clientHold")
self._remove_client_hold()
if not ignoreEPP:
self._remove_client_hold()
# TODO -on the client hold ticket any additional error handling here
@transition(
@ -1054,26 +1358,54 @@ class Domain(TimeStampedModel, DomainHelper):
else:
self._invalidate_cache()
# def is_dns_needed(self):
# """Commented out and kept in the codebase
# as this call should be made, but adds
# a lot of processing time
# when EPP calling is made more efficient
# this should be added back in
# The goal is to double check that
# the nameservers we set are in fact
# on the registry
# """
# self._invalidate_cache()
# nameserverList = self.nameservers
# return len(nameserverList) < 2
# def dns_not_needed(self):
# return not self.is_dns_needed()
@transition(
field="state",
source=[State.DNS_NEEDED],
target=State.READY,
# conditions=[dns_not_needed]
)
def ready(self):
"""Transition to the ready state
domain should have nameservers and all contacts
and now should be considered live on a domain
"""
# TODO - in nameservers tickets 848 and 562
# check here if updates need to be made
# consider adding these checks as constraints
# within the transistion itself
nameserverList = self.nameservers
logger.info("Changing to ready state")
if len(nameserverList) < 2 or len(nameserverList) > 13:
raise ValueError("Not ready to become created, cannot transition yet")
logger.info("able to transition to ready state")
@transition(
field="state",
source=[State.READY],
target=State.DNS_NEEDED,
# conditions=[is_dns_needed]
)
def dns_needed(self):
"""Transition to the DNS_NEEDED state
domain should NOT have nameservers but
SHOULD have all contacts
Going to check nameservers and will
result in an EPP call
"""
logger.info("Changing to DNS_NEEDED state")
logger.info("able to transition to DNS_NEEDED state")
def _disclose_fields(self, contact: PublicContact):
"""creates a disclose object that can be added to a contact Create using
.disclose= <this function> on the command before sending.
@ -1197,6 +1529,10 @@ class Domain(TimeStampedModel, DomainHelper):
raise e
def is_ipv6(self, ip: str):
ip_addr = ipaddress.ip_address(ip)
return ip_addr.version == 6
def _fetch_hosts(self, host_data):
"""Fetch host info."""
hosts = []
@ -1214,84 +1550,214 @@ class Domain(TimeStampedModel, DomainHelper):
hosts.append({k: v for k, v in host.items() if v is not ...})
return hosts
def _update_or_create_host(self, host):
raise NotImplementedError()
def _convert_ips(self, ip_list: list[str]):
"""Convert Ips to a list of epp.Ip objects
use when sending update host command.
if there are no ips an empty list will be returned
def _delete_host(self, host):
raise NotImplementedError()
Args:
ip_list (list[str]): the new list of ips, may be empty
Returns:
edited_ip_list (list[epp.Ip]): list of epp.ip objects ready to
be sent to the registry
"""
edited_ip_list = []
if ip_list is None:
return []
for ip_addr in ip_list:
if self.is_ipv6(ip_addr):
edited_ip_list.append(epp.Ip(addr=ip_addr, ip="v6"))
else: # default ip addr is v4
edited_ip_list.append(epp.Ip(addr=ip_addr))
return edited_ip_list
def _update_host(self, nameserver: str, ip_list: list[str], old_ip_list: list[str]):
"""Update an existing host object in EPP. Sends the update host command
can result in a RegistryError
Args:
nameserver (str): nameserver or subdomain
ip_list (list[str]): the new list of ips, may be empty
old_ip_list (list[str]): the old ip list, may also be empty
Returns:
errorCode (int): one of ErrorCode enum type values
"""
try:
if (
ip_list is None
or len(ip_list) == 0
and isinstance(old_ip_list, list)
and len(old_ip_list) != 0
):
return ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY
added_ip_list = set(ip_list).difference(old_ip_list)
removed_ip_list = set(old_ip_list).difference(ip_list)
request = commands.UpdateHost(
name=nameserver,
add=self._convert_ips(list(added_ip_list)),
rem=self._convert_ips(list(removed_ip_list)),
)
response = registry.send(request, cleaned=True)
logger.info("_update_host()-> sending req as %s" % request)
return response.code
except RegistryError as e:
logger.error("Error _update_host, code was %s error was %s" % (e.code, e))
return e.code
def addAndRemoveHostsFromDomain(
self, hostsToAdd: list[str], hostsToDelete: list[str]
):
"""sends an UpdateDomain message to the registry with the hosts provided
Args:
hostsToDelete (list[epp.HostObjSet])- list of host objects to delete
hostsToAdd (list[epp.HostObjSet])- list of host objects to add
Returns:
response code (int)- RegistryErrorCode integer value
defaults to return COMMAND_COMPLETED_SUCCESSFULLY
if there is nothing to add or delete
"""
if hostsToAdd == [] and hostsToDelete == []:
return ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY
try:
updateReq = commands.UpdateDomain(
name=self.name, rem=hostsToDelete, add=hostsToAdd
)
logger.info(
"addAndRemoveHostsFromDomain()-> sending update domain req as %s"
% updateReq
)
response = registry.send(updateReq, cleaned=True)
return response.code
except RegistryError as e:
logger.error(
"Error addAndRemoveHostsFromDomain, code was %s error was %s"
% (e.code, e)
)
return e.code
def _delete_hosts_if_not_used(self, hostsToDelete: list[str]):
"""delete the host object in registry,
will only delete the host object, if it's not being used by another domain
Performs just the DeleteHost epp call
Supresses regstry error, as registry can disallow delete for various reasons
Args:
hostsToDelete (list[str])- list of nameserver/host names to remove
Returns:
None
"""
try:
for nameserver in hostsToDelete:
deleteHostReq = commands.DeleteHost(name=nameserver)
registry.send(deleteHostReq, cleaned=True)
logger.info(
"_delete_hosts_if_not_used()-> sending delete host req as %s"
% deleteHostReq
)
except RegistryError as e:
if e.code == ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION:
logger.info(
"Did not remove host %s because it is in use on another domain."
% nameserver
)
else:
logger.error(
"Error _delete_hosts_if_not_used, code was %s error was %s"
% (e.code, e)
)
def _fetch_cache(self, fetch_hosts=False, fetch_contacts=False):
"""Contact registry for info about a domain."""
try:
# get info from registry
dataResponse = self._get_or_create_domain()
data = dataResponse.res_data[0]
# extract properties from response
# (Ellipsis is used to mean "null")
cache = {
"auth_info": getattr(data, "auth_info", ...),
"_contacts": getattr(data, "contacts", ...),
"cr_date": getattr(data, "cr_date", ...),
"ex_date": getattr(data, "ex_date", ...),
"_hosts": getattr(data, "hosts", ...),
"name": getattr(data, "name", ...),
"registrant": getattr(data, "registrant", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
}
# remove null properties (to distinguish between "a value of None" and null)
cleaned = {k: v for k, v in cache.items() if v is not ...}
data_response = self._get_or_create_domain()
cache = self._extract_data_from_response(data_response)
# remove null properties (to distinguish between "a value of None" and null)
cleaned = self._remove_null_properties(cache)
# statuses can just be a list no need to keep the epp object
if "statuses" in cleaned:
cleaned["statuses"] = [status.state for status in cleaned["statuses"]]
# get extensions info, if there is any
# DNSSECExtension is one possible extension, make sure to handle
# only DNSSECExtension and not other type extensions
returned_extensions = dataResponse.extensions
cleaned["dnssecdata"] = None
for extension in returned_extensions:
if isinstance(extension, extensions.DNSSECExtension):
cleaned["dnssecdata"] = extension
cleaned["dnssecdata"] = self._get_dnssec_data(data_response.extensions)
# Capture and store old hosts and contacts from cache if they exist
old_cache_hosts = self._cache.get("hosts")
old_cache_contacts = self._cache.get("contacts")
# get contact info, if there are any
if (
fetch_contacts
and "_contacts" in cleaned
and isinstance(cleaned["_contacts"], list)
and len(cleaned["_contacts"]) > 0
):
cleaned["contacts"] = self._fetch_contacts(cleaned["_contacts"])
# We're only getting contacts, so retain the old
# hosts that existed in cache (if they existed)
# and pass them along.
if fetch_contacts:
cleaned["contacts"] = self._get_contacts(cleaned.get("_contacts", []))
if old_cache_hosts is not None:
logger.debug("resetting cleaned['hosts'] to old_cache_hosts")
cleaned["hosts"] = old_cache_hosts
# get nameserver info, if there are any
if (
fetch_hosts
and "_hosts" in cleaned
and isinstance(cleaned["_hosts"], list)
and len(cleaned["_hosts"])
):
cleaned["hosts"] = self._fetch_hosts(cleaned["_hosts"])
# We're only getting hosts, so retain the old
# contacts that existed in cache (if they existed)
# and pass them along.
if fetch_hosts:
cleaned["hosts"] = self._get_hosts(cleaned.get("_hosts", []))
if old_cache_contacts is not None:
cleaned["contacts"] = old_cache_contacts
# replace the prior cache with new data
self._cache = cleaned
except RegistryError as e:
logger.error(e)
def _extract_data_from_response(self, data_response):
data = data_response.res_data[0]
return {
"auth_info": getattr(data, "auth_info", ...),
"_contacts": getattr(data, "contacts", ...),
"cr_date": getattr(data, "cr_date", ...),
"ex_date": getattr(data, "ex_date", ...),
"_hosts": getattr(data, "hosts", ...),
"name": getattr(data, "name", ...),
"registrant": getattr(data, "registrant", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
}
def _remove_null_properties(self, cache):
return {k: v for k, v in cache.items() if v is not ...}
def _get_dnssec_data(self, response_extensions):
# get extensions info, if there is any
# DNSSECExtension is one possible extension, make sure to handle
# only DNSSECExtension and not other type extensions
dnssec_data = None
for extension in response_extensions:
if isinstance(extension, extensions.DNSSECExtension):
dnssec_data = extension
return dnssec_data
def _get_contacts(self, contacts):
choices = PublicContact.ContactTypeChoices
# We expect that all these fields get populated,
# so we can create these early, rather than waiting.
cleaned_contacts = {
choices.ADMINISTRATIVE: None,
choices.SECURITY: None,
choices.TECHNICAL: None,
}
if contacts and isinstance(contacts, list) and len(contacts) > 0:
cleaned_contacts = self._fetch_contacts(contacts)
return cleaned_contacts
def _get_hosts(self, hosts):
cleaned_hosts = []
if hosts and isinstance(hosts, list):
cleaned_hosts = self._fetch_hosts(hosts)
return cleaned_hosts
def _get_or_create_public_contact(self, public_contact: PublicContact):
"""Tries to find a PublicContact object in our DB.
If it can't, it'll create it. Returns PublicContact"""