This commit is contained in:
Rachid Mrad 2023-12-12 18:28:25 -05:00
parent 8febad976d
commit e7e3df0422
No known key found for this signature in database
GPG key ID: EF38E4CEC4A8F3CF
6 changed files with 107 additions and 99 deletions

View file

@ -69,7 +69,7 @@ class OpenIdConnectBackend(ModelBackend):
# Overwrite first_name and last_name if not empty string # Overwrite first_name and last_name if not empty string
for key, value in kwargs.items(): for key, value in kwargs.items():
# Check if the key is not first_name or last_name or value is not empty string # Check if the key is not first_name or last_name or value is not empty string
if key not in ['first_name', 'last_name'] or value: if key not in ["first_name", "last_name"] or value:
setattr(user, key, value) setattr(user, key, value)
user.save() user.save()

View file

@ -4,8 +4,8 @@ from django.utils import timezone
from registrar.models import User from registrar.models import User
from ..backends import OpenIdConnectBackend # Adjust the import path based on your project structure from ..backends import OpenIdConnectBackend # Adjust the import path based on your project structure
class OpenIdConnectBackendTestCase(TestCase):
class OpenIdConnectBackendTestCase(TestCase):
def setUp(self): def setUp(self):
self.backend = OpenIdConnectBackend() self.backend = OpenIdConnectBackend()
self.kwargs = { self.kwargs = {
@ -56,10 +56,10 @@ class OpenIdConnectBackendTestCase(TestCase):
"""Test that authenticate updates an existing user if it finds one. """Test that authenticate updates an existing user if it finds one.
For this test, given_name and family_name are not supplied""" For this test, given_name and family_name are not supplied"""
# Create an existing user with the same username and with first and last names # Create an existing user with the same username and with first and last names
existing_user = User.objects.create_user(username="test_user",first_name="John",last_name="Doe") existing_user = User.objects.create_user(username="test_user", first_name="John", last_name="Doe")
# Remove given_name and family_name from the input, self.kwargs # Remove given_name and family_name from the input, self.kwargs
self.kwargs.pop("given_name", None) self.kwargs.pop("given_name", None)
self.kwargs.pop("family_name", None) self.kwargs.pop("family_name", None)
# Ensure that the authenticate method updates the existing user # Ensure that the authenticate method updates the existing user
@ -79,7 +79,7 @@ class OpenIdConnectBackendTestCase(TestCase):
"""Test that authenticate updates an existing user if it finds one. """Test that authenticate updates an existing user if it finds one.
For this test, given_name and family_name are supplied and overwrite""" For this test, given_name and family_name are supplied and overwrite"""
# Create an existing user with the same username and with first and last names # Create an existing user with the same username and with first and last names
existing_user = User.objects.create_user(username="test_user",first_name="WillBe",last_name="Replaced") existing_user = User.objects.create_user(username="test_user", first_name="WillBe", last_name="Replaced")
# Ensure that the authenticate method updates the existing user # Ensure that the authenticate method updates the existing user
# and preserves existing first and last names # and preserves existing first and last names

View file

@ -58,7 +58,7 @@ def openid(request):
request.session["next"] = request.GET.get("next", "/") request.session["next"] = request.GET.get("next", "/")
try: try:
logger.info('openid() calls create_authn_request in oidc') logger.info("openid() calls create_authn_request in oidc")
return CLIENT.create_authn_request(request.session) return CLIENT.create_authn_request(request.session)
except Exception as err: except Exception as err:
return error_page(request, err) return error_page(request, err)
@ -72,22 +72,23 @@ def login_callback(request):
# test for need for identity verification and if it is satisfied # test for need for identity verification and if it is satisfied
# if not satisfied, redirect user to login with stepped up acr_value # if not satisfied, redirect user to login with stepped up acr_value
if requires_step_up_auth(userinfo): if requires_step_up_auth(userinfo):
logger.info('login_callback() calls get_step_up_acr_value and create_authn_request in oidc') logger.info("login_callback() calls get_step_up_acr_value and create_authn_request in oidc")
# add acr_value to request.session # add acr_value to request.session
request.session["acr_value"] = CLIENT.get_step_up_acr_value() request.session["acr_value"] = CLIENT.get_step_up_acr_value()
return CLIENT.create_authn_request(request.session) return CLIENT.create_authn_request(request.session)
logger.info(f'login_callback() before calling authenticate: {userinfo}') logger.info(f"login_callback() before calling authenticate: {userinfo}")
try: try:
user_in_db = User.objects.get(username=userinfo["sub"]) user_in_db = User.objects.get(username=userinfo["sub"])
if user_in_db: if user_in_db:
logger.info(f"This user exists in the DB (before authenticate): {user_in_db.first_name} {user_in_db.last_name}") logger.info(
f"This user exists in the DB (before authenticate): {user_in_db.first_name} {user_in_db.last_name}"
)
except: except:
pass pass
user = authenticate(request=request, **userinfo) user = authenticate(request=request, **userinfo)
if user: if user:
login(request, user) login(request, user)

View file

@ -63,7 +63,7 @@ class Command(BaseCommand):
{TerminalColors.ENDC} {TerminalColors.ENDC}
""" # noqa """ # noqa
) )
# DEBUG: # DEBUG:
TerminalHelper.print_conditional( TerminalHelper.print_conditional(
debug_on, debug_on,
@ -91,7 +91,7 @@ class Command(BaseCommand):
Returns the corresponding User object. Returns the corresponding User object.
""" """
user_exists = User.objects.filter(contact=contact).exists() user_exists = User.objects.filter(contact=contact).exists()
if user_exists: if user_exists:
try: try:
@ -105,7 +105,7 @@ class Command(BaseCommand):
f"""{TerminalColors.YELLOW} f"""{TerminalColors.YELLOW}
> Found linked user for contact: > Found linked user for contact:
{contact} {contact.email} {contact.first_name} {contact.last_name} {contact} {contact.email} {contact.first_name} {contact.last_name}
> The linked user is {eligible_user} > The linked user is {eligible_user} {eligible_user.username}
{TerminalColors.ENDC}""", # noqa {TerminalColors.ENDC}""", # noqa
) )
@ -113,8 +113,10 @@ class Command(BaseCommand):
# ---- LET'S KEEP A LIGHT TOUCH # ---- LET'S KEEP A LIGHT TOUCH
if not eligible_user.first_name or not eligible_user.last_name: if not eligible_user.first_name or not eligible_user.last_name:
processed_user = eligible_user processed_user = eligible_user
processed_user.first_name = contact.first_name # (expression has type "str | None", variable has type "str | int | Combinable")
processed_user.last_name = contact.last_name # so we'll ignore type
processed_user.first_name = contact.first_name # type: ignore
processed_user.last_name = contact.last_name # type: ignore
processed_user.save() processed_user.save()
return ( return (
@ -140,7 +142,7 @@ class Command(BaseCommand):
# ====================================================== # ======================================================
# ================= PROCESS CONTACTS ================== # ================= PROCESS CONTACTS ==================
# ====================================================== # ======================================================
# C901 'Command.handle' is too complex # C901 'Command.handle' is too complex
def process_contacts( def process_contacts(
self, self,
@ -150,7 +152,6 @@ class Command(BaseCommand):
processed_users, processed_users,
): ):
for contact in Contact.objects.all(): for contact in Contact.objects.all():
# DEBUG: # DEBUG:
TerminalHelper.print_conditional( TerminalHelper.print_conditional(
debug_on, debug_on,
@ -208,10 +209,10 @@ class Command(BaseCommand):
# users we SKIPPED # users we SKIPPED
skipped_contacts = [] skipped_contacts = []
# users we found that are linked to contacts # users we found that are linked to contacts
eligible_users = [] eligible_users = []
# users we PROCESSED # users we PROCESSED
processed_users = [] processed_users = []

View file

@ -5,88 +5,96 @@ from registrar.models import (
Contact, Contact,
) )
from django.core.management import call_command
from unittest.mock import patch
from registrar.management.commands.copy_names_from_contacts_to_users import Command from registrar.management.commands.copy_names_from_contacts_to_users import Command
class TestOrganizationMigration(TestCase):
class TestDataUpdates(TestCase):
def setUp(self): def setUp(self):
"""Defines the file name of migration_json and the folder its contained in""" """We cannot setup the user details because contacts will override the first and last names in its save method
so we will initiate the users, setup the contacts and link them, and leave the rest of the setup for the test(s).
"""
# self.user1, _ = User.objects.get_or_create(username="user1")
# self.user2 = User.objects.create(username="user2", first_name="Joey", last_name="") self.user1 = User.objects.create(username="user1")
# self.user3 = User.objects.create(username="user3", first_name="a special first name", last_name="a special last name") self.user2 = User.objects.create(username="user2")
# self.userX = User.objects.create(username="emailX@igorville.gov", first_name="firstX", last_name="lastX") self.user3 = User.objects.create(username="user3")
self.userX = User.objects.create(username="user4")
# self.contact1, _ = Contact.objects.get_or_create(user=self.user1, email="email1@igorville.gov", first_name="first1", last_name="last1") # The last user created triggers the creation of a contact and attaches itself to it. @Neil wth is going on?
# self.contact2 = Contact.objects.create(user=self.user2, email="email2@igorville.gov", first_name="first2", last_name="last2") # This bs_user defuses that situation so we can test the code.
# self.contact3 = Contact.objects.create(user=None, email="email3@igorville.gov", first_name="first3", last_name="last3") self.bs_user = User.objects.create()
# self.contact4 = Contact.objects.create(user=None, email="email4@igorville.gov", first_name="first4", last_name="last4")
self.contact1 = Contact.objects.create(
# self.contact1 = Contact.objects.create(email="email1@igorville.gov", first_name="first1", last_name="last1") user=self.user1, email="email1@igorville.gov", first_name="first1", last_name="last1"
# self.contact2 = Contact.objects.create(email="email2@igorville.gov", first_name="first2", last_name="last2") )
# self.contact3 = Contact.objects.create(email="email3@igorville.gov", first_name="first3", last_name="last3") self.contact2 = Contact.objects.create(
# self.contact4 = Contact.objects.create(email="email4@igorville.gov", first_name="first4", last_name="last4") user=self.user2, email="email2@igorville.gov", first_name="first2", last_name="last2"
)
# self.user1 = User.objects.create(contact=self.contact1) self.contact3 = Contact.objects.create(
# self.user2 = User.objects.create(contact=self.contact2, username="user2", first_name="Joey", last_name="") user=self.user3, email="email3@igorville.gov", first_name="first3", last_name="last3"
# self.user3 = User.objects.create(username="user3", first_name="a special first name", last_name="a special last name") )
# self.userX = User.objects.create(username="emailX@igorville.gov", first_name="firstX", last_name="lastX") self.contact4 = Contact.objects.create(email="email4@igorville.gov", first_name="first4", last_name="last4")
self.command = Command() self.command = Command()
def tearDown(self): def tearDown(self):
"""Deletes all DB objects related to migrations""" """Clean up"""
# Delete users # Delete users and contacts
User.objects.all().delete() User.objects.all().delete()
Contact.objects.all().delete() Contact.objects.all().delete()
def test_script_updates_linked_users(self): def test_script_updates_linked_users(self):
"""Test the script that copies contacts' first and last names into associated users that
user1, _ = User.objects.get_or_create(username="user1") are eligible (first or last are blank or undefined)"""
contact1, _ = Contact.objects.get_or_create(user=user1, email="email1@igorville.gov", first_name="first1", last_name="last1")
# Set up the users' first and last names here so
# they that they don't get overwritten by Contact's save()
# self.user1.first_name = "" # User with no first or last names
# self.user1.last_name = "" self.user1.first_name = ""
# self.user2.last_name = "" self.user1.last_name = ""
# self.user1.save() self.user1.save()
# self.user2.save()
# User with a first name but no last name
# users we SKIPPED self.user2.last_name = ""
self.user2.save()
# User with a first and last name
self.user3.first_name = "An existing first name"
self.user3.last_name = "An existing last name"
self.user3.save()
# Unlinked user
# To make this test useful, we will set the last_name to ""
self.userX.first_name = "Unlinked user's first name"
self.userX.last_name = ""
self.userX.save()
# Call the parent method the same way we do it in the script
skipped_contacts = [] skipped_contacts = []
# users we found that are linked to contacts
eligible_users = [] eligible_users = []
# users we PROCESSED
processed_users = [] processed_users = []
( (
skipped_contacts, skipped_contacts,
eligible_users, eligible_users,
processed_users, processed_users,
) = self.command.process_contacts( ) = self.command.process_contacts(
True, # Set debugging to False
False,
skipped_contacts, skipped_contacts,
eligible_users, eligible_users,
processed_users, processed_users,
) )
# self.user1.refresh_from_db() # Trigger DB refresh
# self.user2.refresh_from_db() self.user1.refresh_from_db()
# self.user3.refresh_from_db() self.user2.refresh_from_db()
# self.userX.refresh_from_db() self.user3.refresh_from_db()
self.userX.refresh_from_db()
self.assertEqual(user1.first_name, "first1")
self.assertEqual(user1.last_name, "last1") # Asserts
# self.assertEqual(self.user2.first_name, "first2") self.assertEqual(self.user1.first_name, "first1")
# self.assertEqual(self.user2.last_name, "last2") self.assertEqual(self.user1.last_name, "last1")
# self.assertEqual(self.user3.first_name, "a special first name") self.assertEqual(self.user2.first_name, "first2")
# self.assertEqual(self.user3.last_name, "a special last name") self.assertEqual(self.user2.last_name, "last2")
# self.assertEqual(self.userX.first_name, "firstX") self.assertEqual(self.user3.first_name, "An existing first name")
# self.assertEqual(self.userX.last_name, "lastX") self.assertEqual(self.user3.last_name, "An existing last name")
self.assertEqual(self.userX.first_name, "Unlinked user's first name")
self.assertEqual(self.userX.last_name, "")

View file

@ -654,10 +654,9 @@ class TestUser(TestCase):
"""A new user who's neither transitioned nor invited should """A new user who's neither transitioned nor invited should
return True when tested with class method needs_identity_verification""" return True when tested with class method needs_identity_verification"""
self.assertTrue(User.needs_identity_verification(self.user.email, self.user.username)) self.assertTrue(User.needs_identity_verification(self.user.email, self.user.username))
class TestContact(TestCase): class TestContact(TestCase):
def setUp(self): def setUp(self):
self.email = "mayor@igorville.gov" self.email = "mayor@igorville.gov"
self.user, _ = User.objects.get_or_create(email=self.email, first_name="Rachid", last_name="Mrad") self.user, _ = User.objects.get_or_create(email=self.email, first_name="Rachid", last_name="Mrad")
@ -675,29 +674,28 @@ class TestContact(TestCase):
self.assertEqual(self.contact.last_name, "Mrad") self.assertEqual(self.contact.last_name, "Mrad")
self.assertEqual(self.user.first_name, "Rachid") self.assertEqual(self.user.first_name, "Rachid")
self.assertEqual(self.user.last_name, "Mrad") self.assertEqual(self.user.last_name, "Mrad")
self.contact.first_name = "Joey" self.contact.first_name = "Joey"
self.contact.last_name = "Baloney" self.contact.last_name = "Baloney"
self.contact.save() self.contact.save()
# Refresh the user object to reflect the changes made in the database # Refresh the user object to reflect the changes made in the database
self.user.refresh_from_db() self.user.refresh_from_db()
# Updating the contact's first and last names propagate to the user # Updating the contact's first and last names propagate to the user
self.assertEqual(self.contact.first_name, "Joey") self.assertEqual(self.contact.first_name, "Joey")
self.assertEqual(self.contact.last_name, "Baloney") self.assertEqual(self.contact.last_name, "Baloney")
self.assertEqual(self.user.first_name, "Joey") self.assertEqual(self.user.first_name, "Joey")
self.assertEqual(self.user.last_name, "Baloney") self.assertEqual(self.user.last_name, "Baloney")
def test_saving_contact_does_not_update_user_email(self): def test_saving_contact_does_not_update_user_email(self):
"""When a contact's email is updated, the change is not propagated to the lined user.""" """When a contact's email is updated, the change is not propagated to the lined user."""
self.contact.email = "joey.baloney@diaperville.com" self.contact.email = "joey.baloney@diaperville.com"
self.contact.save() self.contact.save()
# Refresh the user object to reflect the changes made in the database # Refresh the user object to reflect the changes made in the database
self.user.refresh_from_db() self.user.refresh_from_db()
# Updating the contact's email does not propagate # Updating the contact's email does not propagate
self.assertEqual(self.contact.email, "joey.baloney@diaperville.com") self.assertEqual(self.contact.email, "joey.baloney@diaperville.com")
self.assertEqual(self.user.email, "mayor@igorville.gov") self.assertEqual(self.user.email, "mayor@igorville.gov")