diff --git a/src/djangooidc/oidc.py b/src/djangooidc/oidc.py index b2b1acd8e..91bfddc66 100644 --- a/src/djangooidc/oidc.py +++ b/src/djangooidc/oidc.py @@ -100,7 +100,9 @@ class Client(oic.Client): "state": session["state"], "nonce": session["nonce"], "redirect_uri": self.registration_response["redirect_uris"][0], - "acr_values": self.behaviour.get("acr_value"), + # acr_value may be passed in session if overriding, as in the case + # of step up auth, otherwise get from settings.py + "acr_values": session.get("acr_value") or self.behaviour.get("acr_value"), } if extra_args is not None: @@ -162,7 +164,6 @@ class Client(oic.Client): logger.error(err) logger.error("Unable to parse response for %s" % state) raise o_e.AuthenticationFailed(locator=state) - # ErrorResponse is not raised, it is passed back... if isinstance(authn_response, ErrorResponse): error = authn_response.get("error", "") @@ -207,7 +208,6 @@ class Client(oic.Client): logger.error(err) logger.error("Unable to request user info for %s" % state) raise o_e.AuthenticationFailed(locator=state) - # ErrorResponse is not raised, it is passed back... if isinstance(info_response, ErrorResponse): logger.error("Unable to get user info (%s) for %s" % (info_response.get("error", ""), state)) @@ -272,6 +272,11 @@ class Client(oic.Client): super(Client, self).store_response(resp, info) + def get_step_up_acr_value(self): + """returns the step_up_acr_value from settings + this helper function is called from djangooidc views""" + return self.behaviour.get("step_up_acr_value") + def __repr__(self): return "Client {} {} {}".format( self.client_id, diff --git a/src/djangooidc/tests/test_oidc.py b/src/djangooidc/tests/test_oidc.py new file mode 100644 index 000000000..21249aa90 --- /dev/null +++ b/src/djangooidc/tests/test_oidc.py @@ -0,0 +1,30 @@ +import logging + +from django.test import TestCase + +from django.conf import settings + +from djangooidc.oidc import Client + +logger = logging.getLogger(__name__) + + +class OidcTest(TestCase): + def test_oidc_create_authn_request_with_acr_value(self): + """Test that create_authn_request returns a redirect with an acr_value + when an acr_value is passed through session. + + This test is only valid locally. On local, client can be initialized. + Client initialization does not work in pipeline, so test is useless in + pipeline. However, it will not fail in pipeline.""" + try: + # Initialize provider using pyOICD + OP = getattr(settings, "OIDC_ACTIVE_PROVIDER") + CLIENT = Client(OP) + session = {"acr_value": "some_acr_value_maybe_ial2"} + response = CLIENT.create_authn_request(session) + self.assertEqual(response.status_code, 302) + self.assertIn("some_acr_value_maybe_ial2", response.url) + except Exception as err: + logger.warning(err) + logger.warning("Unable to configure OpenID Connect provider in pipeline. Cannot execute this test.") diff --git a/src/djangooidc/tests/test_views.py b/src/djangooidc/tests/test_views.py index 5ff36a77c..da12f4fd5 100644 --- a/src/djangooidc/tests/test_views.py +++ b/src/djangooidc/tests/test_views.py @@ -1,8 +1,9 @@ -from unittest.mock import patch +from unittest.mock import MagicMock, patch from django.http import HttpResponse -from django.test import Client, TestCase +from django.test import Client, TestCase, RequestFactory from django.urls import reverse +from ..views import login_callback from .common import less_console_noise @@ -11,6 +12,7 @@ from .common import less_console_noise class ViewsTest(TestCase): def setUp(self): self.client = Client() + self.factory = RequestFactory() def say_hi(*args): return HttpResponse("Hi") @@ -59,19 +61,83 @@ class ViewsTest(TestCase): # mock mock_client.callback.side_effect = self.user_info # test - with less_console_noise(): + with patch("djangooidc.views.requires_step_up_auth", return_value=False), less_console_noise(): response = self.client.get(reverse("openid_login_callback")) # assert self.assertEqual(response.status_code, 302) self.assertEqual(response.url, reverse("logout")) + def test_login_callback_no_step_up_auth(self, mock_client): + """Walk through login_callback when requires_step_up_auth returns False + and assert that we have a redirect to /""" + # setup + session = self.client.session + session.save() + # mock + mock_client.callback.side_effect = self.user_info + # test + with patch("djangooidc.views.requires_step_up_auth", return_value=False), less_console_noise(): + response = self.client.get(reverse("openid_login_callback")) + # assert + self.assertEqual(response.status_code, 302) + self.assertEqual(response.url, "/") + + def test_requires_step_up_auth(self, mock_client): + """Invoke login_callback passing it a request when requires_step_up_auth returns True + and assert that session is updated and create_authn_request (mock) is called.""" + # Configure the mock to return an expected value for get_step_up_acr_value + mock_client.return_value.get_step_up_acr_value.return_value = "step_up_acr_value" + + # Create a mock request + request = self.factory.get("/some-url") + request.session = {"acr_value": ""} + + # Ensure that the CLIENT instance used in login_callback is the mock + # patch requires_step_up_auth to return True + with patch("djangooidc.views.requires_step_up_auth", return_value=True), patch( + "djangooidc.views.CLIENT.create_authn_request", return_value=MagicMock() + ) as mock_create_authn_request: + login_callback(request) + + # create_authn_request only gets called when requires_step_up_auth is True + # and it changes this acr_value in request.session + + # Assert that acr_value is no longer empty string + self.assertNotEqual(request.session["acr_value"], "") + # And create_authn_request was called again + mock_create_authn_request.assert_called_once() + + def test_does_not_requires_step_up_auth(self, mock_client): + """Invoke login_callback passing it a request when requires_step_up_auth returns False + and assert that session is not updated and create_authn_request (mock) is not called. + + Possibly redundant with test_login_callback_requires_step_up_auth""" + # Create a mock request + request = self.factory.get("/some-url") + request.session = {"acr_value": ""} + + # Ensure that the CLIENT instance used in login_callback is the mock + # patch requires_step_up_auth to return False + with patch("djangooidc.views.requires_step_up_auth", return_value=False), patch( + "djangooidc.views.CLIENT.create_authn_request", return_value=MagicMock() + ) as mock_create_authn_request: + login_callback(request) + + # create_authn_request only gets called when requires_step_up_auth is True + # and it changes this acr_value in request.session + + # Assert that acr_value is NOT updated by testing that it is still an empty string + self.assertEqual(request.session["acr_value"], "") + # Assert create_authn_request was not called + mock_create_authn_request.assert_not_called() + @patch("djangooidc.views.authenticate") def test_login_callback_raises(self, mock_auth, mock_client): # mock mock_client.callback.side_effect = self.user_info mock_auth.return_value = None # test - with less_console_noise(): + with patch("djangooidc.views.requires_step_up_auth", return_value=False), less_console_noise(): response = self.client.get(reverse("openid_login_callback")) # assert self.assertEqual(response.status_code, 401) diff --git a/src/djangooidc/views.py b/src/djangooidc/views.py index ea893daf2..b5905df48 100644 --- a/src/djangooidc/views.py +++ b/src/djangooidc/views.py @@ -11,7 +11,7 @@ from urllib.parse import parse_qs, urlencode from djangooidc.oidc import Client from djangooidc import exceptions as o_e - +from registrar.models import User logger = logging.getLogger(__name__) @@ -68,6 +68,12 @@ def login_callback(request): try: query = parse_qs(request.GET.urlencode()) userinfo = CLIENT.callback(query, request.session) + # test for need for identity verification and if it is satisfied + # if not satisfied, redirect user to login with stepped up acr_value + if requires_step_up_auth(userinfo): + # add acr_value to request.session + request.session["acr_value"] = CLIENT.get_step_up_acr_value() + return CLIENT.create_authn_request(request.session) user = authenticate(request=request, **userinfo) if user: login(request, user) @@ -79,10 +85,27 @@ def login_callback(request): return error_page(request, err) +def requires_step_up_auth(userinfo): + """if User.needs_identity_verification and step_up_acr_value not in + ial returned from callback, return True""" + step_up_acr_value = CLIENT.get_step_up_acr_value() + acr_value = userinfo.get("ial", "") + uuid = userinfo.get("sub", "") + email = userinfo.get("email", "") + if acr_value != step_up_acr_value: + # The acr of this attempt is not at the highest level + # so check if the user needs the higher level + return User.needs_identity_verification(email, uuid) + else: + # This attempt already came back at the highest level + # so does not require step up + return False + + def logout(request, next_page=None): """Redirect the user to the authentication provider (OP) logout page.""" try: - username = request.user.username + user = request.user request_args = { "client_id": CLIENT.client_id, "state": request.session["state"], @@ -94,7 +117,6 @@ def logout(request, next_page=None): request_args.update( {"post_logout_redirect_uri": CLIENT.registration_response["post_logout_redirect_uris"][0]} ) - url = CLIENT.provider_info["end_session_endpoint"] url += "?" + urlencode(request_args) return HttpResponseRedirect(url) @@ -104,7 +126,7 @@ def logout(request, next_page=None): # Always remove Django session stuff - even if not logged out from OP. # Don't wait for the callback as it may never come. auth_logout(request) - logger.info("Successfully logged out user %s" % username) + logger.info("Successfully logged out user %s" % user) next_page = getattr(settings, "LOGOUT_REDIRECT_URL", None) if next_page: request.session["next"] = next_page diff --git a/src/registrar/config/settings.py b/src/registrar/config/settings.py index 9bdcd4e98..c99daf72b 100644 --- a/src/registrar/config/settings.py +++ b/src/registrar/config/settings.py @@ -541,6 +541,7 @@ OIDC_PROVIDERS = { "scope": ["email", "profile:name", "phone"], "user_info_request": ["email", "first_name", "last_name", "phone"], "acr_value": "http://idmanagement.gov/ns/assurance/ial/1", + "step_up_acr_value": "http://idmanagement.gov/ns/assurance/ial/2", }, "client_registration": { "client_id": "cisa_dotgov_registrar", @@ -558,6 +559,7 @@ OIDC_PROVIDERS = { "scope": ["email", "profile:name", "phone"], "user_info_request": ["email", "first_name", "last_name", "phone"], "acr_value": "http://idmanagement.gov/ns/assurance/ial/1", + "step_up_acr_value": "http://idmanagement.gov/ns/assurance/ial/2", }, "client_registration": { "client_id": ("urn:gov:cisa:openidconnect.profiles:sp:sso:cisa:dotgov_registrar"), diff --git a/src/registrar/models/user.py b/src/registrar/models/user.py index 1f83870df..0a153b5c8 100644 --- a/src/registrar/models/user.py +++ b/src/registrar/models/user.py @@ -3,6 +3,8 @@ import logging from django.contrib.auth.models import AbstractUser from django.db import models +from registrar.models.user_domain_role import UserDomainRole + from .domain_invitation import DomainInvitation from .transition_domain import TransitionDomain from .domain import Domain @@ -64,6 +66,36 @@ class User(AbstractUser): def is_restricted(self): return self.status == self.RESTRICTED + @classmethod + def needs_identity_verification(cls, email, uuid): + """A method used by our oidc classes to test whether a user needs email/uuid verification + or the full identity PII verification""" + + # An existing user who is a domain manager of a domain (that is, + # they have an entry in UserDomainRole for their User) + try: + existing_user = cls.objects.get(username=uuid) + if existing_user and UserDomainRole.objects.filter(user=existing_user).exists(): + return False + except cls.DoesNotExist: + # Do nothing when the user is not found, as we're checking for existence. + pass + except Exception as err: + raise err + + # A new incoming user who is a domain manager for one of the domains + # that we inputted from Verisign (that is, their email address appears + # in the username field of a TransitionDomain) + if TransitionDomain.objects.filter(username=email).exists(): + return False + + # A new incoming user who is being invited to be a domain manager (that is, + # their email address is in DomainInvitation for an invitation that is not yet "retrieved"). + if DomainInvitation.objects.filter(email=email, status=DomainInvitation.INVITED).exists(): + return False + + return True + def check_domain_invitations_on_login(self): """When a user first arrives on the site, we need to retrieve any domain invitations that match their email address.""" diff --git a/src/registrar/tests/test_models.py b/src/registrar/tests/test_models.py index 0a02ccb90..83126ab7c 100644 --- a/src/registrar/tests/test_models.py +++ b/src/registrar/tests/test_models.py @@ -606,19 +606,15 @@ class TestInvitations(TestCase): class TestUser(TestCase): - """For now, just test actions that - occur on user login.""" + """Test actions that occur on user login, + test class method that controls how users get validated.""" def setUp(self): self.email = "mayor@igorville.gov" self.domain_name = "igorvilleInTransition.gov" + self.domain, _ = Domain.objects.get_or_create(name="igorville.gov") self.user, _ = User.objects.get_or_create(email=self.email) - # clean out the roles each time - UserDomainRole.objects.all().delete() - - TransitionDomain.objects.get_or_create(username="mayor@igorville.gov", domain_name=self.domain_name) - def tearDown(self): super().tearDown() Domain.objects.all().delete() @@ -626,6 +622,7 @@ class TestUser(TestCase): DomainInformation.objects.all().delete() TransitionDomain.objects.all().delete() User.objects.all().delete() + UserDomainRole.objects.all().delete() def test_check_transition_domains_without_domains_on_login(self): """A user's on_each_login callback does not check transition domains. @@ -634,3 +631,26 @@ class TestUser(TestCase): are created.""" self.user.on_each_login() self.assertFalse(Domain.objects.filter(name=self.domain_name).exists()) + + def test_identity_verification_with_domain_manager(self): + """A domain manager should return False when tested with class + method needs_identity_verification""" + UserDomainRole.objects.get_or_create(user=self.user, domain=self.domain, role=UserDomainRole.Roles.MANAGER) + self.assertFalse(User.needs_identity_verification(self.user.email, self.user.username)) + + def test_identity_verification_with_transition_user(self): + """A user from the Verisign transition should return False + when tested with class method needs_identity_verification""" + TransitionDomain.objects.get_or_create(username=self.user.email, domain_name=self.domain_name) + self.assertFalse(User.needs_identity_verification(self.user.email, self.user.username)) + + def test_identity_verification_with_invited_user(self): + """An invited user should return False when tested with class + method needs_identity_verification""" + DomainInvitation.objects.get_or_create(email=self.user.email, domain=self.domain) + self.assertFalse(User.needs_identity_verification(self.user.email, self.user.username)) + + def test_identity_verification_with_new_user(self): + """A new user who's neither transitioned nor invited should + return True when tested with class method needs_identity_verification""" + self.assertTrue(User.needs_identity_verification(self.user.email, self.user.username))