Merge pull request #1462 from cisagov/dk/1433-oidc-mult-levels

Issues 1433, 1435: Multi-level identity-verification based on users
This commit is contained in:
rachidatecs 2023-12-11 21:06:31 -05:00 committed by GitHub
commit 796bda8493
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 195 additions and 18 deletions

View file

@ -100,7 +100,9 @@ class Client(oic.Client):
"state": session["state"],
"nonce": session["nonce"],
"redirect_uri": self.registration_response["redirect_uris"][0],
"acr_values": self.behaviour.get("acr_value"),
# acr_value may be passed in session if overriding, as in the case
# of step up auth, otherwise get from settings.py
"acr_values": session.get("acr_value") or self.behaviour.get("acr_value"),
}
if extra_args is not None:
@ -162,7 +164,6 @@ class Client(oic.Client):
logger.error(err)
logger.error("Unable to parse response for %s" % state)
raise o_e.AuthenticationFailed(locator=state)
# ErrorResponse is not raised, it is passed back...
if isinstance(authn_response, ErrorResponse):
error = authn_response.get("error", "")
@ -207,7 +208,6 @@ class Client(oic.Client):
logger.error(err)
logger.error("Unable to request user info for %s" % state)
raise o_e.AuthenticationFailed(locator=state)
# ErrorResponse is not raised, it is passed back...
if isinstance(info_response, ErrorResponse):
logger.error("Unable to get user info (%s) for %s" % (info_response.get("error", ""), state))
@ -272,6 +272,11 @@ class Client(oic.Client):
super(Client, self).store_response(resp, info)
def get_step_up_acr_value(self):
"""returns the step_up_acr_value from settings
this helper function is called from djangooidc views"""
return self.behaviour.get("step_up_acr_value")
def __repr__(self):
return "Client {} {} {}".format(
self.client_id,

View file

@ -0,0 +1,30 @@
import logging
from django.test import TestCase
from django.conf import settings
from djangooidc.oidc import Client
logger = logging.getLogger(__name__)
class OidcTest(TestCase):
def test_oidc_create_authn_request_with_acr_value(self):
"""Test that create_authn_request returns a redirect with an acr_value
when an acr_value is passed through session.
This test is only valid locally. On local, client can be initialized.
Client initialization does not work in pipeline, so test is useless in
pipeline. However, it will not fail in pipeline."""
try:
# Initialize provider using pyOICD
OP = getattr(settings, "OIDC_ACTIVE_PROVIDER")
CLIENT = Client(OP)
session = {"acr_value": "some_acr_value_maybe_ial2"}
response = CLIENT.create_authn_request(session)
self.assertEqual(response.status_code, 302)
self.assertIn("some_acr_value_maybe_ial2", response.url)
except Exception as err:
logger.warning(err)
logger.warning("Unable to configure OpenID Connect provider in pipeline. Cannot execute this test.")

View file

@ -1,8 +1,9 @@
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from django.http import HttpResponse
from django.test import Client, TestCase
from django.test import Client, TestCase, RequestFactory
from django.urls import reverse
from ..views import login_callback
from .common import less_console_noise
@ -11,6 +12,7 @@ from .common import less_console_noise
class ViewsTest(TestCase):
def setUp(self):
self.client = Client()
self.factory = RequestFactory()
def say_hi(*args):
return HttpResponse("Hi")
@ -59,19 +61,83 @@ class ViewsTest(TestCase):
# mock
mock_client.callback.side_effect = self.user_info
# test
with less_console_noise():
with patch("djangooidc.views.requires_step_up_auth", return_value=False), less_console_noise():
response = self.client.get(reverse("openid_login_callback"))
# assert
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse("logout"))
def test_login_callback_no_step_up_auth(self, mock_client):
"""Walk through login_callback when requires_step_up_auth returns False
and assert that we have a redirect to /"""
# setup
session = self.client.session
session.save()
# mock
mock_client.callback.side_effect = self.user_info
# test
with patch("djangooidc.views.requires_step_up_auth", return_value=False), less_console_noise():
response = self.client.get(reverse("openid_login_callback"))
# assert
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, "/")
def test_requires_step_up_auth(self, mock_client):
"""Invoke login_callback passing it a request when requires_step_up_auth returns True
and assert that session is updated and create_authn_request (mock) is called."""
# Configure the mock to return an expected value for get_step_up_acr_value
mock_client.return_value.get_step_up_acr_value.return_value = "step_up_acr_value"
# Create a mock request
request = self.factory.get("/some-url")
request.session = {"acr_value": ""}
# Ensure that the CLIENT instance used in login_callback is the mock
# patch requires_step_up_auth to return True
with patch("djangooidc.views.requires_step_up_auth", return_value=True), patch(
"djangooidc.views.CLIENT.create_authn_request", return_value=MagicMock()
) as mock_create_authn_request:
login_callback(request)
# create_authn_request only gets called when requires_step_up_auth is True
# and it changes this acr_value in request.session
# Assert that acr_value is no longer empty string
self.assertNotEqual(request.session["acr_value"], "")
# And create_authn_request was called again
mock_create_authn_request.assert_called_once()
def test_does_not_requires_step_up_auth(self, mock_client):
"""Invoke login_callback passing it a request when requires_step_up_auth returns False
and assert that session is not updated and create_authn_request (mock) is not called.
Possibly redundant with test_login_callback_requires_step_up_auth"""
# Create a mock request
request = self.factory.get("/some-url")
request.session = {"acr_value": ""}
# Ensure that the CLIENT instance used in login_callback is the mock
# patch requires_step_up_auth to return False
with patch("djangooidc.views.requires_step_up_auth", return_value=False), patch(
"djangooidc.views.CLIENT.create_authn_request", return_value=MagicMock()
) as mock_create_authn_request:
login_callback(request)
# create_authn_request only gets called when requires_step_up_auth is True
# and it changes this acr_value in request.session
# Assert that acr_value is NOT updated by testing that it is still an empty string
self.assertEqual(request.session["acr_value"], "")
# Assert create_authn_request was not called
mock_create_authn_request.assert_not_called()
@patch("djangooidc.views.authenticate")
def test_login_callback_raises(self, mock_auth, mock_client):
# mock
mock_client.callback.side_effect = self.user_info
mock_auth.return_value = None
# test
with less_console_noise():
with patch("djangooidc.views.requires_step_up_auth", return_value=False), less_console_noise():
response = self.client.get(reverse("openid_login_callback"))
# assert
self.assertEqual(response.status_code, 401)

