Nameserver fixes along with linter fixes

This commit is contained in:
Rebecca Hsieh 2023-09-28 17:41:19 -07:00
parent 1f149700e3
commit ef29459421
No known key found for this signature in database
GPG key ID: 644527A2F375A379
3 changed files with 126 additions and 87 deletions

View file

@ -234,6 +234,10 @@ class Domain(TimeStampedModel, DomainHelper):
logger.info("Domain is missing nameservers %s" % err)
return []
# TODO-848: Fix the output
# ('ns1.therealslimhsiehdy.com',)
# ('ns2.therealslimhsiehdy.com',)
# ('ns3.therealslimhsiehdy.com',)
hostList = []
for host in hosts:
hostList.append((host["name"], host["addrs"]))
@ -328,7 +332,7 @@ class Domain(TimeStampedModel, DomainHelper):
deleted_values.append((prevHost, addrs))
# if the host exists in both, check if the addresses changed
else:
# TODO - host is being updated when previous was None and new is an empty list
# TODO - host is being updated when previous was None+new is empty list
# add check here
if newHostDict[prevHost] is not None and set(
newHostDict[prevHost]
@ -347,6 +351,49 @@ class Domain(TimeStampedModel, DomainHelper):
return (deleted_values, updated_values, new_values, previousHostDict)
# TODO-848: Rename later - was getting complex err
def _loop_through(self, deleted_values, updated_values, new_values, oldNameservers):
successDeletedCount = 0
successCreatedCount = 0
for hostTuple in deleted_values:
deleted_response_code = self._delete_host(hostTuple[0])
if deleted_response_code == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
successDeletedCount += 1
for hostTuple in updated_values:
updated_response_code = self._update_host(
hostTuple[0], hostTuple[1], oldNameservers.get(hostTuple[0])
)
if updated_response_code not in [
ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
ErrorCode.OBJECT_EXISTS,
]:
logger.warning(
"Could not update host %s. Error code was: %s "
% (hostTuple[0], updated_response_code)
)
for key, value in new_values.items():
createdCode = self._create_host(
host=key, addrs=value
) # creates in registry
if (
createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY
or createdCode == ErrorCode.OBJECT_EXISTS
):
request = commands.UpdateDomain(
name=self.name, add=[epp.HostObjSet([key])]
)
try:
registry.send(request, cleaned=True)
successCreatedCount += 1
except RegistryError as e:
logger.error(
"Error adding nameserver, code was %s error was %s"
% (e.code, e)
)
return len(oldNameservers) - successDeletedCount + successCreatedCount
@nameservers.setter # type: ignore
def nameservers(self, hosts: list[tuple[str]]):
"""host should be a tuple of type str, str,... where the elements are
@ -369,68 +416,15 @@ class Domain(TimeStampedModel, DomainHelper):
new_values,
oldNameservers,
) = self.getNameserverChanges(hosts=hosts)
successDeletedCount = 0
successCreatedCount = 0
print("deleted_values")
print(deleted_values)
print("updated_values")
print(updated_values)
print("new_values")
print(new_values)
print("oldNameservers")
print(oldNameservers)
for hostTuple in deleted_values:
deleted_response_code = self._delete_host(hostTuple[0])
if deleted_response_code == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
successDeletedCount += 1
for hostTuple in updated_values:
updated_response_code = self._update_host(
hostTuple[0], hostTuple[1], oldNameservers.get(hostTuple[0])
)
if updated_response_code not in [
ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
ErrorCode.OBJECT_EXISTS,
]:
logger.warning(
"Could not update host %s. Error code was: %s "
% (hostTuple[0], updated_response_code)
)
for key, value in new_values.items():
createdCode = self._create_host(
host=key, addrs=value
) # creates in registry
if (
createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY
or createdCode == ErrorCode.OBJECT_EXISTS
):
request = commands.UpdateDomain(
name=self.name, add=[epp.HostObjSet([key])]
)
try:
registry.send(request, cleaned=True)
successCreatedCount += 1
except RegistryError as e:
logger.error(
"Error adding nameserver, code was %s error was %s"
% (e.code, e)
# TODO-848: Fix name here
successTotalNameservers = self._loop_through(
deleted_values, updated_values, new_values, oldNameservers
)
successTotalNameservers = (
len(oldNameservers) - successDeletedCount + successCreatedCount
)
# print("SUCCESSTOTALNAMESERVERS IS ")
# print(successTotalNameservers)
print("len(oldNameservers) IS ")
print(len(oldNameservers))
print("successDeletedCount IS ")
print(successDeletedCount)
print("successCreatedCount IS ")
print(successCreatedCount)
print("SUCCESSTOTALNAMESERVERS IS ")
print(successTotalNameservers)
if successTotalNameservers < 2:
try:
print("DNS_NEEDED: We have less than 2 nameservers")
@ -453,7 +447,8 @@ class Domain(TimeStampedModel, DomainHelper):
"nameserver setter checked for create state "
"and it did not succeed. Warning: %s" % err
)
# TODO-848: Handle removed nameservers here, will need to change the state then go back to DNS_NEEDED
# TODO-848: Handle removed nameservers here,
# will need to change the state then go back to DNS_NEEDED
@Cache
def statuses(self) -> list[str]:

View file

@ -581,6 +581,13 @@ class MockEppLib(TestCase):
contacts=[],
hosts=["fake.host.com"],
)
# infoDomainUpdateFail = fakedEppObject(
# "security",
# cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
# contacts=[],
# hosts=["ns1.failednameserver.gov"],
# addrs=["1.2.3"],
# )
infoDomainThreeHosts = fakedEppObject(
"my-nameserver.gov",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
@ -628,26 +635,31 @@ class MockEppLib(TestCase):
extendedValues = False
# TODO-848: Rename later - was getting complex err
def _getattrshelper(self, _request):
if getattr(_request, "name", None) == "security.gov":
return MagicMock(res_data=[self.infoDomainNoContact])
elif getattr(_request, "name", None) == "my-nameserver.gov":
if self.extendedValues:
return MagicMock(res_data=[self.infoDomainThreeHosts])
elif self.mockedSendFunction.call_count == 5:
return MagicMock(res_data=[self.infoDomainTwoHosts])
else:
return MagicMock(res_data=[self.infoDomainNoHost])
elif getattr(_request, "name", None) == "nameserverwithip.gov":
return MagicMock(res_data=[self.infoDomainHasIP])
# elif getattr(_request, "name", None) == "failednameserver.gov":
# return MagicMock(res_data=[self.infoDomainUpdateFail])
# return MagicMock(res_data=[self.mockDataInfoDomain])
def mockSend(self, _request, cleaned):
"""Mocks the registry.send function used inside of domain.py
registry is imported from epplibwrapper
returns objects that simulate what would be in a epp response
but only relevant pieces for tests"""
if isinstance(_request, commands.InfoDomain):
if getattr(_request, "name", None) == "security.gov":
return MagicMock(res_data=[self.infoDomainNoContact])
elif getattr(_request, "name", None) == "my-nameserver.gov":
if self.extendedValues:
return MagicMock(res_data=[self.infoDomainThreeHosts])
elif (
self.mockedSendFunction.call_count == 5
): ## remove this breaks anything?
return MagicMock(res_data=[self.infoDomainTwoHosts])
else:
return MagicMock(res_data=[self.infoDomainNoHost])
elif getattr(_request, "name", None) == "nameserverwithip.gov":
return MagicMock(res_data=[self.infoDomainHasIP])
return MagicMock(res_data=[self.mockDataInfoDomain])
# TODO-848: Fix name here
return self._getattrshelper(_request)
elif isinstance(_request, commands.InfoContact):
return MagicMock(res_data=[self.mockDataInfoContact])
elif (
@ -663,7 +675,9 @@ class MockEppLib(TestCase):
res_data=[self.mockDataHostChange],
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,
)
elif isinstance(_request, commands.UpdateHost):
# elif isinstance(_request, commands.UpdateHost):
# if getattr(_request, "name", None) == "ns1.failednameserver.gov":
# raise RegistryError(code=ErrorCode.OBJECT_EXISTS)
return MagicMock(
res_data=[self.mockDataHostChange],
code=ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY,

View file

@ -20,6 +20,7 @@ from .common import MockEppLib
from epplibwrapper import (
commands,
common,
RegistryError,
)
@ -334,7 +335,8 @@ class TestRegistrantContacts(MockEppLib):
created contact of type 'security'
"""
# make a security contact that is a PublicContact
self.domain.dns_needed_from_unknown() # make sure a security email already exists
# make sure a security email already exists
self.domain.dns_needed_from_unknown()
expectedSecContact = PublicContact.get_default_security()
expectedSecContact.domain = self.domain
expectedSecContact.email = "newEmail@fake.com"
@ -558,7 +560,6 @@ class TestRegistrantNameservers(MockEppLib):
new_values,
oldNameservers,
) = self.domain.getNameserverChanges(newChanges)
print(oldNameservers)
self.assertEqual(deleted_values, [("ns2.example.com", ["1.2.3"])])
self.assertEqual(updated_values, [("ns3.my-nameserver.gov", ["1.2.4"])])
self.assertEqual(new_values, {"ns4.example.com": None})
@ -613,7 +614,6 @@ class TestRegistrantNameservers(MockEppLib):
"""
# set 2 nameservers
print("DOCKER DIDNT SUCK THIS TIME")
self.domain.nameservers = [(self.nameserver1,), (self.nameserver2,)]
# when you create a host, you also have to update at same time
@ -740,11 +740,7 @@ class TestRegistrantNameservers(MockEppLib):
"""
self.extendedValues = True
print("domain state")
print(self.domain.state)
self.domain.ready()
print("Domain state is now")
print(self.domain.state)
self.domain.nameservers = [(self.nameserver1,)]
expectedCalls = [
call(
@ -893,9 +889,6 @@ class TestRegistrantNameservers(MockEppLib):
("ns3.nameserverwithip.gov", ["2.3.4"]),
]
# print("self.mockedSendFunction.call_args_list is ")
# print(self.mockedSendFunction.call_args_list)
expectedCalls = [
call(
commands.InfoDomain(name="nameserverwithip.gov", auth_info=None),
@ -940,7 +933,6 @@ class TestRegistrantNameservers(MockEppLib):
with self.assertRaises(ValueError):
self.domain.nameservers = [(dotgovnameserver, ["1.2.3"])]
@skip("not implemented yet")
def test_nameservers_are_idempotent(self):
"""
Scenario: Registrant adds a set of nameservers twice, due to a UI glitch
@ -951,6 +943,31 @@ class TestRegistrantNameservers(MockEppLib):
# implementation note: this requires seeing what happens when these are actually
# sent like this, and then implementing appropriate mocks for any errors the
# registry normally sends in this case
self.extendedValues = True
# Checking that it doesn't create or update even if out of order
self.domain.nameservers = [
(self.nameserver3,),
(self.nameserver1,),
(self.nameserver2,),
]
expectedCalls = [
call(
commands.InfoDomain(name="my-nameserver.gov", auth_info=None),
cleaned=True,
),
call(commands.InfoHost(name="ns1.my-nameserver-1.com"), cleaned=True),
call(commands.InfoHost(name="ns1.my-nameserver-2.com"), cleaned=True),
call(commands.InfoHost(name="ns1.cats-are-superior3.com"), cleaned=True),
]
self.mockedSendFunction.assert_has_calls(expectedCalls, any_order=True)
self.assertEqual(self.mockedSendFunction.call_count, 4)
@skip("not implemented yet")
def test_caching_issue(self):
raise
@skip("not implemented yet")
@ -959,8 +976,21 @@ class TestRegistrantNameservers(MockEppLib):
Scenario: An update to the nameservers is unsuccessful
When an error is returned from epplibwrapper
Then a user-friendly error message is returned for displaying on the web
Note: TODO 433 -- we will perform correct error handling and complete
this ticket. We want to raise an error for update/create/delete, but
don't want to lose user info (and exit out too early)
"""
raise
domain, _ = Domain.objects.get_or_create(
name="failednameserver.gov", state=Domain.State.READY
)
with self.assertRaises(RegistryError):
domain.nameservers = [("ns1.failednameserver.gov", ["4.5.6"])]
# print("self.mockedSendFunction.call_args_list is ")
# print(self.mockedSendFunction.call_args_list)
def tearDown(self):
self.extendedValues = False