Include epplibwrapper module

This commit is contained in:
Seamus Johnston 2023-04-21 09:08:47 -05:00
parent d3cc3853c1
commit ce7cfc1a53
No known key found for this signature in database
GPG key ID: 2F21225985069105
14 changed files with 325 additions and 49 deletions

View file

@ -65,3 +65,38 @@ You also need to upload the `public.crt` key if recently created to the login.go
To access the AWS Simple Email Service, we need credentials from the CISA AWS
account for an IAM user who has limited access to only SES. Those credentials
need to be specified in the environment.
## REGISTRY_CL_ID and REGISTRY_PASSWORD
These are the login credentials for accessing the registry.
## REGISTRY_CERT and REGISTRY_KEY and REGISTRY_KEY_PASSPHRASE
These are the client certificate and its private key used to identify the registrar to the registry during the establishment of a TCP connection.
The private key is protected by a passphrase for safer transport and storage.
These were generated with:
```bash
openssl genpkey -out client.key \
-algorithm EC -pkeyopt ec_paramgen_curve:P-256 \
-aes-256-cbc
openssl req -new -x509 -days 365 \
-key client.key -out client.crt \
-subj "/C=US/ST=DC/L=Washington/O=GSA/OU=18F/CN=GOV Prototype Registrar"
```
Encode them using:
```bash
base64 client.key
base64 client.crt
```
You'll need to give the new certificate to the registry vendor _before_ rotating it in production.
## REGISTRY_HOSTNAME
This is the hostname at which the registry can be found.

View file

