removed EPP connection pool, simplified epplibwrapper, removed socket

This commit is contained in:
David Kennedy 2024-02-29 23:36:15 -05:00
parent b355f1813d
commit c5d525bd48
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
8 changed files with 281 additions and 679 deletions

View file

@ -2,10 +2,6 @@
import logging
from time import sleep
from gevent import Timeout
from epplibwrapper.utility.pool_status import PoolStatus
try:
from epplib.client import Client
from epplib import commands
@ -18,8 +14,6 @@ from django.conf import settings
from .cert import Cert, Key
from .errors import ErrorCode, LoginError, RegistryError
from .socket import Socket
from .utility.pool import EPPConnectionPool
logger = logging.getLogger(__name__)
@ -43,8 +37,13 @@ class EPPLibWrapper:
ATTN: This should not be used directly. Use `Domain` from domain.py.
"""
def __init__(self, start_connection_pool=True) -> None:
def __init__(self) -> None:
"""Initialize settings which will be used for all connections."""
# set _client to None initially. In the event that the __init__ fails
# before _client initializes, app should still start and be in a state
# that it can attempt _client initialization on send attempts
logger.info("__init__ called")
self._client = None
# prepare (but do not send) a Login command
self._login = commands.Login(
cl_id=settings.SECRET_REGISTRY_CL_ID,
@ -54,7 +53,16 @@ class EPPLibWrapper:
"urn:ietf:params:xml:ns:contact-1.0",
],
)
try:
self._initialize_client()
except Exception:
logger.warning("Unable to configure epplib. Registrar cannot contact registry.", exc_info=True)
def _initialize_client(self) -> None:
"""Initialize a client, assuming _login defined. Sets _client to initialized
client. Raises errors if initialization fails.
This method will be called at app initialization, and also during retries."""
logger.info("_initialize_client called")
# establish a client object with a TCP socket transport
self._client = Client(
SocketTransport(
@ -64,50 +72,44 @@ class EPPLibWrapper:
password=settings.SECRET_REGISTRY_KEY_PASSPHRASE,
)
)
try:
# use the _client object to connect
self._client.connect()
response = self._client.send(self._login)
if response.code >= 2000: # type: ignore
self._client.close()
raise LoginError(response.msg) # type: ignore
except TransportError as err:
message = "_initialize_client failed to execute due to a connection error."
logger.error(f"{message} Error: {err}", exc_info=True)
raise RegistryError(message, code=ErrorCode.TRANSPORT_ERROR) from err
except LoginError as err:
raise err
except Exception as err:
message = "_initialize_client failed to execute due to an unknown error."
logger.error(f"{message} Error: {err}", exc_info=True)
raise RegistryError(message) from err
self.pool_options = {
# Pool size
"size": settings.EPP_CONNECTION_POOL_SIZE,
# Which errors the pool should look out for.
# Avoid changing this unless necessary,
# it can and will break things.
"exc_classes": (TransportError,),
# Occasionally pings the registry to keep the connection alive.
# Value in seconds => (keepalive / size)
"keepalive": settings.POOL_KEEP_ALIVE,
}
self._pool = None
# Tracks the status of the pool
self.pool_status = PoolStatus()
if start_connection_pool:
self.start_connection_pool()
def _disconnect(self) -> None:
"""Close the connection."""
logger.info("_disconnect called")
try:
self._client.send(commands.Logout())
self._client.close()
except Exception:
logger.warning("Connection to registry was not cleanly closed.")
def _send(self, command):
"""Helper function used by `send`."""
logger.info("_send called")
cmd_type = command.__class__.__name__
# Start a timeout to check if the pool is hanging
timeout = Timeout(settings.POOL_TIMEOUT)
timeout.start()
try:
if not self.pool_status.connection_success:
raise LoginError("Couldn't connect to the registry after three attempts")
with self._pool.get() as connection:
response = connection.send(command)
except Timeout as t:
# If more than one pool exists,
# multiple timeouts can be floating around.
# We need to be specific as to which we are targeting.
if t is timeout:
# Flag that the pool is frozen,
# then restart the pool.
self.pool_status.pool_hanging = True
logger.error("Pool timed out")
self.start_connection_pool()
# check for the condition that the _client was not initialized properly
# at app initialization
if self._client is None:
self._initialize_client()
response = self._client.send(command)
except (ValueError, ParsingError) as err:
message = f"{cmd_type} failed to execute due to some syntax error."
logger.error(f"{message} Error: {err}", exc_info=True)
@ -131,109 +133,38 @@ class EPPLibWrapper:
raise RegistryError(response.msg, code=response.code)
else:
return response
finally:
# Close the timeout no matter what happens
timeout.close()
def send(self, command, *, cleaned=False):
"""Login, send the command, then close the connection. Tries 3 times."""
def _retry(self, command, *, cleaned=False):
"""Retry sending a command through EPP by re-initializing the client
and then sending the command."""
logger.info("_retry called")
# re-initialize by disconnecting and initial
self._disconnect()
self._initialize_client()
# try to prevent use of this method without appropriate safeguards
if not cleaned:
raise ValueError("Please sanitize user input before sending it.")
return self._send(command)
# Reopen the pool if its closed
# Only occurs when a login error is raised, after connection is successful
if not self.pool_status.pool_running:
# We want to reopen the connection pool,
# but we don't want the end user to wait while it opens.
# Raise syntax doesn't allow this, so we use a try/catch
# block.
try:
logger.error("Can't contact the Registry. Pool was not running.")
raise RegistryError("Can't contact the Registry. Pool was not running.")
except RegistryError as err:
raise err
finally:
# Code execution will halt after here.
# The end user will need to recall .send.
self.start_connection_pool()
counter = 0 # we'll try 3 times
while True:
def send(self, command, *, cleaned=False):
"""Login, send the command, then close the connection. Tries 3 times."""
logger.info("send called")
# try to prevent use of this method without appropriate safeguards
if not cleaned:
raise ValueError("Please sanitize user input before sending it.")
try:
return self._send(command)
except RegistryError as err:
if counter < 3 and (err.should_retry() or err.is_transport_error()):
logger.info(f"Retrying transport error. Attempt #{counter+1} of 3.")
counter += 1
sleep((counter * 50) / 1000) # sleep 50 ms to 150 ms
else: # don't try again
raise err
def get_pool(self):
"""Get the current pool instance"""
return self._pool
def _create_pool(self, client, login, options):
"""Creates and returns new pool instance"""
logger.info("New pool was created")
return EPPConnectionPool(client, login, options)
def start_connection_pool(self, restart_pool_if_exists=True):
"""Starts a connection pool for the registry.
restart_pool_if_exists -> bool:
If an instance of the pool already exists,
then then that instance will be killed first.
It is generally recommended to keep this enabled.
"""
# Since we reuse the same creds for each pool, we can test on
# one socket, and if successful, then we know we can connect.
if not self._test_registry_connection_success():
logger.warning("start_connection_pool() -> Cannot contact the Registry")
self.pool_status.connection_success = False
if (
err.is_transport_error()
or err.is_connection_error()
or err.is_session_error()
or err.is_server_error()
or err.should_retry()
):
return self._retry(command)
else:
self.pool_status.connection_success = True
# If this function is reinvoked, then ensure
# that we don't have duplicate data sitting around.
if self._pool is not None and restart_pool_if_exists:
logger.info("Connection pool restarting...")
self.kill_pool()
logger.info("Old pool killed")
self._pool = self._create_pool(self._client, self._login, self.pool_options)
self.pool_status.pool_running = True
self.pool_status.pool_hanging = False
logger.info("Connection pool started")
def kill_pool(self):
"""Kills the existing pool. Use this instead
of self._pool = None, as that doesn't clear
gevent instances."""
if self._pool is not None:
self._pool.kill_all_connections()
self._pool = None
self.pool_status.pool_running = False
return None
logger.info("kill_pool() was invoked but there was no pool to delete")
def _test_registry_connection_success(self):
"""Check that determines if our login
credentials are valid, and/or if the Registrar
can be contacted
"""
# This is closed in test_connection_success
socket = Socket(self._client, self._login)
can_login = False
# Something went wrong if this doesn't exist
if hasattr(socket, "test_connection_success"):
can_login = socket.test_connection_success()
return can_login
raise err
try:
@ -241,5 +172,4 @@ try:
CLIENT = EPPLibWrapper()
logger.info("registry client initialized")
except Exception:
CLIENT = None # type: ignore
logger.warning("Unable to configure epplib. Registrar cannot contact registry.", exc_info=True)

View file

@ -1,102 +0,0 @@
import logging
from time import sleep
try:
from epplib import commands
from epplib.client import Client
except ImportError:
pass
from .errors import LoginError
logger = logging.getLogger(__name__)
class Socket:
"""Context manager which establishes a TCP connection with registry."""
def __init__(self, client: Client, login: commands.Login) -> None:
"""Save the epplib client and login details."""
self.client = client
self.login = login
def __enter__(self):
"""Runs connect(), which opens a connection with EPPLib."""
self.connect()
def __exit__(self, *args, **kwargs):
"""Runs disconnect(), which closes a connection with EPPLib."""
self.disconnect()
def connect(self):
"""Use epplib to connect."""
logger.info("Opening socket on connection pool")
self.client.connect()
response = self.client.send(self.login)
if self.is_login_error(response.code):
self.client.close()
raise LoginError(response.msg)
return self.client
def disconnect(self):
"""Close the connection."""
logger.info("Closing socket on connection pool")
try:
self.client.send(commands.Logout())
self.client.close()
except Exception as err:
logger.warning("Connection to registry was not cleanly closed.")
logger.error(err)
def send(self, command):
"""Sends a command to the registry.
If the RegistryError code is >= 2000,
then this function raises a LoginError.
The calling function should handle this."""
response = self.client.send(command)
if self.is_login_error(response.code):
self.client.close()
raise LoginError(response.msg)
return response
def is_login_error(self, code):
"""Returns the result of code >= 2000 for RegistryError.
This indicates that something weird happened on the Registry,
and that we should return a LoginError."""
return code >= 2000
def test_connection_success(self):
"""Tests if a successful connection can be made with the registry.
Tries 3 times."""
# Something went wrong if this doesn't exist
if not hasattr(self.client, "connect"):
logger.warning("self.client does not have a connect attribute")
return False
counter = 0 # we'll try 3 times
while True:
try:
self.client.connect()
response = self.client.send(self.login)
except (LoginError, OSError) as err:
logger.error(err)
should_retry = True
if isinstance(err, LoginError):
should_retry = err.should_retry()
if should_retry and counter < 3:
counter += 1
sleep((counter * 50) / 1000) # sleep 50 ms to 150 ms
else: # don't try again
return False
else:
# If we encounter a login error, fail
if self.is_login_error(response.code):
logger.warning("A login error was found in test_connection_success")
return False
# Otherwise, just return true
return True
finally:
self.disconnect()

View file

@ -5,8 +5,6 @@ from dateutil.tz import tzlocal # type: ignore
from django.test import TestCase
from epplibwrapper.client import EPPLibWrapper
from epplibwrapper.errors import RegistryError
from epplibwrapper.socket import Socket
from epplibwrapper.utility.pool import EPPConnectionPool
from registrar.models.domain import registry
from contextlib import ExitStack
from .common import less_console_noise
@ -27,236 +25,236 @@ logger = logging.getLogger(__name__)
class TestConnectionPool(TestCase):
"""Tests for our connection pooling behaviour"""
def setUp(self):
# Mimic the settings added to settings.py
self.pool_options = {
# Current pool size
"size": 1,
# Which errors the pool should look out for
"exc_classes": (TransportError,),
# Occasionally pings the registry to keep the connection alive.
# Value in seconds => (keepalive / size)
"keepalive": 60,
}
# def setUp(self):
# # Mimic the settings added to settings.py
# self.pool_options = {
# # Current pool size
# "size": 1,
# # Which errors the pool should look out for
# "exc_classes": (TransportError,),
# # Occasionally pings the registry to keep the connection alive.
# # Value in seconds => (keepalive / size)
# "keepalive": 60,
# }
def fake_socket(self, login, client):
# Linter reasons
pw = "none"
# Create a fake client object
fake_client = Client(
SocketTransport(
"none",
cert_file="path/to/cert_file",
key_file="path/to/key_file",
password=pw,
)
)
# def fake_socket(self, login, client):
# # Linter reasons
# pw = "none"
# # Create a fake client object
# fake_client = Client(
# SocketTransport(
# "none",
# cert_file="path/to/cert_file",
# key_file="path/to/key_file",
# password=pw,
# )
# )
return Socket(fake_client, MagicMock())
# return Socket(fake_client, MagicMock())
def patch_success(self):
return True
# def patch_success(self):
# return True
def fake_send(self, command, cleaned=None):
mock = MagicMock(
code=1000,
msg="Command completed successfully",
res_data=None,
cl_tr_id="xkw1uo#2023-10-17T15:29:09.559376",
sv_tr_id="5CcH4gxISuGkq8eqvr1UyQ==-35a",
extensions=[],
msg_q=None,
)
return mock
# def fake_send(self, command, cleaned=None):
# mock = MagicMock(
# code=1000,
# msg="Command completed successfully",
# res_data=None,
# cl_tr_id="xkw1uo#2023-10-17T15:29:09.559376",
# sv_tr_id="5CcH4gxISuGkq8eqvr1UyQ==-35a",
# extensions=[],
# msg_q=None,
# )
# return mock
def fake_client(mock_client):
pw = "none"
client = Client(
SocketTransport(
"none",
cert_file="path/to/cert_file",
key_file="path/to/key_file",
password=pw,
)
)
return client
# def fake_client(mock_client):
# pw = "none"
# client = Client(
# SocketTransport(
# "none",
# cert_file="path/to/cert_file",
# key_file="path/to/key_file",
# password=pw,
# )
# )
# return client
@patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
def test_pool_sends_data(self):
"""A .send is invoked on the pool successfully"""
expected_result = {
"cl_tr_id": None,
"code": 1000,
"extensions": [],
"msg": "Command completed successfully",
"msg_q": None,
"res_data": [
info.InfoDomainResultData(
roid="DF1340360-GOV",
statuses=[
common.Status(
state="serverTransferProhibited",
description=None,
lang="en",
),
common.Status(state="inactive", description=None, lang="en"),
],
cl_id="gov2023-ote",
cr_id="gov2023-ote",
cr_date=datetime.datetime(2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()),
up_id="gov2023-ote",
up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()),
tr_date=None,
name="test3.gov",
registrant="TuaWnx9hnm84GCSU",
admins=[],
nsset=None,
keyset=None,
ex_date=datetime.date(2024, 8, 15),
auth_info=info.DomainAuthInfo(pw="2fooBAR123fooBaz"),
)
],
"sv_tr_id": "wRRNVhKhQW2m6wsUHbo/lA==-29a",
}
# @patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
# def test_pool_sends_data(self):
# """A .send is invoked on the pool successfully"""
# expected_result = {
# "cl_tr_id": None,
# "code": 1000,
# "extensions": [],
# "msg": "Command completed successfully",
# "msg_q": None,
# "res_data": [
# info.InfoDomainResultData(
# roid="DF1340360-GOV",
# statuses=[
# common.Status(
# state="serverTransferProhibited",
# description=None,
# lang="en",
# ),
# common.Status(state="inactive", description=None, lang="en"),
# ],
# cl_id="gov2023-ote",
# cr_id="gov2023-ote",
# cr_date=datetime.datetime(2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()),
# up_id="gov2023-ote",
# up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()),
# tr_date=None,
# name="test3.gov",
# registrant="TuaWnx9hnm84GCSU",
# admins=[],
# nsset=None,
# keyset=None,
# ex_date=datetime.date(2024, 8, 15),
# auth_info=info.DomainAuthInfo(pw="2fooBAR123fooBaz"),
# )
# ],
# "sv_tr_id": "wRRNVhKhQW2m6wsUHbo/lA==-29a",
# }
# Mock a response from EPP
def fake_receive(command, cleaned=None):
location = Path(__file__).parent / "utility" / "infoDomain.xml"
xml = (location).read_bytes()
return xml
# # Mock a response from EPP
# def fake_receive(command, cleaned=None):
# location = Path(__file__).parent / "utility" / "infoDomain.xml"
# xml = (location).read_bytes()
# return xml
def do_nothing(command):
pass
# def do_nothing(command):
# pass
# Mock what happens inside the "with"
with ExitStack() as stack:
stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket))
stack.enter_context(patch.object(Socket, "connect", self.fake_client))
stack.enter_context(patch.object(EPPConnectionPool, "kill_all_connections", do_nothing))
stack.enter_context(patch.object(SocketTransport, "send", self.fake_send))
stack.enter_context(patch.object(SocketTransport, "receive", fake_receive))
with less_console_noise():
# Restart the connection pool
registry.start_connection_pool()
# Pool should be running, and be the right size
self.assertEqual(registry.pool_status.connection_success, True)
self.assertEqual(registry.pool_status.pool_running, True)
# # Mock what happens inside the "with"
# with ExitStack() as stack:
# stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket))
# stack.enter_context(patch.object(Socket, "connect", self.fake_client))
# stack.enter_context(patch.object(EPPConnectionPool, "kill_all_connections", do_nothing))
# stack.enter_context(patch.object(SocketTransport, "send", self.fake_send))
# stack.enter_context(patch.object(SocketTransport, "receive", fake_receive))
# with less_console_noise():
# # Restart the connection pool
# registry.start_connection_pool()
# # Pool should be running, and be the right size
# self.assertEqual(registry.pool_status.connection_success, True)
# self.assertEqual(registry.pool_status.pool_running, True)
# Send a command
result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
# # Send a command
# result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
# Should this ever fail, it either means that the schema has changed,
# or the pool is broken.
# If the schema has changed: Update the associated infoDomain.xml file
self.assertEqual(result.__dict__, expected_result)
# # Should this ever fail, it either means that the schema has changed,
# # or the pool is broken.
# # If the schema has changed: Update the associated infoDomain.xml file
# self.assertEqual(result.__dict__, expected_result)
# The number of open pools should match the number of requested ones.
# If it is 0, then they failed to open
self.assertEqual(len(registry._pool.conn), self.pool_options["size"])
# Kill the connection pool
registry.kill_pool()
# # The number of open pools should match the number of requested ones.
# # If it is 0, then they failed to open
# self.assertEqual(len(registry._pool.conn), self.pool_options["size"])
# # Kill the connection pool
# registry.kill_pool()
@patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
def test_pool_restarts_on_send(self):
"""A .send is invoked, but the pool isn't running.
The pool should restart."""
expected_result = {
"cl_tr_id": None,
"code": 1000,
"extensions": [],
"msg": "Command completed successfully",
"msg_q": None,
"res_data": [
info.InfoDomainResultData(
roid="DF1340360-GOV",
statuses=[
common.Status(
state="serverTransferProhibited",
description=None,
lang="en",
),
common.Status(state="inactive", description=None, lang="en"),
],
cl_id="gov2023-ote",
cr_id="gov2023-ote",
cr_date=datetime.datetime(2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()),
up_id="gov2023-ote",
up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()),
tr_date=None,
name="test3.gov",
registrant="TuaWnx9hnm84GCSU",
admins=[],
nsset=None,
keyset=None,
ex_date=datetime.date(2024, 8, 15),
auth_info=info.DomainAuthInfo(pw="2fooBAR123fooBaz"),
)
],
"sv_tr_id": "wRRNVhKhQW2m6wsUHbo/lA==-29a",
}
# @patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
# def test_pool_restarts_on_send(self):
# """A .send is invoked, but the pool isn't running.
# The pool should restart."""
# expected_result = {
# "cl_tr_id": None,
# "code": 1000,
# "extensions": [],
# "msg": "Command completed successfully",
# "msg_q": None,
# "res_data": [
# info.InfoDomainResultData(
# roid="DF1340360-GOV",
# statuses=[
# common.Status(
# state="serverTransferProhibited",
# description=None,
# lang="en",
# ),
# common.Status(state="inactive", description=None, lang="en"),
# ],
# cl_id="gov2023-ote",
# cr_id="gov2023-ote",
# cr_date=datetime.datetime(2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()),
# up_id="gov2023-ote",
# up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()),
# tr_date=None,
# name="test3.gov",
# registrant="TuaWnx9hnm84GCSU",
# admins=[],
# nsset=None,
# keyset=None,
# ex_date=datetime.date(2024, 8, 15),
# auth_info=info.DomainAuthInfo(pw="2fooBAR123fooBaz"),
# )
# ],
# "sv_tr_id": "wRRNVhKhQW2m6wsUHbo/lA==-29a",
# }
# Mock a response from EPP
def fake_receive(command, cleaned=None):
location = Path(__file__).parent / "utility" / "infoDomain.xml"
xml = (location).read_bytes()
return xml
# # Mock a response from EPP
# def fake_receive(command, cleaned=None):
# location = Path(__file__).parent / "utility" / "infoDomain.xml"
# xml = (location).read_bytes()
# return xml
def do_nothing(command):
pass
# def do_nothing(command):
# pass
# Mock what happens inside the "with"
with ExitStack() as stack:
stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket))
stack.enter_context(patch.object(Socket, "connect", self.fake_client))
stack.enter_context(patch.object(EPPConnectionPool, "kill_all_connections", do_nothing))
stack.enter_context(patch.object(SocketTransport, "send", self.fake_send))
stack.enter_context(patch.object(SocketTransport, "receive", fake_receive))
with less_console_noise():
# Start the connection pool
registry.start_connection_pool()
# Kill the connection pool
registry.kill_pool()
# # Mock what happens inside the "with"
# with ExitStack() as stack:
# stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket))
# stack.enter_context(patch.object(Socket, "connect", self.fake_client))
# stack.enter_context(patch.object(EPPConnectionPool, "kill_all_connections", do_nothing))
# stack.enter_context(patch.object(SocketTransport, "send", self.fake_send))
# stack.enter_context(patch.object(SocketTransport, "receive", fake_receive))
# with less_console_noise():
# # Start the connection pool
# registry.start_connection_pool()
# # Kill the connection pool
# registry.kill_pool()
self.assertEqual(registry.pool_status.pool_running, False)
# self.assertEqual(registry.pool_status.pool_running, False)
# An exception should be raised as end user will be informed
# that they cannot connect to EPP
with self.assertRaises(RegistryError):
expected = "InfoDomain failed to execute due to a connection error."
result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
self.assertEqual(result, expected)
# # An exception should be raised as end user will be informed
# # that they cannot connect to EPP
# with self.assertRaises(RegistryError):
# expected = "InfoDomain failed to execute due to a connection error."
# result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
# self.assertEqual(result, expected)
# A subsequent command should be successful, as the pool restarts
result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
# Should this ever fail, it either means that the schema has changed,
# or the pool is broken.
# If the schema has changed: Update the associated infoDomain.xml file
self.assertEqual(result.__dict__, expected_result)
# # A subsequent command should be successful, as the pool restarts
# result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
# # Should this ever fail, it either means that the schema has changed,
# # or the pool is broken.
# # If the schema has changed: Update the associated infoDomain.xml file
# self.assertEqual(result.__dict__, expected_result)
# The number of open pools should match the number of requested ones.
# If it is 0, then they failed to open
self.assertEqual(len(registry._pool.conn), self.pool_options["size"])
# Kill the connection pool
registry.kill_pool()
# # The number of open pools should match the number of requested ones.
# # If it is 0, then they failed to open
# self.assertEqual(len(registry._pool.conn), self.pool_options["size"])
# # Kill the connection pool
# registry.kill_pool()
@patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
def test_raises_connection_error(self):
"""A .send is invoked on the pool, but registry connection is lost
right as we send a command."""
# @patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
# def test_raises_connection_error(self):
# """A .send is invoked on the pool, but registry connection is lost
# right as we send a command."""
with ExitStack() as stack:
stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket))
stack.enter_context(patch.object(Socket, "connect", self.fake_client))
with less_console_noise():
# Start the connection pool
registry.start_connection_pool()
# with ExitStack() as stack:
# stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket))
# stack.enter_context(patch.object(Socket, "connect", self.fake_client))
# with less_console_noise():
# # Start the connection pool
# registry.start_connection_pool()
# Pool should be running
self.assertEqual(registry.pool_status.connection_success, True)
self.assertEqual(registry.pool_status.pool_running, True)
# # Pool should be running
# self.assertEqual(registry.pool_status.connection_success, True)
# self.assertEqual(registry.pool_status.pool_running, True)
# Try to send a command out - should fail
with self.assertRaises(RegistryError):
expected = "InfoDomain failed to execute due to a connection error."
result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
self.assertEqual(result, expected)
# # Try to send a command out - should fail
# with self.assertRaises(RegistryError):
# expected = "InfoDomain failed to execute due to a connection error."
# result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
# self.assertEqual(result, expected)

