diff --git a/src/djangooidc/backends.py b/src/djangooidc/backends.py index 41e442f2d..38fde0ced 100644 --- a/src/djangooidc/backends.py +++ b/src/djangooidc/backends.py @@ -21,49 +21,66 @@ class OpenIdConnectBackend(ModelBackend): """ def authenticate(self, request, **kwargs): - logger.debug("kwargs %s" % kwargs) - user = None - if not kwargs or "sub" not in kwargs.keys(): - return user + logger.debug("kwargs %s", kwargs) + + if not kwargs or "sub" not in kwargs: + return None UserModel = get_user_model() username = self.clean_username(kwargs["sub"]) + openid_data = self.extract_openid_data(kwargs) - # Some OP may actually choose to withhold some information, so we must - # test if it is present - openid_data = {"last_login": timezone.now()} - openid_data["first_name"] = kwargs.get("given_name", "") - openid_data["last_name"] = kwargs.get("family_name", "") - openid_data["email"] = kwargs.get("email", "") - openid_data["phone"] = kwargs.get("phone", "") - - # Note that this could be accomplished in one try-except clause, but - # instead we use get_or_create when creating unknown users since it has - # built-in safeguards for multiple threads. if getattr(settings, "OIDC_CREATE_UNKNOWN_USER", True): - args = { - UserModel.USERNAME_FIELD: username, - # defaults _will_ be updated, these are not fallbacks - "defaults": openid_data, - } - - user, created = UserModel.objects.get_or_create(**args) - - if not created: - # If user exists, update existing user - self.update_existing_user(user, args["defaults"]) - else: - # If user is created, configure the user - user = self.configure_user(user, **kwargs) + user = self.get_or_create_user(UserModel, username, openid_data, kwargs) else: - try: - user = UserModel.objects.get_by_natural_key(username) - except UserModel.DoesNotExist: - return None - # run this callback for a each login - user.on_each_login() + user = self.get_user_by_username(UserModel, username) + + if user: + user.on_each_login() + return user + def extract_openid_data(self, kwargs): + """Extract OpenID data from authentication kwargs.""" + return { + "last_login": timezone.now(), + "first_name": kwargs.get("given_name", ""), + "last_name": kwargs.get("family_name", ""), + "email": kwargs.get("email", ""), + "phone": kwargs.get("phone", ""), + } + + def get_or_create_user(self, UserModel, username, openid_data, kwargs): + """Retrieve user by username or email, or create a new user.""" + user = self.get_user_by_username(UserModel, username) + + if not user and openid_data["email"]: + user = self.get_user_by_email(UserModel, openid_data["email"]) + if user: + # if found by email, update the username + setattr(user, UserModel.USERNAME_FIELD, username) + + if not user: + user = UserModel.objects.create(**{UserModel.USERNAME_FIELD: username}, **openid_data) + return self.configure_user(user, **kwargs) + + self.update_existing_user(user, openid_data) + return user + + def get_user_by_username(self, UserModel, username): + """Retrieve user by username.""" + try: + return UserModel.objects.get(**{UserModel.USERNAME_FIELD: username}) + except UserModel.DoesNotExist: + return None + + def get_user_by_email(self, UserModel, email): + """Retrieve user by email.""" + try: + return UserModel.objects.get(email=email) + except UserModel.DoesNotExist: + return None + def update_existing_user(self, user, kwargs): """ Update user fields without overwriting certain fields. diff --git a/src/djangooidc/tests/test_backends.py b/src/djangooidc/tests/test_backends.py index c15106fa9..4e8f80a23 100644 --- a/src/djangooidc/tests/test_backends.py +++ b/src/djangooidc/tests/test_backends.py @@ -1,5 +1,6 @@ from django.test import TestCase from registrar.models import User +from api.tests.common import less_console_noise_decorator from ..backends import OpenIdConnectBackend # Adjust the import path based on your project structure @@ -17,6 +18,7 @@ class OpenIdConnectBackendTestCase(TestCase): def tearDown(self) -> None: User.objects.all().delete() + @less_console_noise_decorator def test_authenticate_with_create_user(self): """Test that authenticate creates a new user if it does not find existing user""" @@ -32,6 +34,7 @@ class OpenIdConnectBackendTestCase(TestCase): self.assertEqual(user.email, "john.doe@example.com") self.assertEqual(user.phone, "123456789") + @less_console_noise_decorator def test_authenticate_with_existing_user(self): """Test that authenticate updates an existing user if it finds one. For this test, given_name and family_name are supplied""" @@ -50,6 +53,30 @@ class OpenIdConnectBackendTestCase(TestCase): self.assertEqual(user.email, "john.doe@example.com") self.assertEqual(user.phone, "123456789") + @less_console_noise_decorator + def test_authenticate_with_existing_user_same_email_different_username(self): + """Test that authenticate updates an existing user if it finds one. + In this case, match is to an existing record with matching email but + a non-matching username. The existing record's username should be udpated. + For this test, given_name and family_name are supplied""" + # Create an existing user with the same username + User.objects.create_user(username="old_username", email="john.doe@example.com") + + # Ensure that the authenticate method updates the existing user + user = self.backend.authenticate(request=None, **self.kwargs) + self.assertIsNotNone(user) + self.assertIsInstance(user, User) + + # Verify that user fields are correctly updated + self.assertEqual(user.first_name, "John") + self.assertEqual(user.last_name, "Doe") + self.assertEqual(user.email, "john.doe@example.com") + self.assertEqual(user.phone, "123456789") + self.assertEqual(user.username, "test_user") + # Assert that a user no longer exists by the old username + self.assertFalse(User.objects.filter(username="old_username").exists()) + + @less_console_noise_decorator def test_authenticate_with_existing_user_with_existing_first_last_phone(self): """Test that authenticate updates an existing user if it finds one. For this test, given_name and family_name are not supplied. @@ -79,6 +106,7 @@ class OpenIdConnectBackendTestCase(TestCase): self.assertEqual(user.email, "john.doe@example.com") self.assertEqual(user.phone, "9999999999") + @less_console_noise_decorator def test_authenticate_with_existing_user_different_name_phone(self): """Test that authenticate updates an existing user if it finds one. For this test, given_name and family_name are supplied and overwrite""" @@ -100,6 +128,7 @@ class OpenIdConnectBackendTestCase(TestCase): self.assertEqual(user.email, "john.doe@example.com") self.assertEqual(user.phone, "123456789") + @less_console_noise_decorator def test_authenticate_with_unknown_user(self): """Test that authenticate returns None when no kwargs are supplied""" # Ensure that the authenticate method handles the case when the user is not found diff --git a/src/registrar/assets/src/sass/_theme/_typography.scss b/src/registrar/assets/src/sass/_theme/_typography.scss index 75d5170e8..22069f726 100644 --- a/src/registrar/assets/src/sass/_theme/_typography.scss +++ b/src/registrar/assets/src/sass/_theme/_typography.scss @@ -11,7 +11,8 @@ address, } h1:not(.usa-alert__heading), -h2:not(.usa-alert__heading), +// .module h2 excludes headers in DJA +h2:not(.usa-alert__heading, .module h2), h3:not(.usa-alert__heading), h4:not(.usa-alert__heading), h5:not(.usa-alert__heading), diff --git a/src/registrar/fixtures/fixtures_users.py b/src/registrar/fixtures/fixtures_users.py index f1623e674..977bf0858 100644 --- a/src/registrar/fixtures/fixtures_users.py +++ b/src/registrar/fixtures/fixtures_users.py @@ -352,12 +352,37 @@ class UserFixture: @staticmethod def _get_existing_users(users): + # if users match existing users in db by email address, update the users with the username + # from the db. this will prevent duplicate users (with same email) from being added to db. + # it is ok to keep the old username in the db because the username will be updated by oidc process during login + + # Extract email addresses from users + emails = [user.get("email") for user in users] + + # Fetch existing users by email + existing_users_by_email = User.objects.filter(email__in=emails).values_list("email", "username", "id") + + # Create a dictionary to map emails to existing usernames + email_to_existing_user = {user[0]: user[1] for user in existing_users_by_email} + + # Update the users list with the usernames from existing users by email + for user in users: + email = user.get("email") + if email and email in email_to_existing_user: + user["username"] = email_to_existing_user[email] # Update username with the existing one + + # Get the user identifiers (username, id) for the existing users to query the database user_identifiers = [(user.get("username"), user.get("id")) for user in users] + + # Fetch existing users by username or id existing_users = User.objects.filter( username__in=[user[0] for user in user_identifiers] + [user[1] for user in user_identifiers] ).values_list("username", "id") + + # Create sets for usernames and ids that exist existing_usernames = set(user[0] for user in existing_users) existing_user_ids = set(user[1] for user in existing_users) + return existing_usernames, existing_user_ids @staticmethod