Tests / code cleanup

This commit is contained in:
zandercymatics 2023-10-17 11:21:18 -06:00
parent 3aac1e38df
commit d7c7fb9d23
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
6 changed files with 188 additions and 130 deletions

View file

@ -1,10 +1,14 @@
from unittest import skip
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from django.conf import settings
from django.test import Client
from django.test import TestCase
from epplibwrapper.client import EPPLibWrapper
from epplibwrapper.utility.pool import EPPConnectionPool
from registrar.models.domain import Domain
from registrar.tests.common import MockEppLib
from registrar.models.domain import registry
import logging
@ -12,24 +16,17 @@ try:
from epplib.client import Client
from epplib import commands
from epplib.exceptions import TransportError
from epplib.transport import SocketTransport
from epplib.responses import base
except ImportError:
pass
logger = logging.getLogger(__name__)
@patch("djangooidc.views.CLIENT", autospec=True)
class TestConnectionPool(MockEppLib):
class TestConnectionPool(TestCase):
"""Tests for our connection pooling behaviour"""
def setUp(self):
"""
Background:
Given the registrant is logged in
And the registrant is the admin on a domain
"""
super().setUp()
self.pool_options = {
# Current pool size
"size": 1,
@ -40,10 +37,45 @@ class TestConnectionPool(MockEppLib):
"keepalive": 60,
}
def tearDown(self):
super().tearDown()
# Mock a successful connection
self.mock_connect_patch = patch("epplib.client.Client.connect")
self.mocked_connect_function = self.mock_connect_patch.start()
self.mocked_connect_function.side_effect = self.mock_connect
def user_info(*args):
# Mock the send behaviour
self.mock_send_patch = patch("epplib.client.Client.send")
self.mocked_send_function = self.mock_send_patch.start()
self.mocked_send_function.side_effect = self.mock_send
# Mock the pool object
self.mockSendPatch = patch("registrar.models.domain.registry._pool")
self.mockedSendFunction = self.mockSendPatch.start()
self.mockedSendFunction.side_effect = self.fake_pool
def tearDown(self):
self.mock_send_patch.stop()
self.mock_connect_patch.stop()
self.mockSendPatch.stop()
def mock_connect(self, _request):
return None
def mock_send(self, _request):
if isinstance(_request, commands.Login):
response = MagicMock(
code=1000,
msg="Command completed successfully",
res_data=None,
cl_tr_id="xkw1uo#2023-10-17T15:29:09.559376",
sv_tr_id="5CcH4gxISuGkq8eqvr1UyQ==-35a",
extensions=[],
msg_q=None,
)
return response
return None
def user_info(self, *args):
return {
"sub": "TEST",
"email": "test@example.com",
@ -52,67 +84,19 @@ class TestConnectionPool(MockEppLib):
"phone": "814564000",
}
def test_pool_created_successfully(self, mock_client):
# setup
session = self.client.session
session["state"] = "TEST" # nosec B105
session.save()
# mock
@patch("djangooidc.views.CLIENT", autospec=True)
def fake_pool(self, mock_client):
# mock client
mock_client.callback.side_effect = self.user_info
# Create a mock transport object
mock_login = MagicMock()
mock_login.cert_file = "path/to/cert_file"
mock_login.key_file = "path/to/key_file"
client = EPPLibWrapper()
pool = client._pool
# These are defined outside of the pool,
# so we can reimplement how this is being done
# in client.py. They should remain unchanged,
# and if they aren't, something went wrong.
expected_login = commands.Login(
cl_id="nothing",
password="nothing",
obj_uris=[
"urn:ietf:params:xml:ns:domain-1.0",
"urn:ietf:params:xml:ns:contact-1.0",
],
new_pw=None,
version="1.0",
lang="en",
ext_uris=[],
pool = EPPConnectionPool(
client=mock_client, login=mock_login, options=self.pool_options
)
# Key/cert will generate a new file everytime.
# This should never be null, so we can check for that.
try:
expected_client = Client(
SocketTransport(
settings.SECRET_REGISTRY_HOSTNAME,
cert_file=pool._client.transport.cert_file,
key_file=pool._client.transport.key_file,
password=settings.SECRET_REGISTRY_KEY_PASSPHRASE,
)
).__dict__
except Exception as err:
self.fail(err)
# We don't care about checking if the objects are both of
# the same reference, we only care about data parity, so
# we do a dict conversion.
actual_client = pool._client.__dict__
actual_client["transport"] = actual_client["transport"].__dict__
expected_client["transport"] = expected_client["transport"].__dict__
# Ensure that we're getting the credentials we expect
self.assertEqual(pool._login, expected_login)
self.assertEqual(actual_client, expected_client)
# Check that options are set correctly
self.assertEqual(pool.size, self.pool_options["size"])
self.assertEqual(pool.keepalive, self.pool_options["keepalive"])
self.assertEqual(pool.exc_classes, self.pool_options["exc_classes"])
# Check that it is running
self.assertEqual(client.pool_status.connection_success, True)
self.assertEqual(client.pool_status.pool_running, True)
return pool
@skip("not implemented yet")
def test_pool_timesout(self):
@ -124,7 +108,33 @@ class TestConnectionPool(MockEppLib):
"""Multiple users send data concurrently"""
raise
@skip("not implemented yet")
def test_pool_tries_create_invalid(self):
"""A .send is invoked on the pool, but the pool
shouldn't be running."""
# Fake data for the _pool object
domain, _ = Domain.objects.get_or_create(name="freeman.gov")
# Trigger the getter - should fail
expected_contact = domain.security_contact
self.assertEqual(registry.pool_status.pool_running, False)
self.assertEqual(registry.pool_status.connection_success, False)
self.assertEqual(len(registry._pool.conn), 0)
def test_pool_sends_data(self):
"""A .send is invoked on the pool"""
raise
"""A .send is invoked on the pool successfully"""
# Fake data for the _pool object
domain, _ = Domain.objects.get_or_create(name="freeman.gov")
# The connection pool will fail to start, start it manually
# so that our mocks can take over
registry.start_connection_pool(try_start_if_invalid=True)
# Pretend that we've connected
registry.pool_status.pool_running = True
registry.pool_status.connection_success = True
# Trigger the getter - should succeed
expected_contact = domain.security_contact
self.assertEqual(registry.pool_status.pool_running, True)
self.assertEqual(registry.pool_status.connection_success, True)
self.assertEqual(len(registry._pool.conn), self.pool_options["size"])