View file

@ -1,151 +0,0 @@
import logging
from typing import List
import gevent
from geventconnpool import ConnectionPool
from epplibwrapper.socket import Socket
from epplibwrapper.utility.pool_error import PoolError, PoolErrorCodes
try:
from epplib.commands import Hello
from epplib.exceptions import TransportError
except ImportError:
pass
from gevent.lock import BoundedSemaphore
from collections import deque
logger = logging.getLogger(__name__)
class EPPConnectionPool(ConnectionPool):
"""A connection pool for EPPLib.
Args:
client (Client): The client
login (commands.Login): Login creds
options (dict): Options for the ConnectionPool
base class
"""
def __init__(self, client, login, options: dict):
# For storing shared credentials
self._client = client
self._login = login
# Keep track of each greenlet
self.greenlets: List[gevent.Greenlet] = []
# Define optional pool settings.
# Kept in a dict so that the parent class,
# client.py, can maintain seperation/expandability
self.size = 1
if "size" in options:
self.size = options["size"]
self.exc_classes = tuple((TransportError,))
if "exc_classes" in options:
self.exc_classes = options["exc_classes"]
self.keepalive = None
if "keepalive" in options:
self.keepalive = options["keepalive"]
# Determines the period in which new
# gevent threads are spun up.
# This time period is in seconds. So for instance, .1 would be .1 seconds.
self.spawn_frequency = 0.1
if "spawn_frequency" in options:
self.spawn_frequency = options["spawn_frequency"]
self.conn: deque = deque()
self.lock = BoundedSemaphore(self.size)
self.populate_all_connections()
def _new_connection(self):
socket = self._create_socket(self._client, self._login)
try:
connection = socket.connect()
return connection
except Exception as err:
message = f"Failed to execute due to a registry error: {err}"
logger.error(message, exc_info=True)
# We want to raise a pool error rather than a LoginError here
# because if this occurs internally, we should handle this
# differently than we otherwise would for LoginError.
raise PoolError(code=PoolErrorCodes.NEW_CONNECTION_FAILED) from err
def _keepalive(self, c):
"""Sends a command to the server to keep the connection alive."""
try:
# Sends a ping to the registry via EPPLib
c.send(Hello())
except Exception as err:
message = "Failed to keep the connection alive."
logger.error(message, exc_info=True)
raise PoolError(code=PoolErrorCodes.KEEP_ALIVE_FAILED) from err
def _keepalive_periodic(self):
"""Overriding _keepalive_periodic from geventconnpool so that PoolErrors
are properly handled, as opposed to printing to stdout"""
delay = float(self.keepalive) / self.size
while 1:
try:
with self.get() as c:
self._keepalive(c)
except PoolError as err:
logger.error(err.message, exc_info=True)
except self.exc_classes:
# Nothing to do, the pool will generate a new connection later
pass
gevent.sleep(delay)
def _create_socket(self, client, login) -> Socket:
"""Creates and returns a socket instance"""
socket = Socket(client, login)
return socket
def get_connections(self):
"""Returns the connection queue"""
return self.conn
def kill_all_connections(self):
"""Kills all active connections in the pool."""
try:
if len(self.conn) > 0 or len(self.greenlets) > 0:
logger.info("Attempting to kill connections")
gevent.killall(self.greenlets)
self.greenlets.clear()
for connection in self.conn:
connection.disconnect()
self.conn.clear()
# Clear the semaphore
self.lock = BoundedSemaphore(self.size)
logger.info("Finished killing connections")
else:
logger.info("No connections to kill.")
except Exception as err:
logger.error("Could not kill all connections.")
raise PoolError(code=PoolErrorCodes.KILL_ALL_FAILED) from err
def populate_all_connections(self):
"""Generates the connection pool.
If any connections exist, kill them first.
Based off of the __init__ definition for geventconnpool.
"""
if len(self.conn) > 0 or len(self.greenlets) > 0:
self.kill_all_connections()
# Setup the lock
for i in range(self.size):
self.lock.acquire()
# Open multiple connections
for i in range(self.size):
self.greenlets.append(gevent.spawn_later(self.spawn_frequency * i, self._addOne))
# Open a "keepalive" thread if we want to ping open connections
if self.keepalive:
self.greenlets.append(gevent.spawn(self._keepalive_periodic))

