mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-07-26 04:28:39 +02:00
update logic for oidc backend and update tests
This commit is contained in:
parent
5b6776f81d
commit
ff959ce519
2 changed files with 81 additions and 35 deletions
|
@ -21,49 +21,66 @@ class OpenIdConnectBackend(ModelBackend):
|
|||
"""
|
||||
|
||||
def authenticate(self, request, **kwargs):
|
||||
logger.debug("kwargs %s" % kwargs)
|
||||
user = None
|
||||
if not kwargs or "sub" not in kwargs.keys():
|
||||
return user
|
||||
logger.debug("kwargs %s", kwargs)
|
||||
|
||||
if not kwargs or "sub" not in kwargs:
|
||||
return None
|
||||
|
||||
UserModel = get_user_model()
|
||||
username = self.clean_username(kwargs["sub"])
|
||||
openid_data = self.extract_openid_data(kwargs)
|
||||
|
||||
# Some OP may actually choose to withhold some information, so we must
|
||||
# test if it is present
|
||||
openid_data = {"last_login": timezone.now()}
|
||||
openid_data["first_name"] = kwargs.get("given_name", "")
|
||||
openid_data["last_name"] = kwargs.get("family_name", "")
|
||||
openid_data["email"] = kwargs.get("email", "")
|
||||
openid_data["phone"] = kwargs.get("phone", "")
|
||||
|
||||
# Note that this could be accomplished in one try-except clause, but
|
||||
# instead we use get_or_create when creating unknown users since it has
|
||||
# built-in safeguards for multiple threads.
|
||||
if getattr(settings, "OIDC_CREATE_UNKNOWN_USER", True):
|
||||
args = {
|
||||
UserModel.USERNAME_FIELD: username,
|
||||
# defaults _will_ be updated, these are not fallbacks
|
||||
"defaults": openid_data,
|
||||
}
|
||||
|
||||
user, created = UserModel.objects.get_or_create(**args)
|
||||
|
||||
if not created:
|
||||
# If user exists, update existing user
|
||||
self.update_existing_user(user, args["defaults"])
|
||||
else:
|
||||
# If user is created, configure the user
|
||||
user = self.configure_user(user, **kwargs)
|
||||
user = self.get_or_create_user(UserModel, username, openid_data, kwargs)
|
||||
else:
|
||||
try:
|
||||
user = UserModel.objects.get_by_natural_key(username)
|
||||
except UserModel.DoesNotExist:
|
||||
return None
|
||||
# run this callback for a each login
|
||||
user.on_each_login()
|
||||
user = self.get_user_by_username(UserModel, username)
|
||||
|
||||
if user:
|
||||
user.on_each_login()
|
||||
|
||||
return user
|
||||
|
||||
def extract_openid_data(self, kwargs):
|
||||
"""Extract OpenID data from authentication kwargs."""
|
||||
return {
|
||||
"last_login": timezone.now(),
|
||||
"first_name": kwargs.get("given_name", ""),
|
||||
"last_name": kwargs.get("family_name", ""),
|
||||
"email": kwargs.get("email", ""),
|
||||
"phone": kwargs.get("phone", ""),
|
||||
}
|
||||
|
||||
def get_or_create_user(self, UserModel, username, openid_data, kwargs):
|
||||
"""Retrieve user by username or email, or create a new user."""
|
||||
user = self.get_user_by_username(UserModel, username)
|
||||
|
||||
if not user and openid_data["email"]:
|
||||
user = self.get_user_by_email(UserModel, openid_data["email"])
|
||||
if user:
|
||||
# if found by email, update the username
|
||||
setattr(user, UserModel.USERNAME_FIELD, username)
|
||||
|
||||
if not user:
|
||||
user = UserModel.objects.create(**{UserModel.USERNAME_FIELD: username}, **openid_data)
|
||||
return self.configure_user(user, **kwargs)
|
||||
|
||||
self.update_existing_user(user, openid_data)
|
||||
return user
|
||||
|
||||
def get_user_by_username(self, UserModel, username):
|
||||
"""Retrieve user by username."""
|
||||
try:
|
||||
return UserModel.objects.get(**{UserModel.USERNAME_FIELD: username})
|
||||
except UserModel.DoesNotExist:
|
||||
return None
|
||||
|
||||
def get_user_by_email(self, UserModel, email):
|
||||
"""Retrieve user by email."""
|
||||
try:
|
||||
return UserModel.objects.get(email=email)
|
||||
except UserModel.DoesNotExist:
|
||||
return None
|
||||
|
||||
def update_existing_user(self, user, kwargs):
|
||||
"""
|
||||
Update user fields without overwriting certain fields.
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from django.test import TestCase
|
||||
from registrar.models import User
|
||||
from api.tests.common import less_console_noise_decorator
|
||||
from ..backends import OpenIdConnectBackend # Adjust the import path based on your project structure
|
||||
|
||||
|
||||
|
@ -17,6 +18,7 @@ class OpenIdConnectBackendTestCase(TestCase):
|
|||
def tearDown(self) -> None:
|
||||
User.objects.all().delete()
|
||||
|
||||
@less_console_noise_decorator
|
||||
def test_authenticate_with_create_user(self):
|
||||
"""Test that authenticate creates a new user if it does not find
|
||||
existing user"""
|
||||
|
@ -32,6 +34,7 @@ class OpenIdConnectBackendTestCase(TestCase):
|
|||
self.assertEqual(user.email, "john.doe@example.com")
|
||||
self.assertEqual(user.phone, "123456789")
|
||||
|
||||
@less_console_noise_decorator
|
||||
def test_authenticate_with_existing_user(self):
|
||||
"""Test that authenticate updates an existing user if it finds one.
|
||||
For this test, given_name and family_name are supplied"""
|
||||
|
@ -50,6 +53,30 @@ class OpenIdConnectBackendTestCase(TestCase):
|
|||
self.assertEqual(user.email, "john.doe@example.com")
|
||||
self.assertEqual(user.phone, "123456789")
|
||||
|
||||
@less_console_noise_decorator
|
||||
def test_authenticate_with_existing_user_same_email_different_username(self):
|
||||
"""Test that authenticate updates an existing user if it finds one.
|
||||
In this case, match is to an existing record with matching email but
|
||||
a non-matching username. The existing record's username should be udpated.
|
||||
For this test, given_name and family_name are supplied"""
|
||||
# Create an existing user with the same username
|
||||
User.objects.create_user(username="old_username", email="john.doe@example.com")
|
||||
|
||||
# Ensure that the authenticate method updates the existing user
|
||||
user = self.backend.authenticate(request=None, **self.kwargs)
|
||||
self.assertIsNotNone(user)
|
||||
self.assertIsInstance(user, User)
|
||||
|
||||
# Verify that user fields are correctly updated
|
||||
self.assertEqual(user.first_name, "John")
|
||||
self.assertEqual(user.last_name, "Doe")
|
||||
self.assertEqual(user.email, "john.doe@example.com")
|
||||
self.assertEqual(user.phone, "123456789")
|
||||
self.assertEqual(user.username, "test_user")
|
||||
# Assert that a user no longer exists by the old username
|
||||
self.assertFalse(User.objects.filter(username="old_username").exists())
|
||||
|
||||
@less_console_noise_decorator
|
||||
def test_authenticate_with_existing_user_with_existing_first_last_phone(self):
|
||||
"""Test that authenticate updates an existing user if it finds one.
|
||||
For this test, given_name and family_name are not supplied.
|
||||
|
@ -79,6 +106,7 @@ class OpenIdConnectBackendTestCase(TestCase):
|
|||
self.assertEqual(user.email, "john.doe@example.com")
|
||||
self.assertEqual(user.phone, "9999999999")
|
||||
|
||||
@less_console_noise_decorator
|
||||
def test_authenticate_with_existing_user_different_name_phone(self):
|
||||
"""Test that authenticate updates an existing user if it finds one.
|
||||
For this test, given_name and family_name are supplied and overwrite"""
|
||||
|
@ -100,6 +128,7 @@ class OpenIdConnectBackendTestCase(TestCase):
|
|||
self.assertEqual(user.email, "john.doe@example.com")
|
||||
self.assertEqual(user.phone, "123456789")
|
||||
|
||||
@less_console_noise_decorator
|
||||
def test_authenticate_with_unknown_user(self):
|
||||
"""Test that authenticate returns None when no kwargs are supplied"""
|
||||
# Ensure that the authenticate method handles the case when the user is not found
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue