mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-05-19 19:09:22 +02:00
test cases written; cleanup; small fix to _retry method signature
This commit is contained in:
parent
c5d525bd48
commit
99d34682b1
3 changed files with 268 additions and 278 deletions
|
@ -42,7 +42,6 @@ class EPPLibWrapper:
|
|||
# set _client to None initially. In the event that the __init__ fails
|
||||
# before _client initializes, app should still start and be in a state
|
||||
# that it can attempt _client initialization on send attempts
|
||||
logger.info("__init__ called")
|
||||
self._client = None
|
||||
# prepare (but do not send) a Login command
|
||||
self._login = commands.Login(
|
||||
|
@ -56,13 +55,12 @@ class EPPLibWrapper:
|
|||
try:
|
||||
self._initialize_client()
|
||||
except Exception:
|
||||
logger.warning("Unable to configure epplib. Registrar cannot contact registry.", exc_info=True)
|
||||
logger.warning("Unable to configure epplib. Registrar cannot contact registry.")
|
||||
|
||||
def _initialize_client(self) -> None:
|
||||
"""Initialize a client, assuming _login defined. Sets _client to initialized
|
||||
client. Raises errors if initialization fails.
|
||||
This method will be called at app initialization, and also during retries."""
|
||||
logger.info("_initialize_client called")
|
||||
# establish a client object with a TCP socket transport
|
||||
self._client = Client(
|
||||
SocketTransport(
|
||||
|
@ -81,18 +79,17 @@ class EPPLibWrapper:
|
|||
raise LoginError(response.msg) # type: ignore
|
||||
except TransportError as err:
|
||||
message = "_initialize_client failed to execute due to a connection error."
|
||||
logger.error(f"{message} Error: {err}", exc_info=True)
|
||||
logger.error(f"{message} Error: {err}")
|
||||
raise RegistryError(message, code=ErrorCode.TRANSPORT_ERROR) from err
|
||||
except LoginError as err:
|
||||
raise err
|
||||
except Exception as err:
|
||||
message = "_initialize_client failed to execute due to an unknown error."
|
||||
logger.error(f"{message} Error: {err}", exc_info=True)
|
||||
logger.error(f"{message} Error: {err}")
|
||||
raise RegistryError(message) from err
|
||||
|
||||
def _disconnect(self) -> None:
|
||||
"""Close the connection."""
|
||||
logger.info("_disconnect called")
|
||||
try:
|
||||
self._client.send(commands.Logout())
|
||||
self._client.close()
|
||||
|
@ -101,7 +98,6 @@ class EPPLibWrapper:
|
|||
|
||||
def _send(self, command):
|
||||
"""Helper function used by `send`."""
|
||||
logger.info("_send called")
|
||||
cmd_type = command.__class__.__name__
|
||||
|
||||
try:
|
||||
|
@ -112,21 +108,21 @@ class EPPLibWrapper:
|
|||
response = self._client.send(command)
|
||||
except (ValueError, ParsingError) as err:
|
||||
message = f"{cmd_type} failed to execute due to some syntax error."
|
||||
logger.error(f"{message} Error: {err}", exc_info=True)
|
||||
logger.error(f"{message} Error: {err}")
|
||||
raise RegistryError(message) from err
|
||||
except TransportError as err:
|
||||
message = f"{cmd_type} failed to execute due to a connection error."
|
||||
logger.error(f"{message} Error: {err}", exc_info=True)
|
||||
logger.error(f"{message} Error: {err}")
|
||||
raise RegistryError(message, code=ErrorCode.TRANSPORT_ERROR) from err
|
||||
except LoginError as err:
|
||||
# For linter due to it not liking this line length
|
||||
text = "failed to execute due to a registry login error."
|
||||
message = f"{cmd_type} {text}"
|
||||
logger.error(f"{message} Error: {err}", exc_info=True)
|
||||
logger.error(f"{message} Error: {err}")
|
||||
raise RegistryError(message) from err
|
||||
except Exception as err:
|
||||
message = f"{cmd_type} failed to execute due to an unknown error."
|
||||
logger.error(f"{message} Error: {err}", exc_info=True)
|
||||
logger.error(f"{message} Error: {err}")
|
||||
raise RegistryError(message) from err
|
||||
else:
|
||||
if response.code >= 2000:
|
||||
|
@ -134,21 +130,16 @@ class EPPLibWrapper:
|
|||
else:
|
||||
return response
|
||||
|
||||
def _retry(self, command, *, cleaned=False):
|
||||
def _retry(self, command):
|
||||
"""Retry sending a command through EPP by re-initializing the client
|
||||
and then sending the command."""
|
||||
logger.info("_retry called")
|
||||
# re-initialize by disconnecting and initial
|
||||
self._disconnect()
|
||||
self._initialize_client()
|
||||
# try to prevent use of this method without appropriate safeguards
|
||||
if not cleaned:
|
||||
raise ValueError("Please sanitize user input before sending it.")
|
||||
return self._send(command)
|
||||
|
||||
def send(self, command, *, cleaned=False):
|
||||
"""Login, send the command, then close the connection. Tries 3 times."""
|
||||
logger.info("send called")
|
||||
# try to prevent use of this method without appropriate safeguards
|
||||
if not cleaned:
|
||||
raise ValueError("Please sanitize user input before sending it.")
|
||||
|
@ -172,4 +163,4 @@ try:
|
|||
CLIENT = EPPLibWrapper()
|
||||
logger.info("registry client initialized")
|
||||
except Exception:
|
||||
logger.warning("Unable to configure epplib. Registrar cannot contact registry.", exc_info=True)
|
||||
logger.warning("Unable to configure epplib. Registrar cannot contact registry.")
|
||||
|
|
259
src/epplibwrapper/tests/test_client.py
Normal file
259
src/epplibwrapper/tests/test_client.py
Normal file
|
@ -0,0 +1,259 @@
|
|||
from unittest.mock import MagicMock, patch
|
||||
from django.test import TestCase
|
||||
from epplibwrapper.client import EPPLibWrapper
|
||||
from epplibwrapper.errors import RegistryError, LoginError
|
||||
from .common import less_console_noise
|
||||
import logging
|
||||
|
||||
try:
|
||||
from epplib import commands
|
||||
from epplib.client import Client
|
||||
from epplib.exceptions import TransportError
|
||||
from epplib.transport import SocketTransport
|
||||
from epplib.models import common, info
|
||||
from epplib.responses import Result
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestClient(TestCase):
|
||||
"""Test the EPPlibwrapper client"""
|
||||
|
||||
def fake_result(self, code, msg):
|
||||
"""Helper function to create a fake Result object"""
|
||||
return Result(
|
||||
code=code,
|
||||
msg=msg,
|
||||
res_data=[],
|
||||
cl_tr_id="cl_tr_id",
|
||||
sv_tr_id="sv_tr_id"
|
||||
)
|
||||
|
||||
@patch("epplibwrapper.client.Client")
|
||||
def test_initialize_client_success(self, mock_client):
|
||||
"""Test when the initialize_client is successful"""
|
||||
with less_console_noise():
|
||||
# Mock the Client instance and its methods
|
||||
mock_connect = MagicMock()
|
||||
# Create a mock Result instance
|
||||
mock_result = MagicMock(spec=Result)
|
||||
mock_result.code = 200
|
||||
mock_result.msg = "Success"
|
||||
mock_result.res_data = ["data1", "data2"]
|
||||
mock_result.cl_tr_id = "client_id"
|
||||
mock_result.sv_tr_id = "server_id"
|
||||
mock_send = MagicMock(return_value=mock_result)
|
||||
mock_client.return_value.connect = mock_connect
|
||||
mock_client.return_value.send = mock_send
|
||||
|
||||
# Create EPPLibWrapper instance and initialize client
|
||||
wrapper = EPPLibWrapper()
|
||||
|
||||
# Assert that connect method is called once
|
||||
mock_connect.assert_called_once()
|
||||
# Assert that _client is not None after initialization
|
||||
self.assertIsNotNone(wrapper._client)
|
||||
|
||||
@patch("epplibwrapper.client.Client")
|
||||
def test_initialize_client_transport_error(self, mock_client):
|
||||
"""Test when the send(login) step of initialize_client raises a TransportError."""
|
||||
with less_console_noise():
|
||||
# Mock the Client instance and its methods
|
||||
mock_connect = MagicMock()
|
||||
mock_send = MagicMock(side_effect=TransportError("Transport error"))
|
||||
mock_client.return_value.connect = mock_connect
|
||||
mock_client.return_value.send = mock_send
|
||||
|
||||
with self.assertRaises(RegistryError):
|
||||
# Create EPPLibWrapper instance and initialize client
|
||||
# if functioning as expected, initial __init__ should except
|
||||
# and log any Exception raised
|
||||
wrapper = EPPLibWrapper()
|
||||
# so call _initialize_client a second time directly to test
|
||||
# the raised exception
|
||||
wrapper._initialize_client()
|
||||
|
||||
@patch("epplibwrapper.client.Client")
|
||||
def test_initialize_client_login_error(self, mock_client):
|
||||
"""Test when the send(login) step of initialize_client returns (2400) comamnd failed code."""
|
||||
with less_console_noise():
|
||||
# Mock the Client instance and its methods
|
||||
mock_connect = MagicMock()
|
||||
# Create a mock Result instance
|
||||
mock_result = MagicMock(spec=Result)
|
||||
mock_result.code = 2400
|
||||
mock_result.msg = "Login failed"
|
||||
mock_result.res_data = ["data1", "data2"]
|
||||
mock_result.cl_tr_id = "client_id"
|
||||
mock_result.sv_tr_id = "server_id"
|
||||
mock_send = MagicMock(return_value=mock_result)
|
||||
mock_client.return_value.connect = mock_connect
|
||||
mock_client.return_value.send = mock_send
|
||||
|
||||
with self.assertRaises(LoginError):
|
||||
# Create EPPLibWrapper instance and initialize client
|
||||
# if functioning as expected, initial __init__ should except
|
||||
# and log any Exception raised
|
||||
wrapper = EPPLibWrapper()
|
||||
# so call _initialize_client a second time directly to test
|
||||
# the raised exception
|
||||
wrapper._initialize_client()
|
||||
|
||||
@patch("epplibwrapper.client.Client")
|
||||
def test_initialize_client_unknown_exception(self, mock_client):
|
||||
"""Test when the send(login) step of initialize_client raises an unexpected Exception."""
|
||||
with less_console_noise():
|
||||
# Mock the Client instance and its methods
|
||||
mock_connect = MagicMock()
|
||||
mock_send = MagicMock(side_effect=Exception("Unknown exception"))
|
||||
mock_client.return_value.connect = mock_connect
|
||||
mock_client.return_value.send = mock_send
|
||||
|
||||
with self.assertRaises(RegistryError):
|
||||
# Create EPPLibWrapper instance and initialize client
|
||||
# if functioning as expected, initial __init__ should except
|
||||
# and log any Exception raised
|
||||
wrapper = EPPLibWrapper()
|
||||
# so call _initialize_client a second time directly to test
|
||||
# the raised exception
|
||||
wrapper._initialize_client()
|
||||
|
||||
@patch("epplibwrapper.client.Client")
|
||||
def test_initialize_client_fails_recovers_with_send_command(self, mock_client):
|
||||
"""Test when the initialize_client fails on the connect() step. And then a subsequent
|
||||
call to send() should recover and re-initialize the client and properly return
|
||||
the successful send command.
|
||||
Flow:
|
||||
Initialization step fails at app init
|
||||
Send command fails (with 2400 code) prompting retry
|
||||
Client closes and re-initializes, and command is sent successfully"""
|
||||
with less_console_noise():
|
||||
# Mock the Client instance and its methods
|
||||
# close() should return successfully
|
||||
mock_close = MagicMock()
|
||||
mock_client.return_value.close = mock_close
|
||||
# Create success and failure results
|
||||
command_success_result = self.fake_result(1000, "Command completed successfully")
|
||||
command_failure_result = self.fake_result(2400, "Command failed")
|
||||
# side_effect for the connect() calls
|
||||
# first connect() should raise an Exception
|
||||
# subsequent connect() calls should return success
|
||||
connect_call_count = 0
|
||||
def connect_side_effect(*args, **kwargs):
|
||||
nonlocal connect_call_count
|
||||
connect_call_count += 1
|
||||
if connect_call_count == 1:
|
||||
raise Exception("Connection failed")
|
||||
else:
|
||||
return command_success_result
|
||||
mock_connect = MagicMock(side_effect=connect_side_effect)
|
||||
mock_client.return_value.connect = mock_connect
|
||||
# side_effect for the send() calls
|
||||
# first send will be the send("InfoDomainCommand") and should fail
|
||||
# subsequend send() calls should return success
|
||||
send_call_count = 0
|
||||
def send_side_effect(*args, **kwargs):
|
||||
nonlocal send_call_count
|
||||
send_call_count += 1
|
||||
if send_call_count == 1:
|
||||
return command_failure_result
|
||||
else:
|
||||
return command_success_result
|
||||
mock_send = MagicMock(side_effect=send_side_effect)
|
||||
mock_client.return_value.send = mock_send
|
||||
# Create EPPLibWrapper instance and call send command
|
||||
wrapper = EPPLibWrapper()
|
||||
wrapper.send("InfoDomainCommand", cleaned=True)
|
||||
# two connect() calls should be made, the initial failed connect()
|
||||
# and the successful connect() during retry()
|
||||
self.assertEquals(mock_connect.call_count,2)
|
||||
# close() should only be called once, during retry()
|
||||
mock_close.assert_called_once()
|
||||
# send called 4 times: failed send("InfoDomainCommand"), passed send(logout),
|
||||
# passed send(login), passed send("InfoDomainCommand")
|
||||
self.assertEquals(mock_send.call_count,4)
|
||||
|
||||
@patch("epplibwrapper.client.Client")
|
||||
def test_send_command_failed_retries_and_fails_again(self, mock_client):
|
||||
"""Test when the send("InfoDomainCommand) call fails with a 2400, prompting a retry
|
||||
and the subsequent send("InfoDomainCommand) call also fails with a 2400, raise
|
||||
a RegistryError
|
||||
Flow:
|
||||
Initialization succeeds
|
||||
Send command fails (with 2400 code) prompting retry
|
||||
Client closes and re-initializes, and command fails again with 2400"""
|
||||
with less_console_noise():
|
||||
# Mock the Client instance and its methods
|
||||
# connect() and close() should succeed throughout
|
||||
mock_connect = MagicMock()
|
||||
mock_close = MagicMock()
|
||||
# Create a mock Result instance
|
||||
send_command_success_result = self.fake_result(1000, "Command completed successfully")
|
||||
send_command_failure_result = self.fake_result(2400, "Command failed")
|
||||
# side_effect for send command, passes for all other sends (login, logout), but
|
||||
# fails for send("InfoDomainCommand")
|
||||
def side_effect(*args, **kwargs):
|
||||
if args[0] == "InfoDomainCommand":
|
||||
return send_command_failure_result
|
||||
else:
|
||||
return send_command_success_result
|
||||
mock_send = MagicMock(side_effect=side_effect)
|
||||
mock_client.return_value.connect = mock_connect
|
||||
mock_client.return_value.close = mock_close
|
||||
mock_client.return_value.send = mock_send
|
||||
|
||||
with self.assertRaises(RegistryError):
|
||||
# Create EPPLibWrapper instance and initialize client
|
||||
wrapper = EPPLibWrapper()
|
||||
# call send, which should throw a RegistryError (after retry)
|
||||
wrapper.send("InfoDomainCommand", cleaned=True)
|
||||
# connect() should be called twice, once during initialization, second time
|
||||
# during retry
|
||||
self.assertEquals(mock_connect.call_count,2)
|
||||
# close() is called once during retry
|
||||
mock_close.assert_called_once()
|
||||
# send() is called 5 times: send(login), send(command) fails, send(logout)
|
||||
# send(login), send(command)
|
||||
self.assertEquals(mock_send.call_count,5)
|
||||
|
||||
@patch("epplibwrapper.client.Client")
|
||||
def test_send_command_failure_prompts_successful_retry(self, mock_client):
|
||||
"""Test when the send("InfoDomainCommand) call fails with a 2400, prompting a retry
|
||||
and the subsequent send("InfoDomainCommand) call succeeds
|
||||
Flow:
|
||||
Initialization succeeds
|
||||
Send command fails (with 2400 code) prompting retry
|
||||
Client closes and re-initializes, and command succeeds"""
|
||||
with less_console_noise():
|
||||
# Mock the Client instance and its methods
|
||||
# connect() and close() should succeed throughout
|
||||
mock_connect = MagicMock()
|
||||
mock_close = MagicMock()
|
||||
# create success and failure result messages
|
||||
send_command_success_result = self.fake_result(1000, "Command completed successfully")
|
||||
send_command_failure_result = self.fake_result(2400, "Command failed")
|
||||
# side_effect for send call, initial send(login) succeeds during initialization, next send(command)
|
||||
# fails, subsequent sends (logout, login, command) all succeed
|
||||
send_call_count = 0
|
||||
def side_effect(*args, **kwargs):
|
||||
nonlocal send_call_count
|
||||
send_call_count += 1
|
||||
if send_call_count == 2:
|
||||
return send_command_failure_result
|
||||
else:
|
||||
return send_command_success_result
|
||||
mock_send = MagicMock(side_effect=side_effect)
|
||||
mock_client.return_value.connect = mock_connect
|
||||
mock_client.return_value.close = mock_close
|
||||
mock_client.return_value.send = mock_send
|
||||
# Create EPPLibWrapper instance and initialize client
|
||||
wrapper = EPPLibWrapper()
|
||||
wrapper.send("InfoDomainCommand", cleaned=True)
|
||||
# connect() is called twice, once during initialization of app, once during retry
|
||||
self.assertEquals(mock_connect.call_count,2)
|
||||
# close() is called once, during retry
|
||||
mock_close.assert_called_once()
|
||||
# send() is called 5 times: send(login), send(command) fail, send(logout), send(login), send(command)
|
||||
self.assertEquals(mock_send.call_count,5)
|
|
@ -1,260 +0,0 @@
|
|||
import datetime
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
from dateutil.tz import tzlocal # type: ignore
|
||||
from django.test import TestCase
|
||||
from epplibwrapper.client import EPPLibWrapper
|
||||
from epplibwrapper.errors import RegistryError
|
||||
from registrar.models.domain import registry
|
||||
from contextlib import ExitStack
|
||||
from .common import less_console_noise
|
||||
import logging
|
||||
|
||||
try:
|
||||
from epplib import commands
|
||||
from epplib.client import Client
|
||||
from epplib.exceptions import TransportError
|
||||
from epplib.transport import SocketTransport
|
||||
from epplib.models import common, info
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestConnectionPool(TestCase):
|
||||
"""Tests for our connection pooling behaviour"""
|
||||
|
||||
# def setUp(self):
|
||||
# # Mimic the settings added to settings.py
|
||||
# self.pool_options = {
|
||||
# # Current pool size
|
||||
# "size": 1,
|
||||
# # Which errors the pool should look out for
|
||||
# "exc_classes": (TransportError,),
|
||||
# # Occasionally pings the registry to keep the connection alive.
|
||||
# # Value in seconds => (keepalive / size)
|
||||
# "keepalive": 60,
|
||||
# }
|
||||
|
||||
# def fake_socket(self, login, client):
|
||||
# # Linter reasons
|
||||
# pw = "none"
|
||||
# # Create a fake client object
|
||||
# fake_client = Client(
|
||||
# SocketTransport(
|
||||
# "none",
|
||||
# cert_file="path/to/cert_file",
|
||||
# key_file="path/to/key_file",
|
||||
# password=pw,
|
||||
# )
|
||||
# )
|
||||
|
||||
# return Socket(fake_client, MagicMock())
|
||||
|
||||
# def patch_success(self):
|
||||
# return True
|
||||
|
||||
# def fake_send(self, command, cleaned=None):
|
||||
# mock = MagicMock(
|
||||
# code=1000,
|
||||
# msg="Command completed successfully",
|
||||
# res_data=None,
|
||||
# cl_tr_id="xkw1uo#2023-10-17T15:29:09.559376",
|
||||
# sv_tr_id="5CcH4gxISuGkq8eqvr1UyQ==-35a",
|
||||
# extensions=[],
|
||||
# msg_q=None,
|
||||
# )
|
||||
# return mock
|
||||
|
||||
# def fake_client(mock_client):
|
||||
# pw = "none"
|
||||
# client = Client(
|
||||
# SocketTransport(
|
||||
# "none",
|
||||
# cert_file="path/to/cert_file",
|
||||
# key_file="path/to/key_file",
|
||||
# password=pw,
|
||||
# )
|
||||
# )
|
||||
# return client
|
||||
|
||||
# @patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
|
||||
# def test_pool_sends_data(self):
|
||||
# """A .send is invoked on the pool successfully"""
|
||||
# expected_result = {
|
||||
# "cl_tr_id": None,
|
||||
# "code": 1000,
|
||||
# "extensions": [],
|
||||
# "msg": "Command completed successfully",
|
||||
# "msg_q": None,
|
||||
# "res_data": [
|
||||
# info.InfoDomainResultData(
|
||||
# roid="DF1340360-GOV",
|
||||
# statuses=[
|
||||
# common.Status(
|
||||
# state="serverTransferProhibited",
|
||||
# description=None,
|
||||
# lang="en",
|
||||
# ),
|
||||
# common.Status(state="inactive", description=None, lang="en"),
|
||||
# ],
|
||||
# cl_id="gov2023-ote",
|
||||
# cr_id="gov2023-ote",
|
||||
# cr_date=datetime.datetime(2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()),
|
||||
# up_id="gov2023-ote",
|
||||
# up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()),
|
||||
# tr_date=None,
|
||||
# name="test3.gov",
|
||||
# registrant="TuaWnx9hnm84GCSU",
|
||||
# admins=[],
|
||||
# nsset=None,
|
||||
# keyset=None,
|
||||
# ex_date=datetime.date(2024, 8, 15),
|
||||
# auth_info=info.DomainAuthInfo(pw="2fooBAR123fooBaz"),
|
||||
# )
|
||||
# ],
|
||||
# "sv_tr_id": "wRRNVhKhQW2m6wsUHbo/lA==-29a",
|
||||
# }
|
||||
|
||||
# # Mock a response from EPP
|
||||
# def fake_receive(command, cleaned=None):
|
||||
# location = Path(__file__).parent / "utility" / "infoDomain.xml"
|
||||
# xml = (location).read_bytes()
|
||||
# return xml
|
||||
|
||||
# def do_nothing(command):
|
||||
# pass
|
||||
|
||||
# # Mock what happens inside the "with"
|
||||
# with ExitStack() as stack:
|
||||
# stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket))
|
||||
# stack.enter_context(patch.object(Socket, "connect", self.fake_client))
|
||||
# stack.enter_context(patch.object(EPPConnectionPool, "kill_all_connections", do_nothing))
|
||||
# stack.enter_context(patch.object(SocketTransport, "send", self.fake_send))
|
||||
# stack.enter_context(patch.object(SocketTransport, "receive", fake_receive))
|
||||
# with less_console_noise():
|
||||
# # Restart the connection pool
|
||||
# registry.start_connection_pool()
|
||||
# # Pool should be running, and be the right size
|
||||
# self.assertEqual(registry.pool_status.connection_success, True)
|
||||
# self.assertEqual(registry.pool_status.pool_running, True)
|
||||
|
||||
# # Send a command
|
||||
# result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
|
||||
|
||||
# # Should this ever fail, it either means that the schema has changed,
|
||||
# # or the pool is broken.
|
||||
# # If the schema has changed: Update the associated infoDomain.xml file
|
||||
# self.assertEqual(result.__dict__, expected_result)
|
||||
|
||||
# # The number of open pools should match the number of requested ones.
|
||||
# # If it is 0, then they failed to open
|
||||
# self.assertEqual(len(registry._pool.conn), self.pool_options["size"])
|
||||
# # Kill the connection pool
|
||||
# registry.kill_pool()
|
||||
|
||||
# @patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
|
||||
# def test_pool_restarts_on_send(self):
|
||||
# """A .send is invoked, but the pool isn't running.
|
||||
# The pool should restart."""
|
||||
# expected_result = {
|
||||
# "cl_tr_id": None,
|
||||
# "code": 1000,
|
||||
# "extensions": [],
|
||||
# "msg": "Command completed successfully",
|
||||
# "msg_q": None,
|
||||
# "res_data": [
|
||||
# info.InfoDomainResultData(
|
||||
# roid="DF1340360-GOV",
|
||||
# statuses=[
|
||||
# common.Status(
|
||||
# state="serverTransferProhibited",
|
||||
# description=None,
|
||||
# lang="en",
|
||||
# ),
|
||||
# common.Status(state="inactive", description=None, lang="en"),
|
||||
# ],
|
||||
# cl_id="gov2023-ote",
|
||||
# cr_id="gov2023-ote",
|
||||
# cr_date=datetime.datetime(2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()),
|
||||
# up_id="gov2023-ote",
|
||||
# up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()),
|
||||
# tr_date=None,
|
||||
# name="test3.gov",
|
||||
# registrant="TuaWnx9hnm84GCSU",
|
||||
# admins=[],
|
||||
# nsset=None,
|
||||
# keyset=None,
|
||||
# ex_date=datetime.date(2024, 8, 15),
|
||||
# auth_info=info.DomainAuthInfo(pw="2fooBAR123fooBaz"),
|
||||
# )
|
||||
# ],
|
||||
# "sv_tr_id": "wRRNVhKhQW2m6wsUHbo/lA==-29a",
|
||||
# }
|
||||
|
||||
# # Mock a response from EPP
|
||||
# def fake_receive(command, cleaned=None):
|
||||
# location = Path(__file__).parent / "utility" / "infoDomain.xml"
|
||||
# xml = (location).read_bytes()
|
||||
# return xml
|
||||
|
||||
# def do_nothing(command):
|
||||
# pass
|
||||
|
||||
# # Mock what happens inside the "with"
|
||||
# with ExitStack() as stack:
|
||||
# stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket))
|
||||
# stack.enter_context(patch.object(Socket, "connect", self.fake_client))
|
||||
# stack.enter_context(patch.object(EPPConnectionPool, "kill_all_connections", do_nothing))
|
||||
# stack.enter_context(patch.object(SocketTransport, "send", self.fake_send))
|
||||
# stack.enter_context(patch.object(SocketTransport, "receive", fake_receive))
|
||||
# with less_console_noise():
|
||||
# # Start the connection pool
|
||||
# registry.start_connection_pool()
|
||||
# # Kill the connection pool
|
||||
# registry.kill_pool()
|
||||
|
||||
# self.assertEqual(registry.pool_status.pool_running, False)
|
||||
|
||||
# # An exception should be raised as end user will be informed
|
||||
# # that they cannot connect to EPP
|
||||
# with self.assertRaises(RegistryError):
|
||||
# expected = "InfoDomain failed to execute due to a connection error."
|
||||
# result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
|
||||
# self.assertEqual(result, expected)
|
||||
|
||||
# # A subsequent command should be successful, as the pool restarts
|
||||
# result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
|
||||
# # Should this ever fail, it either means that the schema has changed,
|
||||
# # or the pool is broken.
|
||||
# # If the schema has changed: Update the associated infoDomain.xml file
|
||||
# self.assertEqual(result.__dict__, expected_result)
|
||||
|
||||
# # The number of open pools should match the number of requested ones.
|
||||
# # If it is 0, then they failed to open
|
||||
# self.assertEqual(len(registry._pool.conn), self.pool_options["size"])
|
||||
# # Kill the connection pool
|
||||
# registry.kill_pool()
|
||||
|
||||
# @patch.object(EPPLibWrapper, "_test_registry_connection_success", patch_success)
|
||||
# def test_raises_connection_error(self):
|
||||
# """A .send is invoked on the pool, but registry connection is lost
|
||||
# right as we send a command."""
|
||||
|
||||
# with ExitStack() as stack:
|
||||
# stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket))
|
||||
# stack.enter_context(patch.object(Socket, "connect", self.fake_client))
|
||||
# with less_console_noise():
|
||||
# # Start the connection pool
|
||||
# registry.start_connection_pool()
|
||||
|
||||
# # Pool should be running
|
||||
# self.assertEqual(registry.pool_status.connection_success, True)
|
||||
# self.assertEqual(registry.pool_status.pool_running, True)
|
||||
|
||||
# # Try to send a command out - should fail
|
||||
# with self.assertRaises(RegistryError):
|
||||
# expected = "InfoDomain failed to execute due to a connection error."
|
||||
# result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True)
|
||||
# self.assertEqual(result, expected)
|
Loading…
Add table
Add a link
Reference in a new issue