Update tests

This commit is contained in:
zandercymatics 2023-10-17 16:00:21 -06:00
parent 2f4425d914
commit 1502e7a693
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
3 changed files with 11 additions and 12 deletions

View file

@ -189,7 +189,7 @@ class EPPLibWrapper:
# Since we reuse the same creds for each pool, we can test on
# one socket, and if successful, then we know we can connect.
if (
not try_start_if_invalid
try_start_if_invalid
and (settings.DEBUG
or not self._test_registry_connection_success())
):

View file

@ -46,6 +46,7 @@ class Socket:
Tries 3 times"""
# Something went wrong if this doesn't exist
if not hasattr(self.client, "connect"):
logger.warning("self.client does not have a connect method")
return False
counter = 0 # we'll try 3 times
@ -58,12 +59,15 @@ class Socket:
counter += 1
sleep((counter * 50) / 1000) # sleep 50 ms to 150 ms
else: # don't try again
logger.warning("LoginError raised and should not retry or has been retried 3 times already")
logger.warning(f"should retry? {err.should_retry()}")
return False
else:
self.disconnect()
# If we encounter a login error, fail
if self.is_login_error(response.code):
logger.warning("was login error")
return False
# otherwise, just return true

View file

@ -137,18 +137,13 @@ class TestConnectionPool(TestCase):
with ExitStack() as stack:
stack.enter_context(patch.object(Socket, "connect", None))
stack.enter_context(patch.object(Socket, "send", fake_send))
stack.enter_context(patch.object(Socket, "_create_socket", Socket()))
stack.enter_context(patch.object(EPPLibWrapper, "_create_pool", self.fake_pool))
#stack.enter_context(patch.object(EPPLibWrapper, "get_pool", self.fake_pool))
pool = EPPLibWrapper(False)
# The connection pool will fail to start, start it manually
# so that our mocks can take over
pool.start_connection_pool(try_start_if_invalid=True)
print(f"this is pool {pool._pool.__dict__}")
# Pool should be running, and be the right size
self.assertEqual(pool.pool_status.pool_running, True)
self.assertEqual(pool.pool_status.connection_success, True)
pool.send(commands.InfoDomain(name="test.gov"), cleaned=True)
self.assertEqual(len(pool._pool.conn), self.pool_options["size"])
self.assertEqual(registry.pool_status.pool_running, True)
self.assertEqual(registry.pool_status.connection_success, True)
registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
self.assertEqual(len(registry._pool.conn), self.pool_options["size"])
#pool.send()