View file

@ -11,7 +11,7 @@ from urllib.parse import parse_qs, urlencode
from djangooidc.oidc import Client
from djangooidc import exceptions as o_e
from registrar.models import User
logger = logging.getLogger(__name__)
@ -68,6 +68,12 @@ def login_callback(request):
try:
query = parse_qs(request.GET.urlencode())
userinfo = CLIENT.callback(query, request.session)
# test for need for identity verification and if it is satisfied
# if not satisfied, redirect user to login with stepped up acr_value
if requires_step_up_auth(userinfo):
# add acr_value to request.session
request.session["acr_value"] = CLIENT.get_step_up_acr_value()
return CLIENT.create_authn_request(request.session)
user = authenticate(request=request, **userinfo)
if user:
login(request, user)
@ -79,10 +85,27 @@ def login_callback(request):
return error_page(request, err)
def requires_step_up_auth(userinfo):
"""if User.needs_identity_verification and step_up_acr_value not in
ial returned from callback, return True"""
step_up_acr_value = CLIENT.get_step_up_acr_value()
acr_value = userinfo.get("ial", "")
uuid = userinfo.get("sub", "")
email = userinfo.get("email", "")
if acr_value != step_up_acr_value:
# The acr of this attempt is not at the highest level
# so check if the user needs the higher level
return User.needs_identity_verification(email, uuid)
else:
# This attempt already came back at the highest level
# so does not require step up
return False
def logout(request, next_page=None):
"""Redirect the user to the authentication provider (OP) logout page."""
try:
username = request.user.username
user = request.user
request_args = {
"client_id": CLIENT.client_id,
"state": request.session["state"],
@ -94,7 +117,6 @@ def logout(request, next_page=None):
request_args.update(
{"post_logout_redirect_uri": CLIENT.registration_response["post_logout_redirect_uris"][0]}
)
url = CLIENT.provider_info["end_session_endpoint"]
url += "?" + urlencode(request_args)
return HttpResponseRedirect(url)
@ -104,7 +126,7 @@ def logout(request, next_page=None):
# Always remove Django session stuff - even if not logged out from OP.
# Don't wait for the callback as it may never come.
auth_logout(request)
logger.info("Successfully logged out user %s" % username)
logger.info("Successfully logged out user %s" % user)
next_page = getattr(settings, "LOGOUT_REDIRECT_URL", None)
if next_page:
request.session["next"] = next_page

View file

@ -541,6 +541,7 @@ OIDC_PROVIDERS = {
"scope": ["email", "profile:name", "phone"],
"user_info_request": ["email", "first_name", "last_name", "phone"],
"acr_value": "http://idmanagement.gov/ns/assurance/ial/1",
"step_up_acr_value": "http://idmanagement.gov/ns/assurance/ial/2",
},
"client_registration": {
"client_id": "cisa_dotgov_registrar",
@ -558,6 +559,7 @@ OIDC_PROVIDERS = {
"scope": ["email", "profile:name", "phone"],
"user_info_request": ["email", "first_name", "last_name", "phone"],
"acr_value": "http://idmanagement.gov/ns/assurance/ial/1",
"step_up_acr_value": "http://idmanagement.gov/ns/assurance/ial/2",
},
"client_registration": {
"client_id": ("urn:gov:cisa:openidconnect.profiles:sp:sso:cisa:dotgov_registrar"),

View file

@ -3,6 +3,8 @@ import logging
from django.contrib.auth.models import AbstractUser
from django.db import models
from registrar.models.user_domain_role import UserDomainRole
from .domain_invitation import DomainInvitation
from .transition_domain import TransitionDomain
from .domain import Domain
@ -64,6 +66,36 @@ class User(AbstractUser):
def is_restricted(self):
return self.status == self.RESTRICTED
@classmethod
def needs_identity_verification(cls, email, uuid):
"""A method used by our oidc classes to test whether a user needs email/uuid verification
or the full identity PII verification"""
# An existing user who is a domain manager of a domain (that is,
# they have an entry in UserDomainRole for their User)
try:
existing_user = cls.objects.get(username=uuid)
if existing_user and UserDomainRole.objects.filter(user=existing_user).exists():
return False
except cls.DoesNotExist:
# Do nothing when the user is not found, as we're checking for existence.
pass
except Exception as err:
raise err
# A new incoming user who is a domain manager for one of the domains
# that we inputted from Verisign (that is, their email address appears
# in the username field of a TransitionDomain)
if TransitionDomain.objects.filter(username=email).exists():
return False
# A new incoming user who is being invited to be a domain manager (that is,
# their email address is in DomainInvitation for an invitation that is not yet "retrieved").
if DomainInvitation.objects.filter(email=email, status=DomainInvitation.INVITED).exists():
return False
return True
def check_domain_invitations_on_login(self):
"""When a user first arrives on the site, we need to retrieve any domain
invitations that match their email address."""

View file

@ -606,19 +606,15 @@ class TestInvitations(TestCase):
class TestUser(TestCase):
"""For now, just test actions that
occur on user login."""
"""Test actions that occur on user login,
test class method that controls how users get validated."""
def setUp(self):
self.email = "mayor@igorville.gov"
self.domain_name = "igorvilleInTransition.gov"
self.domain, _ = Domain.objects.get_or_create(name="igorville.gov")
self.user, _ = User.objects.get_or_create(email=self.email)
# clean out the roles each time
UserDomainRole.objects.all().delete()
TransitionDomain.objects.get_or_create(username="mayor@igorville.gov", domain_name=self.domain_name)
def tearDown(self):
super().tearDown()
Domain.objects.all().delete()
@ -626,6 +622,7 @@ class TestUser(TestCase):
DomainInformation.objects.all().delete()
TransitionDomain.objects.all().delete()
User.objects.all().delete()
UserDomainRole.objects.all().delete()
def test_check_transition_domains_without_domains_on_login(self):
"""A user's on_each_login callback does not check transition domains.
@ -634,3 +631,26 @@ class TestUser(TestCase):
are created."""
self.user.on_each_login()
self.assertFalse(Domain.objects.filter(name=self.domain_name).exists())
def test_identity_verification_with_domain_manager(self):
"""A domain manager should return False when tested with class
method needs_identity_verification"""
UserDomainRole.objects.get_or_create(user=self.user, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
self.assertFalse(User.needs_identity_verification(self.user.email, self.user.username))
def test_identity_verification_with_transition_user(self):
"""A user from the Verisign transition should return False
when tested with class method needs_identity_verification"""
TransitionDomain.objects.get_or_create(username=self.user.email, domain_name=self.domain_name)
self.assertFalse(User.needs_identity_verification(self.user.email, self.user.username))
def test_identity_verification_with_invited_user(self):
"""An invited user should return False when tested with class
method needs_identity_verification"""
DomainInvitation.objects.get_or_create(email=self.user.email, domain=self.domain)
self.assertFalse(User.needs_identity_verification(self.user.email, self.user.username))
def test_identity_verification_with_new_user(self):
"""A new user who's neither transitioned nor invited should
return True when tested with class method needs_identity_verification"""
self.assertTrue(User.needs_identity_verification(self.user.email, self.user.username))