mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-08-15 05:54:11 +02:00
epplibwrapper test_client
This commit is contained in:
parent
6c8ec6044b
commit
4e00124fdd
1 changed files with 80 additions and 75 deletions
|
@ -26,7 +26,8 @@ class TestClient(TestCase):
|
||||||
|
|
||||||
def fake_result(self, code, msg):
|
def fake_result(self, code, msg):
|
||||||
"""Helper function to create a fake Result object"""
|
"""Helper function to create a fake Result object"""
|
||||||
return Result(code=code, msg=msg, res_data=[], cl_tr_id="cl_tr_id", sv_tr_id="sv_tr_id")
|
with less_console_noise():
|
||||||
|
return Result(code=code, msg=msg, res_data=[], cl_tr_id="cl_tr_id", sv_tr_id="sv_tr_id")
|
||||||
|
|
||||||
@patch("epplibwrapper.client.Client")
|
@patch("epplibwrapper.client.Client")
|
||||||
def test_initialize_client_success(self, mock_client):
|
def test_initialize_client_success(self, mock_client):
|
||||||
|
@ -268,8 +269,9 @@ class TestClient(TestCase):
|
||||||
Raises a ConcurrentObjectUseError, which gevent throws when accessing
|
Raises a ConcurrentObjectUseError, which gevent throws when accessing
|
||||||
the same thread from two different locations.
|
the same thread from two different locations.
|
||||||
"""
|
"""
|
||||||
# This error is thrown when two threads are being used concurrently
|
with less_console_noise():
|
||||||
raise ConcurrentObjectUseError("This socket is already used by another greenlet")
|
# This error is thrown when two threads are being used concurrently
|
||||||
|
raise ConcurrentObjectUseError("This socket is already used by another greenlet")
|
||||||
|
|
||||||
def do_nothing(self, command=None):
|
def do_nothing(self, command=None):
|
||||||
"""
|
"""
|
||||||
|
@ -281,62 +283,65 @@ class TestClient(TestCase):
|
||||||
"""
|
"""
|
||||||
Simulates receiving a success response from EPP.
|
Simulates receiving a success response from EPP.
|
||||||
"""
|
"""
|
||||||
mock = MagicMock(
|
with less_console_noise():
|
||||||
code=1000,
|
mock = MagicMock(
|
||||||
msg="Command completed successfully",
|
code=1000,
|
||||||
res_data=None,
|
msg="Command completed successfully",
|
||||||
cl_tr_id="xkw1uo#2023-10-17T15:29:09.559376",
|
res_data=None,
|
||||||
sv_tr_id="5CcH4gxISuGkq8eqvr1UyQ==-35a",
|
cl_tr_id="xkw1uo#2023-10-17T15:29:09.559376",
|
||||||
extensions=[],
|
sv_tr_id="5CcH4gxISuGkq8eqvr1UyQ==-35a",
|
||||||
msg_q=None,
|
extensions=[],
|
||||||
)
|
msg_q=None,
|
||||||
return mock
|
)
|
||||||
|
return mock
|
||||||
|
|
||||||
def fake_info_domain_received(self, command=None, cleaned=None):
|
def fake_info_domain_received(self, command=None, cleaned=None):
|
||||||
"""
|
"""
|
||||||
Simulates receiving a response by reading from a predefined XML file.
|
Simulates receiving a response by reading from a predefined XML file.
|
||||||
"""
|
"""
|
||||||
location = Path(__file__).parent / "utility" / "infoDomain.xml"
|
with less_console_noise():
|
||||||
xml = (location).read_bytes()
|
location = Path(__file__).parent / "utility" / "infoDomain.xml"
|
||||||
return xml
|
xml = (location).read_bytes()
|
||||||
|
return xml
|
||||||
|
|
||||||
def get_fake_epp_result(self):
|
def get_fake_epp_result(self):
|
||||||
"""Mimics a return from EPP by returning a dictionary in the same format"""
|
"""Mimics a return from EPP by returning a dictionary in the same format"""
|
||||||
result = {
|
with less_console_noise():
|
||||||
"cl_tr_id": None,
|
result = {
|
||||||
"code": 1000,
|
"cl_tr_id": None,
|
||||||
"extensions": [],
|
"code": 1000,
|
||||||
"msg": "Command completed successfully",
|
"extensions": [],
|
||||||
"msg_q": None,
|
"msg": "Command completed successfully",
|
||||||
"res_data": [
|
"msg_q": None,
|
||||||
info.InfoDomainResultData(
|
"res_data": [
|
||||||
roid="DF1340360-GOV",
|
info.InfoDomainResultData(
|
||||||
statuses=[
|
roid="DF1340360-GOV",
|
||||||
common.Status(
|
statuses=[
|
||||||
state="serverTransferProhibited",
|
common.Status(
|
||||||
description=None,
|
state="serverTransferProhibited",
|
||||||
lang="en",
|
description=None,
|
||||||
),
|
lang="en",
|
||||||
common.Status(state="inactive", description=None, lang="en"),
|
),
|
||||||
],
|
common.Status(state="inactive", description=None, lang="en"),
|
||||||
cl_id="gov2023-ote",
|
],
|
||||||
cr_id="gov2023-ote",
|
cl_id="gov2023-ote",
|
||||||
cr_date=datetime.datetime(2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()),
|
cr_id="gov2023-ote",
|
||||||
up_id="gov2023-ote",
|
cr_date=datetime.datetime(2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()),
|
||||||
up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()),
|
up_id="gov2023-ote",
|
||||||
tr_date=None,
|
up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()),
|
||||||
name="test3.gov",
|
tr_date=None,
|
||||||
registrant="TuaWnx9hnm84GCSU",
|
name="test3.gov",
|
||||||
admins=[],
|
registrant="TuaWnx9hnm84GCSU",
|
||||||
nsset=None,
|
admins=[],
|
||||||
keyset=None,
|
nsset=None,
|
||||||
ex_date=datetime.date(2024, 8, 15),
|
keyset=None,
|
||||||
auth_info=info.DomainAuthInfo(pw="2fooBAR123fooBaz"),
|
ex_date=datetime.date(2024, 8, 15),
|
||||||
)
|
auth_info=info.DomainAuthInfo(pw="2fooBAR123fooBaz"),
|
||||||
],
|
)
|
||||||
"sv_tr_id": "wRRNVhKhQW2m6wsUHbo/lA==-29a",
|
],
|
||||||
}
|
"sv_tr_id": "wRRNVhKhQW2m6wsUHbo/lA==-29a",
|
||||||
return result
|
}
|
||||||
|
return result
|
||||||
|
|
||||||
def test_send_command_close_failure_recovers(self):
|
def test_send_command_close_failure_recovers(self):
|
||||||
"""
|
"""
|
||||||
|
@ -350,28 +355,28 @@ class TestClient(TestCase):
|
||||||
- Subsequently, the client re-initializes the connection.
|
- Subsequently, the client re-initializes the connection.
|
||||||
- A retry of the command execution post-reinitialization succeeds.
|
- A retry of the command execution post-reinitialization succeeds.
|
||||||
"""
|
"""
|
||||||
|
with less_console_noise():
|
||||||
|
expected_result = self.get_fake_epp_result()
|
||||||
|
wrapper = None
|
||||||
|
# Trigger a retry
|
||||||
|
# Do nothing on connect, as we aren't testing it and want to connect while
|
||||||
|
# mimicking the rest of the client as closely as possible (which is not entirely possible with MagicMock)
|
||||||
|
with patch.object(EPPLibWrapper, "_connect", self.do_nothing):
|
||||||
|
with patch.object(SocketTransport, "send", self.fake_failure_send_concurrent_threads):
|
||||||
|
wrapper = EPPLibWrapper()
|
||||||
|
tested_command = commands.InfoDomain(name="test.gov")
|
||||||
|
try:
|
||||||
|
wrapper.send(tested_command, cleaned=True)
|
||||||
|
except RegistryError as err:
|
||||||
|
expected_error = "InfoDomain failed to execute due to an unknown error."
|
||||||
|
self.assertEqual(err.args[0], expected_error)
|
||||||
|
else:
|
||||||
|
self.fail("Registry error was not thrown")
|
||||||
|
|
||||||
expected_result = self.get_fake_epp_result()
|
# After a retry, try sending again to see if the connection recovers
|
||||||
wrapper = None
|
with patch.object(EPPLibWrapper, "_connect", self.do_nothing):
|
||||||
# Trigger a retry
|
with patch.object(SocketTransport, "send", self.fake_success_send), patch.object(
|
||||||
# Do nothing on connect, as we aren't testing it and want to connect while
|
SocketTransport, "receive", self.fake_info_domain_received
|
||||||
# mimicking the rest of the client as closely as possible (which is not entirely possible with MagicMock)
|
):
|
||||||
with patch.object(EPPLibWrapper, "_connect", self.do_nothing):
|
result = wrapper.send(tested_command, cleaned=True)
|
||||||
with patch.object(SocketTransport, "send", self.fake_failure_send_concurrent_threads):
|
self.assertEqual(expected_result, result.__dict__)
|
||||||
wrapper = EPPLibWrapper()
|
|
||||||
tested_command = commands.InfoDomain(name="test.gov")
|
|
||||||
try:
|
|
||||||
wrapper.send(tested_command, cleaned=True)
|
|
||||||
except RegistryError as err:
|
|
||||||
expected_error = "InfoDomain failed to execute due to an unknown error."
|
|
||||||
self.assertEqual(err.args[0], expected_error)
|
|
||||||
else:
|
|
||||||
self.fail("Registry error was not thrown")
|
|
||||||
|
|
||||||
# After a retry, try sending again to see if the connection recovers
|
|
||||||
with patch.object(EPPLibWrapper, "_connect", self.do_nothing):
|
|
||||||
with patch.object(SocketTransport, "send", self.fake_success_send), patch.object(
|
|
||||||
SocketTransport, "receive", self.fake_info_domain_received
|
|
||||||
):
|
|
||||||
result = wrapper.send(tested_command, cleaned=True)
|
|
||||||
self.assertEqual(expected_result, result.__dict__)
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue