Check if registry connection can be made

This commit is contained in:
zandercymatics 2023-10-12 14:38:49 -06:00
parent c4d2950ac9
commit dfec8c200e
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
2 changed files with 54 additions and 11 deletions

View file

@ -15,6 +15,7 @@ from django.conf import settings
from .cert import Cert, Key
from .errors import LoginError, RegistryError
from .socket import Socket
from .utility.pool import EppConnectionPool
logger = logging.getLogger(__name__)
@ -41,7 +42,6 @@ class EPPLibWrapper:
def __init__(self) -> None:
"""Initialize settings which will be used for all connections."""
# prepare (but do not send) a Login command
self._login = commands.Login(
cl_id=settings.SECRET_REGISTRY_CL_ID,
@ -51,6 +51,7 @@ class EPPLibWrapper:
"urn:ietf:params:xml:ns:contact-1.0",
],
)
# establish a client object with a TCP socket transport
self._client = Client(
SocketTransport(
@ -71,19 +72,23 @@ class EPPLibWrapper:
# Occasionally pings the registry to keep the connection alive
"keepalive": settings.POOL_KEEP_ALIVE,
}
self._pool = EppConnectionPool(
client=self._client, login=self._login, options=options
)
self._pool = None
if not settings.DEBUG or self._test_registry_connection_success():
self._pool = EppConnectionPool(
client=self._client, login=self._login, options=options
)
else:
logger.warning("Cannot contact the Registry")
# TODO - signal that the app may need to restart?
def _send(self, command):
"""Helper function used by `send`."""
cmd_type = command.__class__.__name__
try:
# We won't have an EPP connection locally,
# shortcut this and raise an err
# TODO - implement a timeout in _pool.get()
if settings.DEBUG:
if self._pool is None:
raise LoginError
# TODO - add a timeout
with self._pool.get() as connection:
response = connection.send(command)
except (ValueError, ParsingError) as err:
@ -127,6 +132,18 @@ class EPPLibWrapper:
else: # don't try again
raise err
def _test_registry_connection_success(self):
"""Check that determines if our login
credentials are valid, and/or if the Registrar
can be contacted
"""
socket = Socket(self._login, self._client)
can_login = False
# Something went wrong if this doesn't exist
if hasattr(socket, "test_connection_success"):
can_login = socket.test_connection_success()
return can_login
try:
# Initialize epplib