properly typed extensions.DNSSECExtension

This commit is contained in:
David Kennedy 2023-10-10 08:08:16 -04:00
parent 95b1a02789
commit 9e8aa2dee3
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
4 changed files with 120 additions and 49 deletions

View file

@ -2,6 +2,7 @@ from itertools import zip_longest
import logging import logging
from datetime import date from datetime import date
from string import digits from string import digits
from typing import Optional
from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore
from django.db import models from django.db import models
@ -283,7 +284,7 @@ class Domain(TimeStampedModel, DomainHelper):
return e.code return e.code
@Cache @Cache
def dnssecdata(self) -> extensions.DNSSECExtension: def dnssecdata(self) -> Optional[extensions.DNSSECExtension]:
try: try:
return self._get_property("dnssecdata") return self._get_property("dnssecdata")
except Exception as err: except Exception as err:
@ -293,8 +294,9 @@ class Domain(TimeStampedModel, DomainHelper):
return None return None
def getDnssecdataChanges( def getDnssecdataChanges(
self, _dnssecdata: dict self,
) -> tuple[dict, dict]: _dnssecdata: Optional[extensions.DNSSECExtension]
) -> tuple[dict, dict]:
""" """
calls self.dnssecdata, it should pull from cache but may result calls self.dnssecdata, it should pull from cache but may result
in an epp call in an epp call
@ -311,34 +313,76 @@ class Domain(TimeStampedModel, DomainHelper):
remExtension will be all existing dnssecdata to be deleted remExtension will be all existing dnssecdata to be deleted
""" """
if isinstance(_dnssecdata, extensions.DNSSECExtension):
logger.info("extension is properly typed")
else:
logger.info("extension is NOT properly typed")
oldDnssecdata = self.dnssecdata oldDnssecdata = self.dnssecdata
addDnssecdata = {"dsData": [], "keyData": [],} addDnssecdata: dict = {}
remDnssecdata = {"dsData": [], "keyData": [],} # "dsData": [],
# "keyData": [],
if _dnssecdata and len(_dnssecdata["dsData"]) > 0: # }
remDnssecdata: dict = {}
# "dsData": [],
# "keyData": [],
# }
if _dnssecdata and _dnssecdata.dsData is not None:
logger.info("there is submitted dsdata for comparison")
logger.info("there is %s submitted records", len(_dnssecdata.dsData))
# initialize addDnssecdata and remDnssecdata for dsData # initialize addDnssecdata and remDnssecdata for dsData
addDnssecdata["dsData"] = _dnssecdata["dsData"] addDnssecdata["dsData"] = _dnssecdata.dsData
remDnssecdata["dsData"] = [] # remDnssecdata["dsData"] = []
if oldDnssecdata and len(oldDnssecdata.dsData) > 0: if oldDnssecdata and len(oldDnssecdata.dsData) > 0:
logger.info("there is existing ds data for comparison")
logger.info("there is %s existing records for compare", len(oldDnssecdata.dsData))
# if existing dsData not in new dsData, mark for removal # if existing dsData not in new dsData, mark for removal
remDnssecdata["dsData"] = [dsData for dsData in oldDnssecdata.dsData if dsData not in _dnssecdata["dsData"]] dsDataForRemoval = [
dsData
for dsData in oldDnssecdata.dsData
if dsData not in _dnssecdata.dsData
]
if len(dsDataForRemoval) > 0:
logger.info("ds data marked for removal")
remDnssecdata["dsData"] = dsDataForRemoval
# if new dsData not in existing dsData, mark for add # if new dsData not in existing dsData, mark for add
addDnssecdata["dsData"] = [dsData for dsData in _dnssecdata["dsData"] if dsData not in oldDnssecdata.dsData] dsDataForAdd = [
dsData
for dsData in _dnssecdata.dsData
if dsData not in oldDnssecdata.dsData
]
if len(dsDataForAdd) > 0:
logger.info("ds data marked for add")
addDnssecdata["dsData"] = dsDataForAdd
else:
addDnssecdata["dsData"] = None
elif _dnssecdata and len(_dnssecdata["keyData"]) > 0: elif _dnssecdata and _dnssecdata.keyData is not None:
# initialize addDnssecdata and remDnssecdata for keyData # initialize addDnssecdata and remDnssecdata for keyData
addDnssecdata["keyData"] = _dnssecdata["keyData"] addDnssecdata["keyData"] = _dnssecdata.keyData
remDnssecdata["keyData"] = [] # remDnssecdata["keyData"] = []
if oldDnssecdata and len(oldDnssecdata.keyData) > 0: if oldDnssecdata and len(oldDnssecdata.keyData) > 0:
# if existing keyData not in new keyData, mark for removal # if existing keyData not in new keyData, mark for removal
remDnssecdata["keyData"] = [keyData for keyData in oldDnssecdata.keyData if keyData not in _dnssecdata["keyData"]] keyDataForRemoval = [
keyData
for keyData in oldDnssecdata.keyData
if keyData not in _dnssecdata.keyData
]
if len(keyDataForRemoval) > 0:
remDnssecdata["keyData"] = keyDataForRemoval
# if new keyData not in existing keyData, mark for add # if new keyData not in existing keyData, mark for add
addDnssecdata["keyData"] = [keyData for keyData in _dnssecdata["keyData"] if keyData not in oldDnssecdata.keyData] keyDataForAdd = [
keyData
for keyData in _dnssecdata.keyData
if keyData not in oldDnssecdata.keyData
]
if len(keyDataForAdd) > 0:
addDnssecdata["keyData"] = keyDataForAdd
else: else:
# there are no new dsData or keyData, remove all # there are no new dsData or keyData, remove all
remDnssecdata["dsData"] = getattr(oldDnssecdata, "dsData", None) remDnssecdata["dsData"] = getattr(oldDnssecdata, "dsData", None)
@ -347,7 +391,7 @@ class Domain(TimeStampedModel, DomainHelper):
return addDnssecdata, remDnssecdata return addDnssecdata, remDnssecdata
@dnssecdata.setter # type: ignore @dnssecdata.setter # type: ignore
def dnssecdata(self, _dnssecdata: dict): def dnssecdata(self, _dnssecdata: Optional[extensions.DNSSECExtension]):
_addDnssecdata, _remDnssecdata = self.getDnssecdataChanges(_dnssecdata) _addDnssecdata, _remDnssecdata = self.getDnssecdataChanges(_dnssecdata)
addParams = { addParams = {
"maxSigLife": _addDnssecdata.get("maxSigLife", None), "maxSigLife": _addDnssecdata.get("maxSigLife", None),
@ -366,12 +410,26 @@ class Domain(TimeStampedModel, DomainHelper):
remExtension = commands.UpdateDomainDNSSECExtension(**remParams) remExtension = commands.UpdateDomainDNSSECExtension(**remParams)
remRequest.add_extension(remExtension) remRequest.add_extension(remExtension)
try: try:
if len(_addDnssecdata.get("dsData", [])) > 0 or len(_addDnssecdata.get("keyData",[])) > 0: if (
"dsData" in _addDnssecdata and
_addDnssecdata["dsData"] is not None
or "keyData" in _addDnssecdata and
_addDnssecdata["keyData"] is not None
):
logger.info("sending addition")
registry.send(addRequest, cleaned=True) registry.send(addRequest, cleaned=True)
if len(_remDnssecdata.get("dsData", [])) > 0 or len(_remDnssecdata.get("keyData", [])) > 0: if (
"dsData" in _remDnssecdata and
_remDnssecdata["dsData"] is not None
or "keyData" in _remDnssecdata and
_remDnssecdata["keyData"] is not None
):
logger.info("sending removal")
registry.send(remRequest, cleaned=True) registry.send(remRequest, cleaned=True)
except RegistryError as e: except RegistryError as e:
logger.error("Error updating DNSSEC, code was %s error was %s" % (e.code, e)) logger.error(
"Error updating DNSSEC, code was %s error was %s" % (e.code, e)
)
raise e raise e
@nameservers.setter # type: ignore @nameservers.setter # type: ignore

View file

@ -706,22 +706,19 @@ class MockEppLib(TestCase):
} }
dnssecExtensionWithDsData: Mapping[Any, Any] = { dnssecExtensionWithDsData: Mapping[Any, Any] = {
"dsData": [common.DSData(**addDsData1)], # type: ignore "dsData": [common.DSData(**addDsData1)], # type: ignore
"keyData": [],
} }
dnssecExtensionWithMultDsData: Mapping[str, Any] = { dnssecExtensionWithMultDsData: Mapping[str, Any] = {
"dsData": [ "dsData": [
common.DSData(**addDsData1), # type: ignore common.DSData(**addDsData1), # type: ignore
common.DSData(**addDsData2), # type: ignore common.DSData(**addDsData2), # type: ignore
], ],
"keyData": [],
} }
dnssecExtensionWithKeyData: Mapping[str, Any] = { dnssecExtensionWithKeyData: Mapping[str, Any] = {
"keyData": [common.DNSSECKeyData(**keyDataDict)], # type: ignore "keyData": [common.DNSSECKeyData(**keyDataDict)], # type: ignore
"dsData": [],
} }
dnssecExtensionRemovingDsData: Mapping[Any, Any] = { dnssecExtensionRemovingDsData: Mapping[Any, Any] = {
"dsData": [], "dsData": None,
"keyData": [], "keyData": None,
} }
def mockSend(self, _request, cleaned): def mockSend(self, _request, cleaned):

