merge conflicts

This commit is contained in:
David Kennedy 2023-10-17 15:13:23 -04:00
commit 9bc66be67f
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
34 changed files with 2346 additions and 750 deletions

View file

@ -5,6 +5,7 @@ import ipaddress
import re
from datetime import date
from string import digits
from typing import Optional
from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore
from django.db import models
@ -77,13 +78,15 @@ class Domain(TimeStampedModel, DomainHelper):
# Extract information about the calling function
calling_function_name = calling_frame.function
calling_module_name = calling_frame[0].f_globals['__name__']
calling_module_name = calling_frame[0].f_globals["__name__"]
calling_line_number = calling_frame[2]
# Print information about the calling function
print(f"Calling function: {calling_function_name} in module {calling_module_name} at line {calling_line_number}")
print(
f"Calling function: {calling_function_name} in module {calling_module_name} at line {calling_line_number}"
)
i+=1
i += 1
except Exception as err:
print("========================================================")
break
@ -484,24 +487,140 @@ class Domain(TimeStampedModel, DomainHelper):
return [deleteObj], len(deleteStrList)
@Cache
def dnssecdata(self) -> extensions.DNSSECExtension:
return self._get_property("dnssecdata")
def dnssecdata(self) -> Optional[extensions.DNSSECExtension]:
"""
Get a complete list of dnssecdata extensions for this domain.
dnssecdata are provided as a list of DNSSECExtension objects.
A DNSSECExtension object includes:
maxSigLife: Optional[int]
dsData: Optional[Sequence[DSData]]
keyData: Optional[Sequence[DNSSECKeyData]]
"""
try:
return self._get_property("dnssecdata")
except Exception as err:
# Don't throw error as this is normal for a new domain
logger.info("Domain does not have dnssec data defined %s" % err)
return None
def getDnssecdataChanges(
self, _dnssecdata: Optional[extensions.DNSSECExtension]
) -> tuple[dict, dict]:
"""
calls self.dnssecdata, it should pull from cache but may result
in an epp call
returns tuple of 2 values as follows:
addExtension: dict
remExtension: dict
addExtension includes all dsData or keyData to be added
remExtension includes all dsData or keyData to be removed
method operates on dsData OR keyData, never a mix of the two;
operates based on which is present in _dnssecdata;
if neither is present, addExtension will be empty dict, and
remExtension will be all existing dnssecdata to be deleted
"""
oldDnssecdata = self.dnssecdata
addDnssecdata: dict = {}
remDnssecdata: dict = {}
if _dnssecdata and _dnssecdata.dsData is not None:
# initialize addDnssecdata and remDnssecdata for dsData
addDnssecdata["dsData"] = _dnssecdata.dsData
if oldDnssecdata and len(oldDnssecdata.dsData) > 0:
# if existing dsData not in new dsData, mark for removal
dsDataForRemoval = [
dsData
for dsData in oldDnssecdata.dsData
if dsData not in _dnssecdata.dsData
]
if len(dsDataForRemoval) > 0:
remDnssecdata["dsData"] = dsDataForRemoval
# if new dsData not in existing dsData, mark for add
dsDataForAdd = [
dsData
for dsData in _dnssecdata.dsData
if dsData not in oldDnssecdata.dsData
]
if len(dsDataForAdd) > 0:
addDnssecdata["dsData"] = dsDataForAdd
else:
addDnssecdata["dsData"] = None
elif _dnssecdata and _dnssecdata.keyData is not None:
# initialize addDnssecdata and remDnssecdata for keyData
addDnssecdata["keyData"] = _dnssecdata.keyData
if oldDnssecdata and len(oldDnssecdata.keyData) > 0:
# if existing keyData not in new keyData, mark for removal
keyDataForRemoval = [
keyData
for keyData in oldDnssecdata.keyData
if keyData not in _dnssecdata.keyData
]
if len(keyDataForRemoval) > 0:
remDnssecdata["keyData"] = keyDataForRemoval
# if new keyData not in existing keyData, mark for add
keyDataForAdd = [
keyData
for keyData in _dnssecdata.keyData
if keyData not in oldDnssecdata.keyData
]
if len(keyDataForAdd) > 0:
addDnssecdata["keyData"] = keyDataForAdd
else:
# there are no new dsData or keyData, remove all
remDnssecdata["dsData"] = getattr(oldDnssecdata, "dsData", None)
remDnssecdata["keyData"] = getattr(oldDnssecdata, "keyData", None)
return addDnssecdata, remDnssecdata
@dnssecdata.setter # type: ignore
def dnssecdata(self, _dnssecdata: extensions.DNSSECExtension):
updateParams = {
"maxSigLife": _dnssecdata.get("maxSigLife", None),
"dsData": _dnssecdata.get("dsData", None),
"keyData": _dnssecdata.get("keyData", None),
"remAllDsKeyData": True,
def dnssecdata(self, _dnssecdata: Optional[extensions.DNSSECExtension]):
_addDnssecdata, _remDnssecdata = self.getDnssecdataChanges(_dnssecdata)
addParams = {
"maxSigLife": _addDnssecdata.get("maxSigLife", None),
"dsData": _addDnssecdata.get("dsData", None),
"keyData": _addDnssecdata.get("keyData", None),
}
request = commands.UpdateDomain(name=self.name)
extension = commands.UpdateDomainDNSSECExtension(**updateParams)
request.add_extension(extension)
remParams = {
"maxSigLife": _remDnssecdata.get("maxSigLife", None),
"remDsData": _remDnssecdata.get("dsData", None),
"remKeyData": _remDnssecdata.get("keyData", None),
}
addRequest = commands.UpdateDomain(name=self.name)
addExtension = commands.UpdateDomainDNSSECExtension(**addParams)
addRequest.add_extension(addExtension)
remRequest = commands.UpdateDomain(name=self.name)
remExtension = commands.UpdateDomainDNSSECExtension(**remParams)
remRequest.add_extension(remExtension)
try:
registry.send(request, cleaned=True)
if (
"dsData" in _addDnssecdata
and _addDnssecdata["dsData"] is not None
or "keyData" in _addDnssecdata
and _addDnssecdata["keyData"] is not None
):
registry.send(addRequest, cleaned=True)
if (
"dsData" in _remDnssecdata
and _remDnssecdata["dsData"] is not None
or "keyData" in _remDnssecdata
and _remDnssecdata["keyData"] is not None
):
registry.send(remRequest, cleaned=True)
except RegistryError as e:
logger.error("Error adding DNSSEC, code was %s error was %s" % (e.code, e))
logger.error(
"Error updating DNSSEC, code was %s error was %s" % (e.code, e)
)
raise e
@nameservers.setter # type: ignore
@ -1465,7 +1584,10 @@ class Domain(TimeStampedModel, DomainHelper):
"up_date": getattr(data, "up_date", ...),
}
hosts.append({k: v for k, v in host.items() if v is not ...})
logger.info("successfully called InfoHost on host_data, and have %s hosts to set to cache", len(hosts))
logger.info(
"successfully called InfoHost on host_data, and have %s hosts to set to cache",
len(hosts),
)
return hosts
def _convert_ips(self, ip_list: list[str]):
@ -1653,9 +1775,7 @@ class Domain(TimeStampedModel, DomainHelper):
cleaned["hosts"] = old_cache_hosts
# get nameserver info, if there are any
if (
fetch_hosts
):
if fetch_hosts:
if (
"_hosts" in cleaned
and isinstance(cleaned["_hosts"], list)