@ -2,6 +2,6 @@
max-line-length = 88
max-complexity = 10
extend-ignore = E203
per-file-ignores = __init__.py:F401,F403
per-file-ignores = __init__.py:F401,F403,E402
# migrations are auto-generated and often break rules
exclude=registrar/migrations/*

View file

@ -33,6 +33,16 @@ services:
# AWS credentials
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
# Set a username for accessing the registry
- REGISTRY_CL_ID
# Set a password for accessing the registry
- REGISTRY_PASSWORD
# Set a private certifcate for accessing the registry
- REGISTRY_CERT
# Set a private certifcate's key for accessing the registry
- REGISTRY_KEY
# Set a URI for accessing the registry
- REGISTRY_HOSTNAME
stdin_open: true
tty: true
ports:

View file

View file

@ -1,40 +0,0 @@
"""
This file defines a number of mock functions which can be used to simulate
communication with the registry until that integration is implemented.
"""
from datetime import datetime
def domain_check(_):
"""Is domain available for registration?"""
return True
def domain_info(domain):
"""What does the registry know about this domain?"""
return {
"name": domain,
"roid": "EXAMPLE1-REP",
"status": ["ok"],
"registrant": "jd1234",
"contact": {
"admin": "sh8013",
"tech": None,
},
"ns": {
f"ns1.{domain}",
f"ns2.{domain}",
},
"host": [
f"ns1.{domain}",
f"ns2.{domain}",
],
"sponsor": "ClientX",
"creator": "ClientY",
# TODO: think about timezones
"creation_date": datetime.today(),
"updator": "ClientX",
"last_update_date": datetime.today(),
"expiration_date": datetime.today(),
"last_transfer_date": datetime.today(),
}

View file

@ -0,0 +1,48 @@
import logging
from types import SimpleNamespace
try:
from epplib import constants
except ImportError:
pass
logger = logging.getLogger(__name__)
NAMESPACE = SimpleNamespace(
EPP="urn:ietf:params:xml:ns:epp-1.0",
XSI="http://www.w3.org/2001/XMLSchema-instance",
FRED="noop",
NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0",
NIC_DOMAIN="urn:ietf:params:xml:ns:domain-1.0",
NIC_ENUMVAL="noop",
NIC_EXTRA_ADDR="noop",
NIC_HOST="urn:ietf:params:xml:ns:host-1.0",
NIC_KEYSET="noop",
NIC_NSSET="noop",
)
SCHEMA_LOCATION = SimpleNamespace(
XSI="urn:ietf:params:xml:ns:epp-1.0 epp-1.0.xsd",
FRED="noop fred-1.5.0.xsd",
NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0 contact-1.0.xsd",
NIC_DOMAIN="urn:ietf:params:xml:ns:domain-1.0 domain-1.0.xsd",
NIC_ENUMVAL="noop enumval-1.2.0.xsd",
NIC_EXTRA_ADDR="noop extra-addr-1.0.0.xsd",
NIC_HOST="urn:ietf:params:xml:ns:host-1.0 host-1.0.xsd",
NIC_KEYSET="noop keyset-1.3.2.xsd",
NIC_NSSET="noop nsset-1.2.2.xsd",
)
try:
constants.NAMESPACE = NAMESPACE
constants.SCHEMA_LOCATION = SCHEMA_LOCATION
except NameError:
pass
# Attn: these imports should NOT be at the top of the file
from .client import CLIENT, commands
__all__ = [
"CLIENT",
"commands",
]

34
src/epplibwrapper/cert.py Normal file
View file

@ -0,0 +1,34 @@
import os
import tempfile
from django.conf import settings
class Cert:
"""
Location of client certificate as written to disk.
This is needed because the certificate is stored as an environment
variable but Python's ssl library requires a file.
"""
def __init__(self, data=settings.SECRET_REGISTRY_CERT) -> None:
self.filename = self._write(data)
def __del__(self):
"""Remove the files when this object is garbage collected."""
os.unlink(self.filename)
def _write(self, data) -> str:
"""Write data to a secure tempfile. Returns the path."""
_, path = tempfile.mkstemp()
with open(path, "wb") as file:
file.write(data)
return path
class Key(Cert):
"""Location of private key as written to disk."""
def __init__(self) -> None:
super().__init__(data=settings.SECRET_REGISTRY_KEY)

125
src/epplibwrapper/client.py Normal file
View file

@ -0,0 +1,125 @@
import logging
from time import sleep
try:
from epplib.client import Client
from epplib import commands
from epplib.exceptions import TransportError, ParsingError
from epplib.transport import SocketTransport
except ImportError:
pass
from django.conf import settings
from .cert import Cert, Key
from .errors import LoginError, RegistryError
from .socket import Socket
logger = logging.getLogger(__name__)
try:
# Write cert and key to disk
CERT = Cert()
KEY = Key()
except Exception as err:
CERT = None # type: ignore
KEY = None # type: ignore
logger.warning(err)
logger.warning(
"Problem with client certificate. Registrar cannot contact registry."
)
class EPPLibWrapper:
"""
A wrapper over epplib's client.
ATTN: This should not be used directly. Use `Domain` from domain.py.
"""
def __init__(self) -> None:
"""Initialize settings which will be used for all connections."""
# prepare (but do not send) a Login command
self._login = commands.Login(
cl_id=settings.SECRET_REGISTRY_CL_ID,
password=settings.SECRET_REGISTRY_PASSWORD,
obj_uris=[
"urn:ietf:params:xml:ns:domain-1.0",
"urn:ietf:params:xml:ns:contact-1.0",
],
)
# establish a client object with a TCP socket transport
self._client = Client(
SocketTransport(
settings.SECRET_REGISTRY_HOSTNAME,
cert_file=CERT.filename,
key_file=KEY.filename,
password=settings.SECRET_REGISTRY_KEY_PASSPHRASE,
)
)
# prepare a context manager which will connect and login when invoked
# (it will also logout and disconnect when the context manager exits)
self._connect = Socket(self._client, self._login)
def _send(self, command):
"""Helper function used by `send`."""
try:
with self._connect as wire:
response = wire.send(command)
except (ValueError, ParsingError) as err:
logger.debug(err)
logger.warning(
"%s failed to execute due to some syntax error."
% command.__class__.__name__
)
raise RegistryError() from err
except TransportError as err:
logger.debug(err)
logger.warning(
"%s failed to execute due to a connection error."
% command.__class__.__name__
)
raise RegistryError() from err
except LoginError as err:
logger.debug(err)
logger.warning(
"%s failed to execute due to a registry login error."
% command.__class__.__name__
)
raise RegistryError() from err
except Exception as err:
logger.debug(err)
logger.warning(
"%s failed to execute due to an unknown error."
% command.__class__.__name__
)
raise RegistryError() from err
else:
if response.code >= 2000:
raise RegistryError(response.msg)
else:
return response
def send(self, command):
"""Login, send the command, then close the connection. Tries 3 times."""
counter = 0 # we'll try 3 times
while True:
try:
return self._send(command)
except RegistryError as err:
if counter == 3: # don't try again
raise err
else:
counter += 1
sleep((counter * 50) / 1000) # sleep 50 ms to 150 ms
try:
# Initialize epplib
CLIENT = EPPLibWrapper()
logger.debug("registry client initialized")
except Exception as err:
CLIENT = None # type: ignore
logger.warning(err)
logger.warning("Unable to configure epplib. Registrar cannot contact registry.")

View file

@ -0,0 +1,6 @@
class RegistryError(Exception):
pass
class LoginError(RegistryError):
pass

View file

@ -0,0 +1,37 @@
import logging
try:
from epplib import commands
except ImportError:
pass
from .errors import LoginError
logger = logging.getLogger(__name__)
class Socket:
"""Context manager which establishes a TCP connection with registry."""
def __init__(self, client, login) -> None:
"""Save the epplib client and login details."""
self.client = client
self.login = login
def __enter__(self):
"""Use epplib to connect."""
self.client.connect()
response = self.client.send(self.login)
if response.code >= 2000:
self.client.close()
raise LoginError(response.msg)
return self.client
def __exit__(self, *args, **kwargs):
"""Close the connection."""
try:
self.client.send(commands.Logout())
self.client.close()
except Exception:
logger.warning("Connection to registry was not cleanly closed.")

View file

@ -55,11 +55,18 @@ secret_key = secret("DJANGO_SECRET_KEY")
secret_aws_ses_key_id = secret("AWS_ACCESS_KEY_ID", None)
secret_aws_ses_key = secret("AWS_SECRET_ACCESS_KEY", None)
secret_registry_cl_id = secret("REGISTRY_CL_ID")
secret_registry_password = secret("REGISTRY_PASSWORD")
secret_registry_cert = b64decode(secret("REGISTRY_CERT", ""))
secret_registry_key = b64decode(secret("REGISTRY_KEY", ""))
secret_registry_key_passphrase = secret("REGISTRY_KEY_PASSPHRASE", "")
secret_registry_hostname = secret("REGISTRY_HOSTNAME")
# region: Basic Django Config-----------------------------------------------###
# Build paths inside the project like this: BASE_DIR / "subdir".
BASE_DIR = path.resolve().parent.parent
# (settings.py is in `src/registrar/config/`: BASE_DIR is `src/`)
BASE_DIR = path.resolve().parent.parent.parent
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = env_debug
@ -156,16 +163,17 @@ WSGI_APPLICATION = "registrar.config.wsgi.application"
# will place static files for deployment.
# Do not use this directory for permanent storage -
# it is for Django!
STATIC_ROOT = BASE_DIR / "public"
STATIC_ROOT = BASE_DIR / "registrar" / "public"
STATICFILES_DIRS = [
BASE_DIR / "assets",
BASE_DIR / "registrar" / "assets",
BASE_DIR / "epplibwrapper" / "assets",
]
TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [BASE_DIR / "templates"],
"DIRS": [BASE_DIR / "registrar" / "templates"],
# look for templates inside installed apps
# required by django-debug-toolbar
"APP_DIRS": True,
@ -496,6 +504,17 @@ ROOT_URLCONF = "registrar.config.urls"
# Must be relative and end with "/"
STATIC_URL = "public/"
# endregion
# region: Registry----------------------------------------------------------###
# SECURITY WARNING: keep all registry variables in production secret!
SECRET_REGISTRY_CL_ID = secret_registry_cl_id
SECRET_REGISTRY_PASSWORD = secret_registry_password
SECRET_REGISTRY_CERT = secret_registry_cert
SECRET_REGISTRY_KEY = secret_registry_key
SECRET_REGISTRY_KEY_PASSPHRASE = secret_registry_key_passphrase
SECRET_REGISTRY_HOSTNAME = secret_registry_hostname
# endregion
# region: Security and Privacy----------------------------------------------###

View file

@ -7,7 +7,8 @@ from django.db import models
from django_fsm import FSMField, transition # type: ignore
from api.views import in_domains
from epp.mock_epp import domain_info, domain_check
# from epplibwrapper import CLIENT as registry, commands
from registrar.utility import errors
from .utility.time_stamped_model import TimeStampedModel
@ -129,7 +130,7 @@ class Domain(TimeStampedModel):
"""Check if a domain is available.
Not implemented. Returns a dummy value for testing."""
return domain_check(domain)
return False # domain_check(domain)
def transfer(self):
"""Going somewhere. Not implemented."""
@ -146,7 +147,7 @@ class Domain(TimeStampedModel):
if not hasattr(self, "info"):
try:
# get info from registry
self.info = domain_info(self.name)
self.info = {} # domain_info(self.name)
except Exception as e:
logger.error(e)
# TODO: back off error handling

View file

@ -118,6 +118,7 @@ class TestDomain(TestCase):
domain = Domain.objects.create(name="igorville.gov")
self.assertEqual(domain.is_active, False)
@skip("cannot activate a domain without mock registry")
def test_get_status(self):
"""Returns proper status based on `is_active`."""
domain = Domain.objects.create(name="igorville.gov")

View file

@ -6,4 +6,4 @@ set -o pipefail
# Make sure that django's `collectstatic` has been run locally before pushing up to any environment,
# so that the styles and static assets to show up correctly on any environment.
gunicorn registrar.config.wsgi -t 60
gunicorn registrar.config.wsgi -t 60 --preload