Fix test cases

This commit is contained in:
zandercymatics 2023-10-18 14:02:14 -06:00
parent 7d9ebb1072
commit 9c6a6ef12a
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
4 changed files with 151 additions and 118 deletions

View file

@ -4,7 +4,7 @@ Date: 2023-13-10
## Status
In Review
Accepted
## Context

View file

@ -121,7 +121,6 @@ class EPPLibWrapper:
logger.error(message, exc_info=True)
raise RegistryError(message) from err
else:
print(f"test thing {response}")
if response.code >= 2000:
raise RegistryError(response.msg, code=response.code)
else:
@ -188,10 +187,7 @@ class EPPLibWrapper:
"""
# Since we reuse the same creds for each pool, we can test on
# one socket, and if successful, then we know we can connect.
if (
settings.DEBUG
or not self._test_registry_connection_success()
):
if not self._test_registry_connection_success():
logger.warning("Cannot contact the Registry")
self.pool_status.connection_success = False
else:

View file

@ -1,5 +1,8 @@
import datetime
from pathlib import Path
from unittest import skip
from unittest.mock import MagicMock, patch
from dateutil.tz import tzlocal
from django.conf import settings
from django.test import TestCase
@ -16,6 +19,8 @@ try:
from epplib import commands
from epplib.client import Client
from epplib.exceptions import TransportError
from epplib.transport import SocketTransport
from epplib.models import common, info
except ImportError:
pass
@ -36,115 +41,23 @@ class TestConnectionPool(TestCase):
"keepalive": 60,
}
#self.start_mocks()
def tearDown(self):
#self.mock_send_patch.stop()
#self.mock_connect_patch.stop()
#self.mockSendPatch.stop()
pass
def start_mocks(self):
# Mock a successful connection
self.mock_connect_patch = patch("epplib.client.Client.connect")
self.mocked_connect_function = self.mock_connect_patch.start()
self.mocked_connect_function.side_effect = self.mock_connect
# Mock the send behaviour
self.mock_send_patch = patch("epplib.client.Client.send")
self.mocked_send_function = self.mock_send_patch.start()
self.mocked_send_function.side_effect = self.mock_send
# Mock the pool object
self.mockSendPatch = patch("registrar.models.domain.registry._pool")
self.mockedSendFunction = self.mockSendPatch.start()
self.mockedSendFunction.side_effect = self.fake_pool
def mock_connect(self, _request):
return None
def mock_send(self, _request):
if isinstance(_request, commands.Login):
response = MagicMock(
code=1000,
msg="Command completed successfully",
res_data=None,
cl_tr_id="xkw1uo#2023-10-17T15:29:09.559376",
sv_tr_id="5CcH4gxISuGkq8eqvr1UyQ==-35a",
extensions=[],
msg_q=None,
def fake_socket(self, login, client):
# Create a fake client object
fake_client = Client(
SocketTransport(
"none",
cert_file="path/to/cert_file",
key_file="path/to/key_file",
password="none",
)
)
return response
return None
return Socket(fake_client, MagicMock())
def user_info(self, *args):
return {
"sub": "TEST",
"email": "test@example.com",
"first_name": "Testy",
"last_name": "Tester",
"phone": "814564000",
}
def patch_success(self):
return True
@patch("djangooidc.views.CLIENT", autospec=True)
def fake_pool(self, mock_client):
# mock client
mock_client.callback.side_effect = self.user_info
# Create a mock transport object
mock_login = MagicMock()
mock_login.cert_file = "path/to/cert_file"
mock_login.key_file = "path/to/key_file"
pool = EPPConnectionPool(
client=mock_client, login=mock_login, options=self.pool_options
)
return pool
@patch("djangooidc.views.CLIENT", autospec=True)
def fake_socket(self, mock_client):
# mock client
mock_client.callback.side_effect = self.user_info
# Create a mock transport object
mock_login = MagicMock()
mock_login.transport.cert_file = "path/to/cert_file"
mock_login.transport.key_file = "path/to/key_file"
return Socket(mock_client, mock_login)
def test(self, client, login):
mock = MagicMock()
mock.response.code = 1000
mock().return_value = 1000
return MagicMock()
@skip("not implemented yet")
def test_pool_timesout(self):
"""The pool timesout and restarts"""
raise
@skip("not implemented yet")
def test_multiple_users_send_data(self):
"""Multiple users send data concurrently"""
raise
def test_pool_tries_create_invalid(self):
"""A .send is invoked on the pool, but the pool
shouldn't be running."""
# Fake data for the _pool object
domain, _ = Domain.objects.get_or_create(name="freeman.gov")
# Trigger the getter - should fail
expected_contact = domain.security_contact
self.assertEqual(registry.pool_status.pool_running, False)
self.assertEqual(registry.pool_status.connection_success, False)
self.assertEqual(len(registry._pool.conn), 0)
def test_pool_sends_data(self):
"""A .send is invoked on the pool successfully"""
# Fake data for the _pool object
domain, _ = Domain.objects.get_or_create(name="freeman.gov")
def fake_send(self):
def fake_send(self, command, cleaned=None):
mock = MagicMock(
code=1000,
msg="Command completed successfully",
@ -156,17 +69,108 @@ class TestConnectionPool(TestCase):
)
return mock
@patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
def test_pool_sends_data(self):
"""A .send is invoked on the pool successfully"""
self.maxDiff = None
expected_result = {
'cl_tr_id': None,
'code': 1000,
'extensions': [],
'msg': 'Command completed successfully',
'msg_q': None,
'res_data': [info.InfoDomainResultData(
roid='DF1340360-GOV',
statuses=[
common.Status(
state='serverTransferProhibited',
description=None,
lang='en'
),
common.Status(state='inactive',
description=None,
lang='en')],
cl_id='gov2023-ote',
cr_id='gov2023-ote',
cr_date=datetime.datetime(2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()),
up_id='gov2023-ote',
up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()),
tr_date=None,
name='test3.gov',
registrant='TuaWnx9hnm84GCSU',
admins=[],
nsset=None,
keyset=None,
ex_date=datetime.date(2024, 8, 15),
auth_info=info.DomainAuthInfo(pw='2fooBAR123fooBaz')
)
],
'sv_tr_id': 'wRRNVhKhQW2m6wsUHbo/lA==-29a'
}
def fake_client(mock_client):
client = Client(
SocketTransport(
"none",
cert_file="path/to/cert_file",
key_file="path/to/key_file",
password="none",
)
)
return client
# Mock a response from EPP
def fake_receive(command, cleaned=None):
location= Path(__file__).parent / "utility" / "infoDomain.xml"
xml = (location).read_bytes()
return xml
# Mock what happens inside the "with"
with ExitStack() as stack:
stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket))
stack.enter_context(patch.object(Socket, "connect", fake_client))
stack.enter_context(patch.object(SocketTransport, "send", self.fake_send))
stack.enter_context(patch.object(SocketTransport, "receive", fake_receive))
# Restart the connection pool, since it starts on app startup
registry.start_connection_pool()
# Pool should be running, and be the right size
self.assertEqual(registry.pool_status.pool_running, True)
self.assertEqual(registry.pool_status.connection_success, True)
registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
self.assertEqual(registry.pool_status.pool_running, True)
# Send a command
result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
self.assertEqual(result.__dict__, expected_result)
# The number of open pools should match the number of requested ones.
# If it is 0, then they failed to open
self.assertEqual(len(registry._pool.conn), self.pool_options["size"])
#pool.send()
@patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
def test_raises_transport_error(self):
"""A .send is invoked on the pool, but registry connection is lost
right as we send a command."""
# Fake data for the _pool object
def fake_client(self):
client = Client(
SocketTransport(
"none",
cert_file="path/to/cert_file",
key_file="path/to/key_file",
password="none",
)
)
return client
# Trigger the getter - should succeed
#expected_contact = domain.security_contact
with ExitStack() as stack:
stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket))
stack.enter_context(patch.object(Socket, "connect", fake_client))
# Restart the connection pool, since it starts on app startup
registry.start_connection_pool()
# Pool should be running
self.assertEqual(registry.pool_status.connection_success, True)
self.assertEqual(registry.pool_status.pool_running, True)
# Try to send a command out - should fail
with self.assertRaises(TransportError):
registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)