View file

@ -984,7 +984,9 @@ class TestRegistrantDNSSEC(MockEppLib):
"""Rule: Registrants may modify their secure DNS data""" """Rule: Registrants may modify their secure DNS data"""
# helper function to create UpdateDomainDNSSECExtention object for verification # helper function to create UpdateDomainDNSSECExtention object for verification
def createUpdateExtension(self, dnssecdata: extensions.DNSSECExtension, remove=False): def createUpdateExtension(
self, dnssecdata: extensions.DNSSECExtension, remove=False
):
if not remove: if not remove:
return commands.UpdateDomainDNSSECExtension( return commands.UpdateDomainDNSSECExtension(
maxSigLife=dnssecdata.maxSigLife, maxSigLife=dnssecdata.maxSigLife,
@ -1033,7 +1035,9 @@ class TestRegistrantDNSSEC(MockEppLib):
""" """
domain, _ = Domain.objects.get_or_create(name="dnssec-dsdata.gov") domain, _ = Domain.objects.get_or_create(name="dnssec-dsdata.gov")
domain.dnssecdata = self.dnssecExtensionWithDsData domain.dnssecdata = extensions.DNSSECExtension(
**self.dnssecExtensionWithDsData
)
# get the DNS SEC extension added to the UpdateDomain command and # get the DNS SEC extension added to the UpdateDomain command and
# verify that it is properly sent # verify that it is properly sent
# args[0] is the _request sent to registry # args[0] is the _request sent to registry
@ -1098,9 +1102,9 @@ class TestRegistrantDNSSEC(MockEppLib):
domain, _ = Domain.objects.get_or_create(name="dnssec-dsdata.gov") domain, _ = Domain.objects.get_or_create(name="dnssec-dsdata.gov")
# set the dnssecdata once # set the dnssecdata once
domain.dnssecdata = self.dnssecExtensionWithDsData domain.dnssecdata = extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
# set the dnssecdata again # set the dnssecdata again
domain.dnssecdata = self.dnssecExtensionWithDsData domain.dnssecdata = extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
# test that the dnssecdata getter is functioning properly # test that the dnssecdata getter is functioning properly
dnssecdata_get = domain.dnssecdata dnssecdata_get = domain.dnssecdata
self.mockedSendFunction.assert_has_calls( self.mockedSendFunction.assert_has_calls(
@ -1155,7 +1159,7 @@ class TestRegistrantDNSSEC(MockEppLib):
domain, _ = Domain.objects.get_or_create(name="dnssec-multdsdata.gov") domain, _ = Domain.objects.get_or_create(name="dnssec-multdsdata.gov")
domain.dnssecdata = self.dnssecExtensionWithMultDsData domain.dnssecdata = extensions.DNSSECExtension(**self.dnssecExtensionWithMultDsData)
# get the DNS SEC extension added to the UpdateDomain command # get the DNS SEC extension added to the UpdateDomain command
# and verify that it is properly sent # and verify that it is properly sent
# args[0] is the _request sent to registry # args[0] is the _request sent to registry
@ -1201,16 +1205,18 @@ class TestRegistrantDNSSEC(MockEppLib):
This test verifies: This test verifies:
1 - setter initially calls InfoDomain command 1 - setter initially calls InfoDomain command
2 - invalidate cache forces second InfoDomain command (to match mocks) 2 - first setter calls UpdateDomain command
3 - second setter calls InfoDomain command again
3 - setter then calls UpdateDomain command 3 - setter then calls UpdateDomain command
4 - setter adds the UpdateDNSSECExtension extension to the command with rem 4 - setter adds the UpdateDNSSECExtension extension to the command with rem
""" """
domain, _ = Domain.objects.get_or_create(name="dnssec-dsdata.gov") domain, _ = Domain.objects.get_or_create(name="dnssec-dsdata.gov")
dnssecdata_get_initial = domain.dnssecdata # call to force initial mock # dnssecdata_get_initial = domain.dnssecdata # call to force initial mock
domain._invalidate_cache() # domain._invalidate_cache()
domain.dnssecdata = self.dnssecExtensionRemovingDsData domain.dnssecdata = extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
domain.dnssecdata = extensions.DNSSECExtension(**self.dnssecExtensionRemovingDsData)
# get the DNS SEC extension added to the UpdateDomain command and # get the DNS SEC extension added to the UpdateDomain command and
# verify that it is properly sent # verify that it is properly sent
# args[0] is the _request sent to registry # args[0] is the _request sent to registry
@ -1220,7 +1226,7 @@ class TestRegistrantDNSSEC(MockEppLib):
args[0].extensions[0], args[0].extensions[0],
self.createUpdateExtension( self.createUpdateExtension(
extensions.DNSSECExtension(**self.dnssecExtensionWithDsData), extensions.DNSSECExtension(**self.dnssecExtensionWithDsData),
remove=True remove=True,
), ),
) )
self.mockedSendFunction.assert_has_calls( self.mockedSendFunction.assert_has_calls(
@ -1231,6 +1237,16 @@ class TestRegistrantDNSSEC(MockEppLib):
), ),
cleaned=True, cleaned=True,
), ),
call(
commands.UpdateDomain(
name="dnssec-dsdata.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call( call(
commands.InfoDomain( commands.InfoDomain(
name="dnssec-dsdata.gov", name="dnssec-dsdata.gov",
@ -1265,7 +1281,7 @@ class TestRegistrantDNSSEC(MockEppLib):
domain, _ = Domain.objects.get_or_create(name="dnssec-keydata.gov") domain, _ = Domain.objects.get_or_create(name="dnssec-keydata.gov")
domain.dnssecdata = self.dnssecExtensionWithKeyData domain.dnssecdata = extensions.DNSSECExtension(**self.dnssecExtensionWithKeyData)
# get the DNS SEC extension added to the UpdateDomain command # get the DNS SEC extension added to the UpdateDomain command
# and verify that it is properly sent # and verify that it is properly sent
# args[0] is the _request sent to registry # args[0] is the _request sent to registry
@ -1314,7 +1330,7 @@ class TestRegistrantDNSSEC(MockEppLib):
domain, _ = Domain.objects.get_or_create(name="dnssec-invalid.gov") domain, _ = Domain.objects.get_or_create(name="dnssec-invalid.gov")
with self.assertRaises(RegistryError) as err: with self.assertRaises(RegistryError) as err:
domain.dnssecdata = self.dnssecExtensionWithDsData domain.dnssecdata = extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
self.assertTrue( self.assertTrue(
err.is_client_error() or err.is_session_error() or err.is_server_error() err.is_client_error() or err.is_session_error() or err.is_server_error()
) )

View file

@ -362,8 +362,8 @@ class DomainDsdataView(DomainPermissionView, FormMixin):
def form_valid(self, formset): def form_valid(self, formset):
"""The formset is valid, perform something with it.""" """The formset is valid, perform something with it."""
# Set the nameservers from the formset # Set the dnssecdata from the formset
dnssecdata = {"dsData": []} dnssecdata = extensions.DNSSECExtension()
for form in formset: for form in formset:
try: try:
@ -375,13 +375,13 @@ class DomainDsdataView(DomainPermissionView, FormMixin):
"digestType": int(form.cleaned_data["digest_type"]), "digestType": int(form.cleaned_data["digest_type"]),
"digest": form.cleaned_data["digest"], "digest": form.cleaned_data["digest"],
} }
dnssecdata["dsData"].append(common.DSData(**dsrecord)) if dnssecdata.dsData is None:
dnssecdata.dsData = []
dnssecdata.dsData.append(common.DSData(**dsrecord))
except KeyError: except KeyError:
# no server information in this field, skip it # no server information in this field, skip it
pass pass
domain = self.get_object() domain = self.get_object()
if len(dnssecdata["dsData"]) == 0:
dnssecdata = {}
try: try:
domain.dnssecdata = dnssecdata domain.dnssecdata = dnssecdata
except RegistryError as err: except RegistryError as err:
@ -483,7 +483,7 @@ class DomainKeydataView(DomainPermissionView, FormMixin):
"""The formset is valid, perform something with it.""" """The formset is valid, perform something with it."""
# Set the nameservers from the formset # Set the nameservers from the formset
dnssecdata = {"keyData": []} dnssecdata = extensions.DNSSECExtension()
for form in formset: for form in formset:
try: try:
@ -495,13 +495,13 @@ class DomainKeydataView(DomainPermissionView, FormMixin):
"alg": int(form.cleaned_data["algorithm"]), "alg": int(form.cleaned_data["algorithm"]),
"pubKey": form.cleaned_data["pub_key"], "pubKey": form.cleaned_data["pub_key"],
} }
if dnssecdata.keyData is None:
dnssecdata.keyData = []
dnssecdata["keyData"].append(common.DNSSECKeyData(**keyrecord)) dnssecdata["keyData"].append(common.DNSSECKeyData(**keyrecord))
except KeyError: except KeyError:
# no server information in this field, skip it # no server information in this field, skip it
pass pass
domain = self.get_object() domain = self.get_object()
if len(dnssecdata["keyData"]) == 0:
dnssecdata = {}
try: try:
domain.dnssecdata = dnssecdata domain.dnssecdata = dnssecdata
except RegistryError as err: except RegistryError as err: