Fix kill_pool()

Kill pool was not killing instances correctly. This fixes that, and adds an additional test case
This commit is contained in:
zandercymatics 2023-10-19 15:59:28 -06:00
parent e7d73d2254
commit 055942fe3a
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
4 changed files with 189 additions and 65 deletions

View file

@ -68,7 +68,9 @@ class EPPLibWrapper:
self.pool_options = { self.pool_options = {
# Pool size # Pool size
"size": settings.EPP_CONNECTION_POOL_SIZE, "size": settings.EPP_CONNECTION_POOL_SIZE,
# Which errors the pool should look out for # Which errors the pool should look out for.
# Avoid changing this unless necessary,
# it can and will break things.
"exc_classes": (TransportError,), "exc_classes": (TransportError,),
# Occasionally pings the registry to keep the connection alive. # Occasionally pings the registry to keep the connection alive.
# Value in seconds => (keepalive / size) # Value in seconds => (keepalive / size)
@ -76,6 +78,7 @@ class EPPLibWrapper:
} }
self._pool = None self._pool = None
# Tracks the status of the pool # Tracks the status of the pool
self.pool_status = PoolStatus() self.pool_status = PoolStatus()
@ -85,9 +88,11 @@ class EPPLibWrapper:
def _send(self, command): def _send(self, command):
"""Helper function used by `send`.""" """Helper function used by `send`."""
cmd_type = command.__class__.__name__ cmd_type = command.__class__.__name__
# Start a timeout to check if the pool is hanging # Start a timeout to check if the pool is hanging
timeout = Timeout(settings.POOL_TIMEOUT) timeout = Timeout(settings.POOL_TIMEOUT)
timeout.start() timeout.start()
try: try:
if not self.pool_status.connection_success: if not self.pool_status.connection_success:
raise LoginError( raise LoginError(
@ -96,6 +101,9 @@ class EPPLibWrapper:
with self._pool.get() as connection: with self._pool.get() as connection:
response = connection.send(command) response = connection.send(command)
except Timeout as t: except Timeout as t:
# If more than one pool exists,
# multiple timeouts can be floating around.
# We need to be specific as to which we are targeting.
if t is timeout: if t is timeout:
# Flag that the pool is frozen, # Flag that the pool is frozen,
# then restart the pool. # then restart the pool.
@ -125,6 +133,7 @@ class EPPLibWrapper:
else: else:
return response return response
finally: finally:
# Close the timeout no matter what happens
timeout.close() timeout.close()
def send(self, command, *, cleaned=False): def send(self, command, *, cleaned=False):
@ -174,11 +183,6 @@ class EPPLibWrapper:
If an instance of the pool already exists, If an instance of the pool already exists,
then then that instance will be killed first. then then that instance will be killed first.
It is generally recommended to keep this enabled. It is generally recommended to keep this enabled.
try_start_if_invalid -> bool:
Designed for use in test cases, if we can't connect
to the registry, ignore that and try to connect anyway
It is generally recommended to keep this disabled.
""" """
# Since we reuse the same creds for each pool, we can test on # Since we reuse the same creds for each pool, we can test on
# one socket, and if successful, then we know we can connect. # one socket, and if successful, then we know we can connect.
@ -209,7 +213,7 @@ class EPPLibWrapper:
self._pool.kill_all_connections() self._pool.kill_all_connections()
self._pool = None self._pool = None
self.pool_status.pool_running = False self.pool_status.pool_running = False
return return None
logger.info("kill_pool() was invoked but there was no pool to delete") logger.info("kill_pool() was invoked but there was no pool to delete")
def _test_registry_connection_success(self): def _test_registry_connection_success(self):
@ -219,9 +223,11 @@ class EPPLibWrapper:
""" """
socket = Socket(self._client, self._login) socket = Socket(self._client, self._login)
can_login = False can_login = False
# Something went wrong if this doesn't exist # Something went wrong if this doesn't exist
if hasattr(socket, "test_connection_success"): if hasattr(socket, "test_connection_success"):
can_login = socket.test_connection_success() can_login = socket.test_connection_success()
return can_login return can_login
@ -229,9 +235,8 @@ try:
# Initialize epplib # Initialize epplib
CLIENT = EPPLibWrapper() CLIENT = EPPLibWrapper()
logger.info("registry client initialized") logger.info("registry client initialized")
except Exception as err: except Exception:
CLIENT = None # type: ignore CLIENT = None # type: ignore
logger.warning( logger.warning(
"Unable to configure epplib. Registrar cannot contact registry.", exc_info=True "Unable to configure epplib. Registrar cannot contact registry.", exc_info=True
) )
logger.warning(err)

View file

@ -38,15 +38,36 @@ class Socket:
raise LoginError(response.msg) raise LoginError(response.msg)
return self.client return self.client
def disconnect(self):
"""Close the connection."""
try:
self.client.send(commands.Logout())
self.client.close()
except Exception:
logger.warning("Connection to registry was not cleanly closed.")
def send(self, command):
"""Sends a command to the registry.
If the response code is >= 2000,
then this function raises a LoginError.
The calling function should handle this."""
response = self.client.send(command)
if self.is_login_error(response.code):
self.client.close()
raise LoginError(response.msg)
return response
def is_login_error(self, code): def is_login_error(self, code):
"""Returns the result of code >= 2000"""
return code >= 2000 return code >= 2000
def test_connection_success(self): def test_connection_success(self):
"""Tests if a successful connection can be made with the registry. """Tests if a successful connection can be made with the registry.
Tries 3 times""" Tries 3 times."""
# Something went wrong if this doesn't exist # Something went wrong if this doesn't exist
if not hasattr(self.client, "connect"): if not hasattr(self.client, "connect"):
logger.warning("self.client does not have a connect method") logger.warning("self.client does not have a connect attribute")
return False return False
counter = 0 # we'll try 3 times counter = 0 # we'll try 3 times
@ -72,21 +93,5 @@ class Socket:
logger.warning("was login error") logger.warning("was login error")
return False return False
# otherwise, just return true # Otherwise, just return true
return True return True
def disconnect(self):
"""Close the connection."""
try:
self.client.send(commands.Logout())
self.client.close()
except Exception:
logger.warning("Connection to registry was not cleanly closed.")
def send(self, command):
response = self.client.send(command)
if response.code >= 2000:
self.client.close()
raise LoginError(response.msg)
return response

View file

@ -68,6 +68,18 @@ class TestConnectionPool(TestCase):
) )
return mock return mock
def fake_client(mock_client):
pw = "none"
client = Client(
SocketTransport(
"none",
cert_file="path/to/cert_file",
key_file="path/to/key_file",
password=pw,
)
)
return client
@patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success) @patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
def test_pool_sends_data(self): def test_pool_sends_data(self):
"""A .send is invoked on the pool successfully""" """A .send is invoked on the pool successfully"""
@ -108,18 +120,6 @@ class TestConnectionPool(TestCase):
"sv_tr_id": "wRRNVhKhQW2m6wsUHbo/lA==-29a", "sv_tr_id": "wRRNVhKhQW2m6wsUHbo/lA==-29a",
} }
def fake_client(mock_client):
pw = "none"
client = Client(
SocketTransport(
"none",
cert_file="path/to/cert_file",
key_file="path/to/key_file",
password=pw,
)
)
return client
# Mock a response from EPP # Mock a response from EPP
def fake_receive(command, cleaned=None): def fake_receive(command, cleaned=None):
location = Path(__file__).parent / "utility" / "infoDomain.xml" location = Path(__file__).parent / "utility" / "infoDomain.xml"
@ -131,10 +131,10 @@ class TestConnectionPool(TestCase):
stack.enter_context( stack.enter_context(
patch.object(EPPConnectionPool, "_create_socket", self.fake_socket) patch.object(EPPConnectionPool, "_create_socket", self.fake_socket)
) )
stack.enter_context(patch.object(Socket, "connect", fake_client)) stack.enter_context(patch.object(Socket, "connect", self.fake_client))
stack.enter_context(patch.object(SocketTransport, "send", self.fake_send)) stack.enter_context(patch.object(SocketTransport, "send", self.fake_send))
stack.enter_context(patch.object(SocketTransport, "receive", fake_receive)) stack.enter_context(patch.object(SocketTransport, "receive", fake_receive))
# Restart the connection pool, since it starts on app startup # Restart the connection pool
registry.start_connection_pool() registry.start_connection_pool()
# Pool should be running, and be the right size # Pool should be running, and be the right size
self.assertEqual(registry.pool_status.connection_success, True) self.assertEqual(registry.pool_status.connection_success, True)
@ -152,29 +152,97 @@ class TestConnectionPool(TestCase):
# If it is 0, then they failed to open # If it is 0, then they failed to open
self.assertEqual(len(registry._pool.conn), self.pool_options["size"]) self.assertEqual(len(registry._pool.conn), self.pool_options["size"])
@patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
def test_pool_restarts_on_send(self):
"""A .send is invoked, but the pool isn't running.
The pool should restart."""
expected_result = {
"cl_tr_id": None,
"code": 1000,
"extensions": [],
"msg": "Command completed successfully",
"msg_q": None,
"res_data": [
info.InfoDomainResultData(
roid="DF1340360-GOV",
statuses=[
common.Status(
state="serverTransferProhibited",
description=None,
lang="en",
),
common.Status(state="inactive", description=None, lang="en"),
],
cl_id="gov2023-ote",
cr_id="gov2023-ote",
cr_date=datetime.datetime(
2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()
),
up_id="gov2023-ote",
up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()),
tr_date=None,
name="test3.gov",
registrant="TuaWnx9hnm84GCSU",
admins=[],
nsset=None,
keyset=None,
ex_date=datetime.date(2024, 8, 15),
auth_info=info.DomainAuthInfo(pw="2fooBAR123fooBaz"),
)
],
"sv_tr_id": "wRRNVhKhQW2m6wsUHbo/lA==-29a",
}
# Mock a response from EPP
def fake_receive(command, cleaned=None):
location = Path(__file__).parent / "utility" / "infoDomain.xml"
xml = (location).read_bytes()
return xml
# Mock what happens inside the "with"
with ExitStack() as stack:
stack.enter_context(
patch.object(EPPConnectionPool, "_create_socket", self.fake_socket)
)
stack.enter_context(patch.object(Socket, "connect", self.fake_client))
stack.enter_context(patch.object(SocketTransport, "send", self.fake_send))
stack.enter_context(patch.object(SocketTransport, "receive", fake_receive))
# Kill the connection pool
registry.kill_pool()
self.assertEqual(registry.pool_status.connection_success, False)
self.assertEqual(registry.pool_status.pool_running, False)
# An exception should be raised as end user will be informed
# that they cannot connect to EPP
with self.assertRaises(RegistryError):
expected = "InfoDomain failed to execute due to a connection error."
result = registry.send(
commands.InfoDomain(name="test.gov"), cleaned=True
)
self.assertEqual(result, expected)
# A subsequent command should be successful, as the pool restarts
result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
# Should this ever fail, it either means that the schema has changed,
# or the pool is broken.
# If the schema has changed: Update the associated infoDomain.xml file
self.assertEqual(result.__dict__, expected_result)
# The number of open pools should match the number of requested ones.
# If it is 0, then they failed to open
self.assertEqual(len(registry._pool.conn), self.pool_options["size"])
@patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success) @patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
def test_raises_connection_error(self): def test_raises_connection_error(self):
"""A .send is invoked on the pool, but registry connection is lost """A .send is invoked on the pool, but registry connection is lost
right as we send a command.""" right as we send a command."""
# Fake data for the _pool object
def fake_client(self):
pw = "none"
client = Client(
SocketTransport(
"none",
cert_file="path/to/cert_file",
key_file="path/to/key_file",
password=pw,
)
)
return client
with ExitStack() as stack: with ExitStack() as stack:
stack.enter_context( stack.enter_context(
patch.object(EPPConnectionPool, "_create_socket", self.fake_socket) patch.object(EPPConnectionPool, "_create_socket", self.fake_socket)
) )
stack.enter_context(patch.object(Socket, "connect", fake_client)) stack.enter_context(patch.object(Socket, "connect", self.fake_client))
# Pool should be running # Pool should be running
self.assertEqual(registry.pool_status.connection_success, True) self.assertEqual(registry.pool_status.connection_success, True)

