Fix linter errors

This commit is contained in:
Seamus Johnston 2022-09-22 15:57:40 -05:00
parent 00f87168e5
commit f16d657c6a
No known key found for this signature in database
GPG key ID: 2F21225985069105
5 changed files with 101 additions and 97 deletions

View file

@ -10,6 +10,7 @@ from django.utils import timezone
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class OpenIdConnectBackend(ModelBackend): class OpenIdConnectBackend(ModelBackend):
""" """
This backend checks a previously performed OIDC authentication. This backend checks a previously performed OIDC authentication.
@ -33,18 +34,12 @@ class OpenIdConnectBackend(ModelBackend):
# Some OP may actually choose to withhold some information, so we must # Some OP may actually choose to withhold some information, so we must
# test if it is present # test if it is present
openid_data = {"last_login": timezone.now()} openid_data = {"last_login": timezone.now()}
if "first_name" in kwargs.keys(): openid_data["first_name"] = kwargs.get("first_name", "")
openid_data["first_name"] = kwargs["first_name"] openid_data["first_name"] = kwargs.get("given_name", "")
if "given_name" in kwargs.keys(): openid_data["first_name"] = kwargs.get("christian_name", "")
openid_data["first_name"] = kwargs["given_name"] openid_data["last_name"] = kwargs.get("family_name", "")
if "christian_name" in kwargs.keys(): openid_data["last_name"] = kwargs.get("last_name", "")
openid_data["first_name"] = kwargs["christian_name"] openid_data["email"] = kwargs.get("email", "")
if "family_name" in kwargs.keys():
openid_data["last_name"] = kwargs["family_name"]
if "last_name" in kwargs.keys():
openid_data["last_name"] = kwargs["last_name"]
if "email" in kwargs.keys():
openid_data["email"] = kwargs["email"]
# Note that this could be accomplished in one try-except clause, but # Note that this could be accomplished in one try-except clause, but
# instead we use get_or_create when creating unknown users since it has # instead we use get_or_create when creating unknown users since it has

View file

@ -9,16 +9,12 @@ class OIDCException(Exception):
`.locator`, if used, should be a useful, unique identifier for `.locator`, if used, should be a useful, unique identifier for
locating related log messages. locating related log messages.
""" """
status = status.INTERNAL_SERVER_ERROR status = status.INTERNAL_SERVER_ERROR
friendly_message = "A server error occurred." friendly_message = "A server error occurred."
locator = None locator = None
def __init__( def __init__(self, friendly_message=None, status=None, locator=None):
self,
friendly_message=None,
status=None,
locator=None
):
if friendly_message is not None: if friendly_message is not None:
self.friendly_message = friendly_message self.friendly_message = friendly_message
if status is not None: if status is not None:
@ -43,4 +39,4 @@ class InternalError(OIDCException):
class BannedUser(AuthenticationFailed): class BannedUser(AuthenticationFailed):
friendly_message = "Your user is not valid in this application." friendly_message = "Your user is not valid in this application."

View file

