Merge pull request #540 from cisagov/sspj/epplibwrapper

Include epplibwrapper module
This commit is contained in:
Seamus Johnston 2023-04-28 21:14:13 +00:00 committed by GitHub
commit 6e7ab85e78
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 340 additions and 48 deletions

View file

@ -2,6 +2,6 @@
max-line-length = 88
max-complexity = 10
extend-ignore = E203
per-file-ignores = __init__.py:F401,F403
per-file-ignores = __init__.py:F401,F403,E402
# migrations are auto-generated and often break rules
exclude=registrar/migrations/*

View file

@ -27,6 +27,18 @@ services:
- DJANGO_DEBUG=True
# Tell Django where it is being hosted
- DJANGO_BASE_URL=http://localhost:8080
# Set a username for accessing the registry
- REGISTRY_CL_ID=nothing
# Set a password for accessing the registry
- REGISTRY_PASSWORD=nothing
# Set a private certifcate for accessing the registry
- REGISTRY_CERT=LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNMekNDQWRXZ0F3SUJBZ0lVWThXWEljcFVlVUk0TVUrU3NWbkIrOGErOUlnd0NnWUlLb1pJemowRUF3SXcKYlRFTE1Ba0dBMVVFQmhNQ1ZWTXhDekFKQmdOVkJBZ01Ba1JETVJNd0VRWURWUVFIREFwWFlYTm9hVzVuZEc5dQpNUXd3Q2dZRFZRUUtEQU5IVTBFeEREQUtCZ05WQkFzTUF6RTRSakVnTUI0R0ExVUVBd3dYUjA5V0lGQnliM1J2CmRIbHdaU0JTWldkcGMzUnlZWEl3SGhjTk1qTXdOREl4TVRVMU5ETTFXaGNOTWpRd05ESXdNVFUxTkRNMVdqQnQKTVFzd0NRWURWUVFHRXdKVlV6RUxNQWtHQTFVRUNBd0NSRU14RXpBUkJnTlZCQWNNQ2xkaGMyaHBibWQwYjI0eApEREFLQmdOVkJBb01BMGRUUVRFTU1Bb0dBMVVFQ3d3RE1UaEdNU0F3SGdZRFZRUUREQmRIVDFZZ1VISnZkRzkwCmVYQmxJRkpsWjJsemRISmhjakJaTUJNR0J5cUdTTTQ5QWdFR0NDcUdTTTQ5QXdFSEEwSUFCRkN4bGVsN1ZoWHkKb1ZIRWY2N3FKamo5UDk0ZWdqdXNtSWVaNFRLYkxkM3RRRVgzZnFKdVk4WmZzWWN4N0s1K0NEdnJLMnZRdjlMYgpmamhMTjZad3FqK2pVekJSTUIwR0ExVWREZ1FXQkJRUEZCRHdnSlhOUXE4a1V0K1hyYzFFWm9wbW9UQWZCZ05WCkhTTUVHREFXZ0JRUEZCRHdnSlhOUXE4a1V0K1hyYzFFWm9wbW9UQVBCZ05WSFJNQkFmOEVCVEFEQVFIL01Bb0cKQ0NxR1NNNDlCQU1DQTBnQU1FVUNJRVJEaml0VGR0UTB3eVNXb1hEbCtYbUpVUmdENUo0VHVudkFGeDlDSitCUwpBaUVBME42eTJoeGdFWkYxRXJGYW1VQW5EUHlQSFlJeFNJQkwwNW5ibE9IZFVLRT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
# Set a private certifcate's key for accessing the registry
- REGISTRY_KEY=LS0tLS1CRUdJTiBFTkNSWVBURUQgUFJJVkFURSBLRVktLS0tLQpNSUhzTUZjR0NTcUdTSWIzRFFFRkRUQktNQ2tHQ1NxR1NJYjNEUUVGRERBY0JBakJYK1UvdUFkQ3hBSUNDQUF3CkRBWUlLb1pJaHZjTkFna0ZBREFkQmdsZ2hrZ0JaUU1FQVNvRUVHWTNnblRGZ3F0UE5sVU93a2hvSHFrRWdaQlAKMG5FMWpSRXliTHBDNHFtaGczRXdaR2lXZDFWV2RLVEtyNXF3d3hsdjhCbHB1UHhtRGN4dTA1U3VReWhMcU5hWgpVNjRoZlFyYy94cnRnT3Mwc0ZXenlhY0hEaFhiQUdTQjdTTjc2WG55NU9wWDVZVGtRTFMvRTk4YmxFY3NQUWVuCkNqNTJnQzVPZ0JtYzl1cjZlbWY2bjd6TE5vUWovSzk4MEdIWjg5OVZHQ1J3OHhGZGIyb3IyU3dMcDd0V1Ixcz0KLS0tLS1FTkQgRU5DUllQVEVEIFBSSVZBVEUgS0VZLS0tLS0K
# set a passphrase for decrypting the registry key
- REGISTRY_KEY_PASSPHRASE=fake
# Set a URI for accessing the registry
- REGISTRY_HOSTNAME=localhost
# --- These keys are obtained from `.env` file ---
# Set a private JWT signing key for Login.gov
- DJANGO_SECRET_LOGIN_KEY

View file

View file

@ -1,40 +0,0 @@
"""
This file defines a number of mock functions which can be used to simulate
communication with the registry until that integration is implemented.
"""
from datetime import datetime
def domain_check(_):
"""Is domain available for registration?"""
return True
def domain_info(domain):
"""What does the registry know about this domain?"""
return {
"name": domain,
"roid": "EXAMPLE1-REP",
"status": ["ok"],
"registrant": "jd1234",
"contact": {
"admin": "sh8013",
"tech": None,
},
"ns": {
f"ns1.{domain}",
f"ns2.{domain}",
},
"host": [
f"ns1.{domain}",
f"ns2.{domain}",
],
"sponsor": "ClientX",
"creator": "ClientY",
# TODO: think about timezones
"creation_date": datetime.today(),
"updator": "ClientX",
"last_update_date": datetime.today(),
"expiration_date": datetime.today(),
"last_transfer_date": datetime.today(),
}

View file

@ -0,0 +1,52 @@
import logging
from types import SimpleNamespace
try:
from epplib import constants
except ImportError:
# allow epplibwrapper to load without epplib, for testing and development
pass
logger = logging.getLogger(__name__)
NAMESPACE = SimpleNamespace(
EPP="urn:ietf:params:xml:ns:epp-1.0",
XSI="http://www.w3.org/2001/XMLSchema-instance",
FRED="noop",
NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0",
NIC_DOMAIN="urn:ietf:params:xml:ns:domain-1.0",
NIC_ENUMVAL="noop",
NIC_EXTRA_ADDR="noop",
NIC_HOST="urn:ietf:params:xml:ns:host-1.0",
NIC_KEYSET="noop",
NIC_NSSET="noop",
)
SCHEMA_LOCATION = SimpleNamespace(
XSI="urn:ietf:params:xml:ns:epp-1.0 epp-1.0.xsd",
FRED="noop fred-1.5.0.xsd",
NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0 contact-1.0.xsd",
NIC_DOMAIN="urn:ietf:params:xml:ns:domain-1.0 domain-1.0.xsd",
NIC_ENUMVAL="noop enumval-1.2.0.xsd",
NIC_EXTRA_ADDR="noop extra-addr-1.0.0.xsd",
NIC_HOST="urn:ietf:params:xml:ns:host-1.0 host-1.0.xsd",
NIC_KEYSET="noop keyset-1.3.2.xsd",
NIC_NSSET="noop nsset-1.2.2.xsd",
)
try:
constants.NAMESPACE = NAMESPACE
constants.SCHEMA_LOCATION = SCHEMA_LOCATION
except NameError:
pass
# Attn: these imports should NOT be at the top of the file
try:
from .client import CLIENT, commands
except ImportError:
pass
__all__ = [
"CLIENT",
"commands",
]

34
src/epplibwrapper/cert.py Normal file
View file

@ -0,0 +1,34 @@
import os
import tempfile
from django.conf import settings
class Cert:
"""
Location of client certificate as written to disk.
This is needed because the certificate is stored as an environment
variable but Python's ssl library requires a file.
"""
def __init__(self, data=settings.SECRET_REGISTRY_CERT) -> None:
self.filename = self._write(data)
def __del__(self):
"""Remove the files when this object is garbage collected."""
os.unlink(self.filename)
def _write(self, data) -> str:
"""Write data to a secure tempfile. Returns the path."""
_, path = tempfile.mkstemp()
with open(path, "wb") as file:
file.write(data)
return path
class Key(Cert):
"""Location of private key as written to disk."""
def __init__(self) -> None:
super().__init__(data=settings.SECRET_REGISTRY_KEY)

128
src/epplibwrapper/client.py Normal file
View file

@ -0,0 +1,128 @@
"""Provide a wrapper around epplib to handle authentication and errors."""
import logging
from time import sleep
try:
from epplib.client import Client
from epplib import commands
from epplib.exceptions import TransportError, ParsingError
from epplib.transport import SocketTransport
except ImportError:
pass
from django.conf import settings
from .cert import Cert, Key
from .errors import LoginError, RegistryError
from .socket import Socket
logger = logging.getLogger(__name__)
try:
# Write cert and key to disk
CERT = Cert()
KEY = Key()
except Exception:
CERT = None # type: ignore
KEY = None # type: ignore
logger.warning(
"Problem with client certificate. Registrar cannot contact registry.",
exc_info=True,
)
class EPPLibWrapper:
"""
A wrapper over epplib's client.
ATTN: This should not be used directly. Use `Domain` from domain.py.
"""
def __init__(self) -> None:
"""Initialize settings which will be used for all connections."""
# prepare (but do not send) a Login command
self._login = commands.Login(
cl_id=settings.SECRET_REGISTRY_CL_ID,
password=settings.SECRET_REGISTRY_PASSWORD,
obj_uris=[
"urn:ietf:params:xml:ns:domain-1.0",
"urn:ietf:params:xml:ns:contact-1.0",
],
)
# establish a client object with a TCP socket transport
self._client = Client(
SocketTransport(
settings.SECRET_REGISTRY_HOSTNAME,
cert_file=CERT.filename,
key_file=KEY.filename,
password=settings.SECRET_REGISTRY_KEY_PASSPHRASE,
)
)
# prepare a context manager which will connect and login when invoked
# (it will also logout and disconnect when the context manager exits)
self._connect = Socket(self._client, self._login)
def _send(self, command):
"""Helper function used by `send`."""
try:
with self._connect as wire:
response = wire.send(command)
except (ValueError, ParsingError) as err:
logger.warning(
"%s failed to execute due to some syntax error."
% command.__class__.__name__,
exc_info=True,
)
raise RegistryError() from err
except TransportError as err:
logger.warning(
"%s failed to execute due to a connection error."
% command.__class__.__name__,
exc_info=True,
)
raise RegistryError() from err
except LoginError as err:
logger.warning(
"%s failed to execute due to a registry login error."
% command.__class__.__name__,
exc_info=True,
)
raise RegistryError() from err
except Exception as err:
logger.warning(
"%s failed to execute due to an unknown error."
% command.__class__.__name__,
exc_info=True,
)
raise RegistryError() from err
else:
if response.code >= 2000:
raise RegistryError(response.msg)
else:
return response
def send(self, command):
"""Login, send the command, then close the connection. Tries 3 times."""
counter = 0 # we'll try 3 times
while True:
try:
return self._send(command)
except RegistryError as err:
if counter == 3: # don't try again
raise err
else:
counter += 1
sleep((counter * 50) / 1000) # sleep 50 ms to 150 ms
try:
# Initialize epplib
CLIENT = EPPLibWrapper()
logger.debug("registry client initialized")
except Exception:
CLIENT = None # type: ignore
logger.warning(
"Unable to configure epplib. Registrar cannot contact registry.", exc_info=True
)

View file

@ -0,0 +1,6 @@
class RegistryError(Exception):
pass
class LoginError(RegistryError):
pass

View file

@ -0,0 +1,37 @@
import logging
try:
from epplib import commands
except ImportError:
pass
from .errors import LoginError
logger = logging.getLogger(__name__)
class Socket:
"""Context manager which establishes a TCP connection with registry."""
def __init__(self, client, login) -> None:
"""Save the epplib client and login details."""
self.client = client
self.login = login
def __enter__(self):
"""Use epplib to connect."""
self.client.connect()
response = self.client.send(self.login)
if response.code >= 2000:
self.client.close()
raise LoginError(response.msg)
return self.client
def __exit__(self, *args, **kwargs):
"""Close the connection."""
try:
self.client.send(commands.Logout())
self.client.close()
except Exception:
logger.warning("Connection to registry was not cleanly closed.")

View file

@ -7,6 +7,8 @@ strict_optional = True
# implicit_optional: treat arguments a None default value as implicitly Optional?
# `var: int = None` is equal to `var: Optional[int] = None`
implicit_optional = True
# we'd still like mypy to succeed when 3rd party libraries are not installed
ignore_missing_imports = True
[mypy.plugins.django-stubs]
django_settings_module = "registrar.config.settings"

View file

@ -55,11 +55,18 @@ secret_key = secret("DJANGO_SECRET_KEY")
secret_aws_ses_key_id = secret("AWS_ACCESS_KEY_ID", None)
secret_aws_ses_key = secret("AWS_SECRET_ACCESS_KEY", None)
secret_registry_cl_id = secret("REGISTRY_CL_ID")
secret_registry_password = secret("REGISTRY_PASSWORD")
secret_registry_cert = b64decode(secret("REGISTRY_CERT", ""))
secret_registry_key = b64decode(secret("REGISTRY_KEY", ""))
secret_registry_key_passphrase = secret("REGISTRY_KEY_PASSPHRASE", "")
secret_registry_hostname = secret("REGISTRY_HOSTNAME")
# region: Basic Django Config-----------------------------------------------###
# Build paths inside the project like this: BASE_DIR / "subdir".
BASE_DIR = path.resolve().parent.parent
# (settings.py is in `src/registrar/config/`: BASE_DIR is `src/`)
BASE_DIR = path.resolve().parent.parent.parent
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = env_debug
@ -156,16 +163,16 @@ WSGI_APPLICATION = "registrar.config.wsgi.application"
# will place static files for deployment.
# Do not use this directory for permanent storage -
# it is for Django!
STATIC_ROOT = BASE_DIR / "public"
STATIC_ROOT = BASE_DIR / "registrar" / "public"
STATICFILES_DIRS = [
BASE_DIR / "assets",
BASE_DIR / "registrar" / "assets",
]
TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [BASE_DIR / "templates"],
"DIRS": [BASE_DIR / "registrar" / "templates"],
# look for templates inside installed apps
# required by django-debug-toolbar
"APP_DIRS": True,
@ -496,6 +503,17 @@ ROOT_URLCONF = "registrar.config.urls"
# Must be relative and end with "/"
STATIC_URL = "public/"
# endregion
# region: Registry----------------------------------------------------------###
# SECURITY WARNING: keep all registry variables in production secret!
SECRET_REGISTRY_CL_ID = secret_registry_cl_id
SECRET_REGISTRY_PASSWORD = secret_registry_password
SECRET_REGISTRY_CERT = secret_registry_cert
SECRET_REGISTRY_KEY = secret_registry_key
SECRET_REGISTRY_KEY_PASSPHRASE = secret_registry_key_passphrase
SECRET_REGISTRY_HOSTNAME = secret_registry_hostname
# endregion
# region: Security and Privacy----------------------------------------------###

View file

@ -7,7 +7,6 @@ from django.db import models
from django_fsm import FSMField, transition # type: ignore
from api.views import in_domains
from epp.mock_epp import domain_info, domain_check
from registrar.utility import errors
from .utility.time_stamped_model import TimeStampedModel
@ -129,7 +128,7 @@ class Domain(TimeStampedModel):
"""Check if a domain is available.
Not implemented. Returns a dummy value for testing."""
return domain_check(domain)
return False # domain_check(domain)
def transfer(self):
"""Going somewhere. Not implemented."""
@ -146,7 +145,7 @@ class Domain(TimeStampedModel):
if not hasattr(self, "info"):
try:
# get info from registry
self.info = domain_info(self.name)
self.info = {} # domain_info(self.name)
except Exception as e:
logger.error(e)
# TODO: back off error handling

View file

@ -118,6 +118,7 @@ class TestDomain(TestCase):
domain = Domain.objects.create(name="igorville.gov")
self.assertEqual(domain.is_active, False)
@skip("cannot activate a domain without mock registry")
def test_get_status(self):
"""Returns proper status based on `is_active`."""
domain = Domain.objects.create(name="igorville.gov")