View file

@ -1,46 +0,0 @@
from enum import IntEnum
class PoolErrorCodes(IntEnum):
"""Used in the PoolError class for
error mapping.
Overview of contact error codes:
- 2000 KILL_ALL_FAILED
- 2001 NEW_CONNECTION_FAILED
- 2002 KEEP_ALIVE_FAILED
"""
KILL_ALL_FAILED = 2000
NEW_CONNECTION_FAILED = 2001
KEEP_ALIVE_FAILED = 2002
class PoolError(Exception):
"""
Overview of contact error codes:
- 2000 KILL_ALL_FAILED
- 2001 NEW_CONNECTION_FAILED
- 2002 KEEP_ALIVE_FAILED
Note: These are separate from the error codes returned from EppLib
"""
# Used variables due to linter requirements
kill_failed = "Could not kill all connections. Are multiple pools running?"
conn_failed = "Failed to execute due to a registry error. See previous logs to determine the cause of the error."
alive_failed = "Failed to keep the connection alive. It is likely that the registry returned a LoginError."
_error_mapping = {
PoolErrorCodes.KILL_ALL_FAILED: kill_failed,
PoolErrorCodes.NEW_CONNECTION_FAILED: conn_failed,
PoolErrorCodes.KEEP_ALIVE_FAILED: alive_failed,
}
def __init__(self, *args, code=None, **kwargs):
super().__init__(*args, **kwargs)
self.code = code
if self.code in self._error_mapping:
self.message = self._error_mapping.get(self.code)
def __str__(self):
return f"{self.message}"