View file

@ -6,9 +6,12 @@ from epplibwrapper.utility.pool_error import PoolError, PoolErrorCodes
try: try:
from epplib.commands import Hello from epplib.commands import Hello
from epplib.exceptions import TransportError
except ImportError: except ImportError:
pass pass
from gevent.lock import BoundedSemaphore
from collections import deque
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -27,7 +30,34 @@ class EPPConnectionPool(ConnectionPool):
# For storing shared credentials # For storing shared credentials
self._client = client self._client = client
self._login = login self._login = login
super().__init__(**options) # Keep track of each greenlet
self.greenlets = []
# Define optional pool settings.
# Kept in a dict so that the parent class,
# client.py, can maintain seperation/expanadability
self.size = 1
if "size" in options:
self.size = options["size"]
self.exc_classes = tuple((TransportError,))
if "exc_classes" in options:
self.exc_classes = options["exc_classes"]
self.keepalive = None
if "keepalive" in options:
self.keepalive = options["keepalive"]
# Determines the period in which new
# gevent threads are spun up
self.spawn_frequency = 0.1
if "spawn_frequency" in options:
self.spawn_frequency = options["spawn_frequency"]
self.conn = deque()
self.lock = BoundedSemaphore(self.size)
self.populate_all_connections()
def _new_connection(self): def _new_connection(self):
socket = self._create_socket(self._client, self._login) socket = self._create_socket(self._client, self._login)
@ -64,22 +94,38 @@ class EPPConnectionPool(ConnectionPool):
def kill_all_connections(self): def kill_all_connections(self):
"""Kills all active connections in the pool.""" """Kills all active connections in the pool."""
try: try:
gevent.killall(self.conn) if len(self.conn) > 0:
gevent.killall(self.greenlets)
self.greenlets.clear()
self.conn.clear() self.conn.clear()
# Clear the semaphore # Clear the semaphore
for i in range(self.lock.counter): self.lock = BoundedSemaphore(self.size)
self.lock.release() else:
logger.info("No connections to kill.")
except Exception as err: except Exception as err:
logger.error("Could not kill all connections.") logger.error("Could not kill all connections.")
raise err raise err
def repopulate_all_connections(self): def populate_all_connections(self):
"""Regenerates the connection pool. """Generates the connection pool.
If any connections exist, kill them first. If any connections exist, kill them first.
Based off of the __init__ definition for geventconnpool.
""" """
if len(self.conn) > 0: if len(self.conn) > 0:
self.kill_all_connections() self.kill_all_connections()
# Setup the lock
for i in range(self.size): for i in range(self.size):
self.lock.acquire() self.lock.acquire()
# Open multiple connections
for i in range(self.size): for i in range(self.size):
gevent.spawn_later(self.SPAWN_FREQUENCY * i, self._addOne) self.greenlets.append(
gevent.spawn_later(self.spawn_frequency * i, self._addOne)
)
# Open a "keepalive" thread if we want to ping open connections
if self.keepalive:
self.greenlets.append(gevent.spawn(self._keepalive_periodic))