Merge branch 'main' into za/2146-remove-generic-org

This commit is contained in:
zandercymatics 2024-05-23 12:34:24 -06:00
commit 59dfb974c1
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
8 changed files with 166 additions and 53 deletions

View file

@ -15,6 +15,7 @@ from oic.oic.message import AccessTokenResponse
from oic.utils.authn.client import CLIENT_AUTHN_METHOD from oic.utils.authn.client import CLIENT_AUTHN_METHOD
from oic.utils import keyio from oic.utils import keyio
from . import exceptions as o_e from . import exceptions as o_e
__author__ = "roland" __author__ = "roland"
@ -84,6 +85,7 @@ class Client(oic.Client):
def create_authn_request( def create_authn_request(
self, self,
session, session,
do_step_up_auth=False,
extra_args=None, extra_args=None,
): ):
"""Step 2: Construct a login URL at OP's domain and send the user to it.""" """Step 2: Construct a login URL at OP's domain and send the user to it."""
@ -100,10 +102,11 @@ class Client(oic.Client):
"state": session["state"], "state": session["state"],
"nonce": session["nonce"], "nonce": session["nonce"],
"redirect_uri": self.registration_response["redirect_uris"][0], "redirect_uri": self.registration_response["redirect_uris"][0],
# acr_value may be passed in session if overriding, as in the case
# of step up auth, otherwise get from settings.py
"acr_values": session.get("acr_value") or self.behaviour.get("acr_value"),
} }
if do_step_up_auth:
self._set_args_for_biometric_auth_request(session, request_args)
else:
request_args["acr_values"] = self.behaviour.get("acr_value")
if extra_args is not None: if extra_args is not None:
request_args.update(extra_args) request_args.update(extra_args)
@ -114,6 +117,35 @@ class Client(oic.Client):
logger.debug("request args: %s" % request_args) logger.debug("request args: %s" % request_args)
url, headers = self._prepare_authn_request(request_args, state) # C901 too complex
try:
# create the redirect object
response = HttpResponseRedirect(str(url))
# add headers to the object, if any
if headers:
for key, value in headers.items():
response[key] = value
except Exception as err:
logger.error(err)
logger.error("Failed to create redirect object for %s" % state)
raise o_e.InternalError(locator=state)
return response
def _set_args_for_biometric_auth_request(self, session, request_args):
if "acr_value" in session:
session.pop("acr_value")
request_args["vtr"] = session.get("vtr")
request_args["vtm"] = session.get("vtm")
def _prepare_authn_request(self, request_args, state):
"""
Constructs an authorization request. Then, assembles the url, body, headers, and cis.
Returns the assembled url and associated header information: `(url, headers)`
"""
try: try:
# prepare the request for sending # prepare the request for sending
cis = self.construct_AuthorizationRequest(request_args=request_args) cis = self.construct_AuthorizationRequest(request_args=request_args)
@ -126,6 +158,7 @@ class Client(oic.Client):
method="GET", method="GET",
request_args=request_args, request_args=request_args,
) )
logger.debug("body: %s" % body) logger.debug("body: %s" % body)
logger.debug("URL: %s" % url) logger.debug("URL: %s" % url)
logger.debug("headers: %s" % headers) logger.debug("headers: %s" % headers)
@ -134,19 +167,7 @@ class Client(oic.Client):
logger.error("Failed to prepare request for %s" % state) logger.error("Failed to prepare request for %s" % state)
raise o_e.InternalError(locator=state) raise o_e.InternalError(locator=state)
try: return (url, headers)
# create the redirect object
response = HttpResponseRedirect(str(url))
# add headers to the object, if any
if headers:
for key, value in headers.items():
response[key] = value
except Exception as err:
logger.error(err)
logger.error("Failed to create redirect object for %s" % state)
raise o_e.InternalError(locator=state)
return response
def callback(self, unparsed_response, session): def callback(self, unparsed_response, session):
"""Step 3: Receive OP's response, request an access token, and user info.""" """Step 3: Receive OP's response, request an access token, and user info."""
@ -224,9 +245,18 @@ class Client(oic.Client):
if isinstance(info_response, ErrorResponse): if isinstance(info_response, ErrorResponse):
logger.error("Unable to get user info (%s) for %s" % (info_response.get("error", ""), state)) logger.error("Unable to get user info (%s) for %s" % (info_response.get("error", ""), state))
raise o_e.AuthenticationFailed(locator=state) raise o_e.AuthenticationFailed(locator=state)
info_response_dict = info_response.to_dict()
logger.debug("user info: %s" % info_response) # Define vtm/vtr information on the user dictionary so we can track this in one location.
return info_response.to_dict() # If a user has this information, then they are bumped up in terms of verification level.
if session.get("needs_step_up_auth") is True:
if "ial" in info_response_dict:
info_response_dict.pop("ial")
info_response_dict["vtm"] = session.get("vtm", "")
info_response_dict["vtr"] = session.get("vtr", "")
logger.debug("user info: %s" % info_response_dict)
return info_response_dict
def _request_token(self, state, code, session): def _request_token(self, state, code, session):
"""Request a token from OP to allow us to then request user info.""" """Request a token from OP to allow us to then request user info."""
@ -285,14 +315,20 @@ class Client(oic.Client):
super(Client, self).store_response(resp, info) super(Client, self).store_response(resp, info)
def get_default_acr_value(self): def get_default_acr_value(self):
"""returns the acr_value from settings """Returns the acr_value from settings.
this helper function is called from djangooidc views""" This helper function is called from djangooidc views."""
return self.behaviour.get("acr_value") return self.behaviour.get("acr_value")
def get_step_up_acr_value(self): def get_vtm_value(self):
"""returns the step_up_acr_value from settings """Returns the vtm value from settings.
this helper function is called from djangooidc views""" This helper function is called from djangooidc views."""
return self.behaviour.get("step_up_acr_value") return self.behaviour.get("vtm")
def get_vtr_value(self, cleaned=True):
"""Returns the vtr value from settings.
This helper function is called from djangooidc views."""
vtr = self.behaviour.get("vtr")
return json.dumps(vtr) if cleaned else vtr
def __repr__(self): def __repr__(self):
return "Client {} {} {}".format( return "Client {} {} {}".format(

View file

@ -397,25 +397,31 @@ class ViewsTest(TestCase):
"""Invoke login_callback passing it a request when _requires_step_up_auth returns True """Invoke login_callback passing it a request when _requires_step_up_auth returns True
and assert that session is updated and create_authn_request (mock) is called.""" and assert that session is updated and create_authn_request (mock) is called."""
with less_console_noise(): with less_console_noise():
# MOCK
# Configure the mock to return an expected value for get_step_up_acr_value
mock_client.return_value.get_step_up_acr_value.return_value = "step_up_acr_value"
# Create a mock request # Create a mock request
request = self.factory.get("/some-url") request = self.factory.get("/some-url")
request.session = {"acr_value": ""} request.session = {"acr_value": ""}
# Ensure that the CLIENT instance used in login_callback is the mock # Ensure that the CLIENT instance used in login_callback is the mock
# patch _requires_step_up_auth to return True # patch _requires_step_up_auth to return True
with patch("djangooidc.views._requires_step_up_auth", return_value=True), patch( with patch("djangooidc.views._requires_step_up_auth", return_value=True), patch(
"djangooidc.views.CLIENT.create_authn_request", return_value=MagicMock() "djangooidc.views.CLIENT.create_authn_request"
) as mock_create_authn_request: ) as mock_create_authn_request:
# TEST with patch("djangooidc.views.CLIENT.get_vtm_value") as mock_get_vtm_value, patch(
# test the login callback "djangooidc.views.CLIENT.get_vtr_value"
login_callback(request) ) as mock_get_vtr_value:
mock_get_vtm_value.return_value = "test_vtm"
mock_get_vtr_value.return_value = "test_vtr"
# TEST
# test the login callback
login_callback(request)
# ASSERTIONS # ASSERTIONS
# create_authn_request only gets called when _requires_step_up_auth is True # create_authn_request only gets called when _requires_step_up_auth is True.
# and it changes this acr_value in request.session # The acr_value should be blank here
# Assert that acr_value is no longer empty string self.assertEqual(request.session["acr_value"], "")
self.assertNotEqual(request.session["acr_value"], "") self.assertEqual(request.session["vtm"], "test_vtm")
self.assertEqual(request.session["vtr"], "test_vtr")
# And create_authn_request was called again # And create_authn_request was called again
mock_create_authn_request.assert_called_once() mock_create_authn_request.assert_called_once()

View file

@ -91,12 +91,21 @@ def login_callback(request):
_initialize_client() _initialize_client()
query = parse_qs(request.GET.urlencode()) query = parse_qs(request.GET.urlencode())
userinfo = CLIENT.callback(query, request.session) userinfo = CLIENT.callback(query, request.session)
# test for need for identity verification and if it is satisfied # test for need for identity verification and if it is satisfied
# if not satisfied, redirect user to login with stepped up acr_value # if not satisfied, redirect user to login requiring biometric auth
if _requires_step_up_auth(userinfo):
# add acr_value to request.session # Tests for the presence of the vtm/vtr values in the userinfo object.
request.session["acr_value"] = CLIENT.get_step_up_acr_value() # If they are there, then we can set a flag in our session for tracking purposes.
return CLIENT.create_authn_request(request.session) needs_step_up_auth = _requires_step_up_auth(userinfo)
request.session["needs_step_up_auth"] = needs_step_up_auth
# Return a redirect request to a new auth url that does biometric validation
if needs_step_up_auth:
request.session["vtm"] = CLIENT.get_vtm_value()
request.session["vtr"] = CLIENT.get_vtr_value()
return CLIENT.create_authn_request(request.session, do_step_up_auth=True)
user = authenticate(request=request, **userinfo) user = authenticate(request=request, **userinfo)
if user: if user:
@ -138,14 +147,27 @@ def login_callback(request):
return error_page(request, err) return error_page(request, err)
def _requires_step_up_auth(userinfo): def _requires_step_up_auth(userinfo) -> bool:
"""if User.needs_identity_verification and step_up_acr_value not in """
ial returned from callback, return True""" Checks for the presence of the key 'vtm' and 'vtr' in the provided `userinfo` object.
step_up_acr_value = CLIENT.get_step_up_acr_value()
acr_value = userinfo.get("ial", "") If they are not found, then we call `User.needs_identity_verification()`.
Args:
userinfo (dict): A dictionary of data from the returned user object.
Return Conditions:
If the provided user does not exist in any tables which would preclude them from doing
biometric authentication, then we return True. Otherwise, we return False.
Alternatively, if 'vtm' and 'vtr' already exist on `userinfo`, then we return False.
"""
uuid = userinfo.get("sub", "") uuid = userinfo.get("sub", "")
email = userinfo.get("email", "") email = userinfo.get("email", "")
if acr_value != step_up_acr_value: # This value is returned after successful auth
user_verified = userinfo.get("vot", "")
if not userinfo.get("vtm") or not userinfo.get("vtr") or not user_verified:
# The acr of this attempt is not at the highest level # The acr of this attempt is not at the highest level
# so check if the user needs the higher level # so check if the user needs the higher level
return User.needs_identity_verification(email, uuid) return User.needs_identity_verification(email, uuid)

View file

@ -594,7 +594,7 @@ class MyUserAdmin(BaseUserAdmin, ImportExportModelAdmin):
None, None,
{"fields": ("username", "password", "status", "verification_type")}, {"fields": ("username", "password", "status", "verification_type")},
), ),
("Personal Info", {"fields": ("first_name", "last_name", "email")}), ("Personal Info", {"fields": ("first_name", "middle_name", "last_name", "email", "title")}),
( (
"Permissions", "Permissions",
{ {
@ -625,7 +625,7 @@ class MyUserAdmin(BaseUserAdmin, ImportExportModelAdmin):
) )
}, },
), ),
("Personal Info", {"fields": ("first_name", "last_name", "email")}), ("Personal Info", {"fields": ("first_name", "middle_name", "last_name", "email", "title")}),
( (
"Permissions", "Permissions",
{ {
@ -651,7 +651,9 @@ class MyUserAdmin(BaseUserAdmin, ImportExportModelAdmin):
analyst_readonly_fields = [ analyst_readonly_fields = [
"Personal Info", "Personal Info",
"first_name", "first_name",
"middle_name",
"last_name", "last_name",
"title",
"email", "email",
"Permissions", "Permissions",
"is_active", "is_active",
@ -1170,7 +1172,7 @@ class DomainInformationAdmin(ListHeaderAdmin, ImportExportModelAdmin):
] ]
# Readonly fields for analysts and superusers # Readonly fields for analysts and superusers
readonly_fields = ("other_contacts", "is_election_board") readonly_fields = ("other_contacts", "is_election_board", "federal_agency")
# Read only that we'll leverage for CISA Analysts # Read only that we'll leverage for CISA Analysts
analyst_readonly_fields = [ analyst_readonly_fields = [
@ -1435,6 +1437,7 @@ class DomainRequestAdmin(ListHeaderAdmin, ImportExportModelAdmin):
"current_websites", "current_websites",
"alternative_domains", "alternative_domains",
"is_election_board", "is_election_board",
"federal_agency",
) )
# Read only that we'll leverage for CISA Analysts # Read only that we'll leverage for CISA Analysts
@ -1876,7 +1879,7 @@ class DomainAdmin(ListHeaderAdmin, ImportExportModelAdmin):
search_fields = ["name"] search_fields = ["name"]
search_help_text = "Search by domain name." search_help_text = "Search by domain name."
change_form_template = "django/admin/domain_change_form.html" change_form_template = "django/admin/domain_change_form.html"
readonly_fields = ["state", "expiration_date", "first_ready", "deleted"] readonly_fields = ("state", "expiration_date", "first_ready", "deleted", "federal_agency")
# Table ordering # Table ordering
ordering = ["name"] ordering = ["name"]

View file

@ -562,7 +562,11 @@ OIDC_PROVIDERS = {
"scope": ["email", "profile:name", "phone"], "scope": ["email", "profile:name", "phone"],
"user_info_request": ["email", "first_name", "last_name", "phone"], "user_info_request": ["email", "first_name", "last_name", "phone"],
"acr_value": "http://idmanagement.gov/ns/assurance/ial/1", "acr_value": "http://idmanagement.gov/ns/assurance/ial/1",
"step_up_acr_value": "http://idmanagement.gov/ns/assurance/ial/2", # "P1" is the current IdV option; "Pb" stands for 'biometric'
"vtr": ["Pb", "P1"],
# The url that biometric authentication takes place at.
# A similar analog is the url for acr_value.
"vtm": "https://developer.login.gov/vot-trust-framework",
}, },
"client_registration": { "client_registration": {
"client_id": "cisa_dotgov_registrar", "client_id": "cisa_dotgov_registrar",
@ -580,7 +584,11 @@ OIDC_PROVIDERS = {
"scope": ["email", "profile:name", "phone"], "scope": ["email", "profile:name", "phone"],
"user_info_request": ["email", "first_name", "last_name", "phone"], "user_info_request": ["email", "first_name", "last_name", "phone"],
"acr_value": "http://idmanagement.gov/ns/assurance/ial/1", "acr_value": "http://idmanagement.gov/ns/assurance/ial/1",
"step_up_acr_value": "http://idmanagement.gov/ns/assurance/ial/2", # "P1" is the current IdV option; "Pb" stands for 'biometric'
"vtr": ["Pb", "P1"],
# The url that biometric authentication takes place at.
# A similar analog is the url for acr_value.
"vtm": "https://developer.login.gov/vot-trust-framework",
}, },
"client_registration": { "client_registration": {
"client_id": ("urn:gov:cisa:openidconnect.profiles:sp:sso:cisa:dotgov_registrar"), "client_id": ("urn:gov:cisa:openidconnect.profiles:sp:sso:cisa:dotgov_registrar"),

View file

@ -0,0 +1,23 @@
# Generated by Django 4.2.10 on 2024-05-22 14:54
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("registrar", "0094_create_groups_v12"),
]
operations = [
migrations.AddField(
model_name="user",
name="middle_name",
field=models.CharField(blank=True, null=True),
),
migrations.AddField(
model_name="user",
name="title",
field=models.CharField(blank=True, null=True, verbose_name="title / role"),
),
]

View file

@ -80,6 +80,17 @@ class User(AbstractUser):
db_index=True, db_index=True,
) )
middle_name = models.CharField(
null=True,
blank=True,
)
title = models.CharField(
null=True,
blank=True,
verbose_name="title / role",
)
verification_type = models.CharField( verification_type = models.CharField(
choices=VerificationTypeChoices.choices, choices=VerificationTypeChoices.choices,
null=True, null=True,

View file

@ -2231,6 +2231,7 @@ class TestDomainRequestAdmin(MockEppLib):
"current_websites", "current_websites",
"alternative_domains", "alternative_domains",
"is_election_board", "is_election_board",
"federal_agency",
"id", "id",
"created_at", "created_at",
"updated_at", "updated_at",
@ -2284,6 +2285,7 @@ class TestDomainRequestAdmin(MockEppLib):
"current_websites", "current_websites",
"alternative_domains", "alternative_domains",
"is_election_board", "is_election_board",
"federal_agency",
"creator", "creator",
"about_your_organization", "about_your_organization",
"requested_domain", "requested_domain",
@ -2311,6 +2313,7 @@ class TestDomainRequestAdmin(MockEppLib):
"current_websites", "current_websites",
"alternative_domains", "alternative_domains",
"is_election_board", "is_election_board",
"federal_agency",
] ]
self.assertEqual(readonly_fields, expected_fields) self.assertEqual(readonly_fields, expected_fields)
@ -3168,6 +3171,7 @@ class TestDomainInformationAdmin(TestCase):
expected_fields = [ expected_fields = [
"other_contacts", "other_contacts",
"is_election_board", "is_election_board",
"federal_agency",
"creator", "creator",
"type_of_work", "type_of_work",
"more_organization_information", "more_organization_information",
@ -3530,7 +3534,7 @@ class TestMyUserAdmin(TestCase):
) )
}, },
), ),
("Personal Info", {"fields": ("first_name", "last_name", "email")}), ("Personal Info", {"fields": ("first_name", "middle_name", "last_name", "email", "title")}),
("Permissions", {"fields": ("is_active", "groups")}), ("Permissions", {"fields": ("is_active", "groups")}),
("Important dates", {"fields": ("last_login", "date_joined")}), ("Important dates", {"fields": ("last_login", "date_joined")}),
) )