View file

@ -1,12 +0,0 @@
class PoolStatus:
"""A list of Booleans to keep track of Pool Status.
pool_running -> bool: Tracks if the pool itself is active or not.
connection_success -> bool: Tracks if connection is possible with the registry.
pool_hanging -> pool: Tracks if the pool has exceeded its timeout period.
"""
def __init__(self):
self.pool_running = False
self.connection_success = False
self.pool_hanging = False

View file

@ -601,20 +601,6 @@ SECRET_REGISTRY_KEY = secret_registry_key
SECRET_REGISTRY_KEY_PASSPHRASE = secret_registry_key_passphrase
SECRET_REGISTRY_HOSTNAME = secret_registry_hostname
# Use this variable to set the size of our connection pool in client.py
# WARNING: Setting this value too high could cause frequent app crashes!
# Having too many connections open could cause the sandbox to timeout,
# as the spinup time could exceed the timeout time.
EPP_CONNECTION_POOL_SIZE = 1
# Determines the interval in which we ping open connections in seconds
# Calculated as POOL_KEEP_ALIVE / EPP_CONNECTION_POOL_SIZE
POOL_KEEP_ALIVE = 60
# Determines how long we try to keep a pool alive for,
# before restarting it.
POOL_TIMEOUT = 60
# endregion
# region: Security and Privacy----------------------------------------------###

View file

@ -5,7 +5,6 @@ import phonenumber_field.modelfields
class Migration(migrations.Migration):
dependencies = [
("registrar", "0070_domainapplication_rejection_reason"),
]