@ -4,11 +4,6 @@ from __future__ import unicode_literals
import logging import logging
import json import json
try:
from builtins import unicode as str
except ImportError:
pass
from django.conf import settings from django.conf import settings
from django.http import HttpResponseRedirect from django.http import HttpResponseRedirect
from Cryptodome.PublicKey.RSA import importKey from Cryptodome.PublicKey.RSA import importKey
@ -50,7 +45,8 @@ class Client(oic.Client):
except Exception as err: except Exception as err:
logger.error(err) logger.error(err)
logger.error( logger.error(
"Key jar preparation failed for %s", provider["srv_discovery_url"], "Key jar preparation failed for %s",
provider["srv_discovery_url"],
) )
raise o_e.InternalError() raise o_e.InternalError()
@ -68,7 +64,8 @@ class Client(oic.Client):
except Exception as err: except Exception as err:
logger.error(err) logger.error(err)
logger.error( logger.error(
"Client creation failed for %s", provider["srv_discovery_url"], "Client creation failed for %s",
provider["srv_discovery_url"],
) )
raise o_e.InternalError() raise o_e.InternalError()
@ -81,7 +78,8 @@ class Client(oic.Client):
except Exception as err: except Exception as err:
logger.error(err) logger.error(err)
logger.error( logger.error(
"Provider info discovery failed for %s", provider["srv_discovery_url"], "Provider info discovery failed for %s",
provider["srv_discovery_url"],
) )
raise o_e.InternalError() raise o_e.InternalError()
@ -104,7 +102,7 @@ class Client(oic.Client):
"state": session["state"], "state": session["state"],
"nonce": session["nonce"], "nonce": session["nonce"],
"redirect_uri": self.registration_response["redirect_uris"][0], "redirect_uri": self.registration_response["redirect_uris"][0],
"acr_values": self.behaviour.get("acr_value") "acr_values": self.behaviour.get("acr_value"),
} }
if extra_args is not None: if extra_args is not None:
@ -160,7 +158,7 @@ class Client(oic.Client):
AuthorizationResponse, AuthorizationResponse,
unparsed_response, unparsed_response,
sformat="dict", sformat="dict",
keyjar=self.keyjar keyjar=self.keyjar,
) )
except Exception as err: except Exception as err:
logger.error(err) logger.error(err)
@ -171,7 +169,9 @@ class Client(oic.Client):
if isinstance(authn_response, ErrorResponse): if isinstance(authn_response, ErrorResponse):
error = authn_response.get("error", "") error = authn_response.get("error", "")
if error == "login_required": if error == "login_required":
logger.warning("User was not logged in (%s), trying again for %s" % (error, state)) logger.warning(
"User was not logged in (%s), trying again for %s" % (error, state)
)
return self.create_authn_request(session) return self.create_authn_request(session)
else: else:
logger.error("Unable to process response %s for %s" % (error, state)) logger.error("Unable to process response %s for %s" % (error, state))
@ -189,56 +189,19 @@ class Client(oic.Client):
raise o_e.AuthenticationFailed(locator=state) raise o_e.AuthenticationFailed(locator=state)
if self.behaviour.get("response_type") == "code": if self.behaviour.get("response_type") == "code":
try: # need an access token to get user info (and to log the user out later)
# request a new token by which we may interact with OP self._request_token(
# on behalf of the user authn_response["state"], authn_response["code"], session
token_response = self.do_access_token_request( )
scope="openid",
state=authn_response["state"],
request_args={
"code": authn_response["code"],
"redirect_uri": self.registration_response["redirect_uris"][0],
"client_id": self.client_id,
"client_secret": self.client_secret,
},
authn_method=self.registration_response[
"token_endpoint_auth_method"
],
)
except Exception as err:
logger.error(err)
logger.error("Unable to obtain access token for %s" % state)
raise o_e.AuthenticationFailed(locator=state)
# ErrorResponse is not raised, it is passed back... user_info = self._get_user_info(state, session)
if isinstance(token_response, ErrorResponse):
logger.error("Unable to get token (%s) for %s" % (token_response.get("error",""), state))
raise o_e.AuthenticationFailed(locator=state)
logger.debug("token response %s" % token_response) return user_info
try:
# get the token and other bits of info
id_token = token_response["id_token"]._dict
if id_token["nonce"] != session["nonce"]:
logger.error("Received nonce not the same as expected for %s" % state)
raise o_e.AuthenticationFailed(locator=state)
session["id_token"] = id_token
session["id_token_raw"] = getattr(self, "id_token_raw", None)
session["access_token"] = token_response["access_token"]
session["refresh_token"] = token_response.get("refresh_token", "")
session["expires_in"] = token_response.get("expires_in", "")
self.id_token[authn_response["state"]] = getattr(self, "id_token_raw", None)
except Exception as err:
logger.error(err)
logger.error("Unable to parse access token response for %s" % state)
raise o_e.AuthenticationFailed(locator=state)
def _get_user_info(self, state, session):
"""Get information from OP about the user."""
scopes = list(self.behaviour.get("user_info_request", [])) scopes = list(self.behaviour.get("user_info_request", []))
scopes.append("openid") scopes.append("openid")
try: try:
# get info about the user from OP # get info about the user from OP
info_response = self.do_user_info_request( info_response = self.do_user_info_request(
@ -253,13 +216,62 @@ class Client(oic.Client):
# ErrorResponse is not raised, it is passed back... # ErrorResponse is not raised, it is passed back...
if isinstance(info_response, ErrorResponse): if isinstance(info_response, ErrorResponse):
logger.error("Unable to get user info (%s) for %s" % (info_response.get("error",""), state)) logger.error(
"Unable to get user info (%s) for %s"
% (info_response.get("error", ""), state)
)
raise o_e.AuthenticationFailed(locator=state) raise o_e.AuthenticationFailed(locator=state)
user_info = info_response.to_dict() logger.debug("user info: %s" % info_response)
logger.debug("user info: %s" % user_info) return info_response.to_dict()
return user_info def _request_token(self, state, code, session):
"""Request a token from OP to allow us to then request user info."""
try:
token_response = self.do_access_token_request(
scope="openid",
state=state,
request_args={
"code": code,
"redirect_uri": self.registration_response["redirect_uris"][0],
"client_id": self.client_id,
"client_secret": self.client_secret,
},
authn_method=self.registration_response["token_endpoint_auth_method"],
)
except Exception as err:
logger.error(err)
logger.error("Unable to obtain access token for %s" % state)
raise o_e.AuthenticationFailed(locator=state)
# ErrorResponse is not raised, it is passed back...
if isinstance(token_response, ErrorResponse):
logger.error(
"Unable to get token (%s) for %s"
% (token_response.get("error", ""), state)
)
raise o_e.AuthenticationFailed(locator=state)
logger.debug("token response %s" % token_response)
try:
# get the token and other bits of info
id_token = token_response["id_token"]._dict
if id_token["nonce"] != session["nonce"]:
logger.error("Received nonce not the same as expected for %s" % state)
raise o_e.AuthenticationFailed(locator=state)
session["id_token"] = id_token
session["id_token_raw"] = getattr(self, "id_token_raw", None)
session["access_token"] = token_response["access_token"]
session["refresh_token"] = token_response.get("refresh_token", "")
session["expires_in"] = token_response.get("expires_in", "")
self.id_token[state] = getattr(self, "id_token_raw", None)
except Exception as err:
logger.error(err)
logger.error("Unable to parse access token response for %s" % state)
raise o_e.AuthenticationFailed(locator=state)
def store_response(self, resp, info): def store_response(self, resp, info):
"""Make raw ID token available for internal use.""" """Make raw ID token available for internal use."""

View file

@ -2,18 +2,13 @@
import logging import logging
try:
from urllib.parse import parse_qs, urlencode
except ImportError:
from urllib import urlencode
from urlparse import parse_qs
from django.conf import settings from django.conf import settings
from django.contrib.auth import logout as auth_logout from django.contrib.auth import logout as auth_logout
from django.contrib.auth import authenticate, login from django.contrib.auth import authenticate, login
from django.http import HttpResponseRedirect from django.http import HttpResponseRedirect
from django.shortcuts import redirect, render from django.shortcuts import redirect, render
from urllib.parse import parse_qs, urlencode
from djangooidc.oidc import Client from djangooidc.oidc import Client
from djangooidc import exceptions as o_e from djangooidc import exceptions as o_e
@ -35,9 +30,9 @@ def error_page(request, error):
"401.html", "401.html",
context={ context={
"friendly_message": error.friendly_message, "friendly_message": error.friendly_message,
"log_identifier": error.locator "log_identifier": error.locator,
}, },
status=401 status=401,
) )
if isinstance(error, o_e.InternalError): if isinstance(error, o_e.InternalError):
return render( return render(
@ -45,13 +40,14 @@ def error_page(request, error):
"500.html", "500.html",
context={ context={
"friendly_message": error.friendly_message, "friendly_message": error.friendly_message,
"log_identifier": error.locator "log_identifier": error.locator,
}, },
status=500 status=500,
) )
if isinstance(error, Exception): if isinstance(error, Exception):
return render(request, "500.html", status=500) return render(request, "500.html", status=500)
def openid(request): def openid(request):
"""Redirect the user to an authentication provider (OP).""" """Redirect the user to an authentication provider (OP)."""
request.session["next"] = request.GET.get("next", "/") request.session["next"] = request.GET.get("next", "/")
@ -61,6 +57,7 @@ def openid(request):
except Exception as err: except Exception as err:
return error_page(request, err) return error_page(request, err)
def login_callback(request): def login_callback(request):
"""Analyze the token returned by the authentication provider (OP).""" """Analyze the token returned by the authentication provider (OP)."""
try: try:
@ -90,11 +87,13 @@ def logout(request, next_page=None):
"post_logout_redirect_uris" in CLIENT.registration_response.keys() "post_logout_redirect_uris" in CLIENT.registration_response.keys()
and len(CLIENT.registration_response["post_logout_redirect_uris"]) > 0 and len(CLIENT.registration_response["post_logout_redirect_uris"]) > 0
): ):
request_args.update({ request_args.update(
"post_logout_redirect_uri": {
CLIENT.registration_response["post_logout_redirect_uris"][0] "post_logout_redirect_uri": CLIENT.registration_response[
}) "post_logout_redirect_uris"
][0]
}
)
url = CLIENT.provider_info["end_session_endpoint"] url = CLIENT.provider_info["end_session_endpoint"]
url += "?" + urlencode(request_args) url += "?" + urlencode(request_args)
@ -110,6 +109,7 @@ def logout(request, next_page=None):
if next_page: if next_page:
request.session["next"] = next_page request.session["next"] = next_page
def logout_callback(request): def logout_callback(request):
"""Simple redirection view: after logout, redirect to `next`.""" """Simple redirection view: after logout, redirect to `next`."""
next = request.session["next"] if "next" in request.session.keys() else "/" next = request.session["next"] if "next" in request.session.keys() else "/"

View file

@ -309,7 +309,7 @@ LOGGING = {
"()": "django.utils.log.ServerFormatter", "()": "django.utils.log.ServerFormatter",
"format": "[{server_time}] {message}", "format": "[{server_time}] {message}",
"style": "{", "style": "{",
} },
}, },
# define where log messages will be sent; # define where log messages will be sent;
# each logger can have one or more handlers # each logger can have one or more handlers
@ -381,7 +381,8 @@ LOGIN_URL = "openid/openid/login"
# where to go after logging out # where to go after logging out
LOGOUT_REDIRECT_URL = "home" LOGOUT_REDIRECT_URL = "home"
# disable dynamic client registration, only the OP inside OIDC_PROVIDERS will be available # disable dynamic client registration,
# only the OP inside OIDC_PROVIDERS will be available
OIDC_ALLOW_DYNAMIC_OP = False OIDC_ALLOW_DYNAMIC_OP = False
# which provider to use if multiple are available # which provider to use if multiple are available