View file

@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<epp xmlns:domain="urn:ietf:params:xml:ns:domain-1.0" xmlns:contact="urn:ietf:params:xml:ns:contact-1.0" xmlns:fee="urn:ietf:params:xml:ns:fee-0.6" xmlns:packageToken="urn:google:params:xml:ns:packageToken-1.0" xmlns="urn:ietf:params:xml:ns:epp-1.0" xmlns:fee11="urn:ietf:params:xml:ns:fee-0.11" xmlns:fee12="urn:ietf:params:xml:ns:fee-0.12" xmlns:launch="urn:ietf:params:xml:ns:launch-1.0" xmlns:secDNS="urn:ietf:params:xml:ns:secDNS-1.1" xmlns:host="urn:ietf:params:xml:ns:host-1.0">
<response>
<result code="1000">
<msg>Command completed successfully</msg>
</result>
<resData>
<domain:infData>
<domain:name>test3.gov</domain:name>
<domain:roid>DF1340360-GOV</domain:roid>
<domain:status s="serverTransferProhibited"/>
<domain:status s="inactive"/>
<domain:registrant>TuaWnx9hnm84GCSU</domain:registrant>
<domain:contact type="security">CONT2</domain:contact>
<domain:contact type="tech">CONT3</domain:contact>
<domain:clID>gov2023-ote</domain:clID>
<domain:crID>gov2023-ote</domain:crID>
<domain:crDate>2023-08-15T23:56:36Z</domain:crDate>
<domain:upID>gov2023-ote</domain:upID>
<domain:upDate>2023-08-17T02:03:19Z</domain:upDate>
<domain:exDate>2024-08-15T23:56:36Z</domain:exDate>
<domain:authInfo>
<domain:pw>2fooBAR123fooBaz</domain:pw>
</domain:authInfo>
</domain:infData>
</resData>
<trID>
<svTRID>wRRNVhKhQW2m6wsUHbo/lA==-29a</svTRID>
</trID>
</response>
</epp>