mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-08-16 22:44:11 +02:00
Merge pull request #1491 from cisagov/rjm/1464-auth-contact-user-refactor
Issues 1464, 1468: Auth / contact / user refactor to solve issues introduced by switch to ial1 for some users (RJM sandbox).
This commit is contained in:
commit
5e49772ef7
8 changed files with 514 additions and 10 deletions
|
@ -46,8 +46,14 @@ class OpenIdConnectBackend(ModelBackend):
|
||||||
# defaults _will_ be updated, these are not fallbacks
|
# defaults _will_ be updated, these are not fallbacks
|
||||||
"defaults": openid_data,
|
"defaults": openid_data,
|
||||||
}
|
}
|
||||||
user, created = UserModel.objects.update_or_create(**args)
|
|
||||||
if created:
|
user, created = UserModel.objects.get_or_create(**args)
|
||||||
|
|
||||||
|
if not created:
|
||||||
|
# If user exists, update existing user
|
||||||
|
self.update_existing_user(user, args["defaults"])
|
||||||
|
else:
|
||||||
|
# If user is created, configure the user
|
||||||
user = self.configure_user(user, **kwargs)
|
user = self.configure_user(user, **kwargs)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
|
@ -58,6 +64,16 @@ class OpenIdConnectBackend(ModelBackend):
|
||||||
user.on_each_login()
|
user.on_each_login()
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
def update_existing_user(self, user, kwargs):
|
||||||
|
"""Update other fields without overwriting first_name and last_name.
|
||||||
|
Overwrite first_name and last_name if not empty string"""
|
||||||
|
|
||||||
|
for key, value in kwargs.items():
|
||||||
|
# Check if the key is not first_name or last_name or value is not empty string
|
||||||
|
if key not in ["first_name", "last_name"] or value:
|
||||||
|
setattr(user, key, value)
|
||||||
|
user.save()
|
||||||
|
|
||||||
def clean_username(self, username):
|
def clean_username(self, username):
|
||||||
"""
|
"""
|
||||||
Performs any cleaning on the "username" prior to using it to get or
|
Performs any cleaning on the "username" prior to using it to get or
|
||||||
|
|
99
src/djangooidc/tests/test_backends.py
Normal file
99
src/djangooidc/tests/test_backends.py
Normal file
|
@ -0,0 +1,99 @@
|
||||||
|
from django.test import TestCase
|
||||||
|
from registrar.models import User
|
||||||
|
from ..backends import OpenIdConnectBackend # Adjust the import path based on your project structure
|
||||||
|
|
||||||
|
|
||||||
|
class OpenIdConnectBackendTestCase(TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.backend = OpenIdConnectBackend()
|
||||||
|
self.kwargs = {
|
||||||
|
"sub": "test_user",
|
||||||
|
"given_name": "John",
|
||||||
|
"family_name": "Doe",
|
||||||
|
"email": "john.doe@example.com",
|
||||||
|
"phone": "123456789",
|
||||||
|
}
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
User.objects.all().delete()
|
||||||
|
|
||||||
|
def test_authenticate_with_create_user(self):
|
||||||
|
"""Test that authenticate creates a new user if it does not find
|
||||||
|
existing user"""
|
||||||
|
# Ensure that the authenticate method creates a new user
|
||||||
|
user = self.backend.authenticate(request=None, **self.kwargs)
|
||||||
|
self.assertIsNotNone(user)
|
||||||
|
self.assertIsInstance(user, User)
|
||||||
|
self.assertEqual(user.username, "test_user")
|
||||||
|
|
||||||
|
# Verify that user fields are correctly set
|
||||||
|
self.assertEqual(user.first_name, "John")
|
||||||
|
self.assertEqual(user.last_name, "Doe")
|
||||||
|
self.assertEqual(user.email, "john.doe@example.com")
|
||||||
|
self.assertEqual(user.phone, "123456789")
|
||||||
|
|
||||||
|
def test_authenticate_with_existing_user(self):
|
||||||
|
"""Test that authenticate updates an existing user if it finds one.
|
||||||
|
For this test, given_name and family_name are supplied"""
|
||||||
|
# Create an existing user with the same username
|
||||||
|
existing_user = User.objects.create_user(username="test_user")
|
||||||
|
|
||||||
|
# Ensure that the authenticate method updates the existing user
|
||||||
|
user = self.backend.authenticate(request=None, **self.kwargs)
|
||||||
|
self.assertIsNotNone(user)
|
||||||
|
self.assertIsInstance(user, User)
|
||||||
|
self.assertEqual(user, existing_user) # The same user instance should be returned
|
||||||
|
|
||||||
|
# Verify that user fields are correctly updated
|
||||||
|
self.assertEqual(user.first_name, "John")
|
||||||
|
self.assertEqual(user.last_name, "Doe")
|
||||||
|
self.assertEqual(user.email, "john.doe@example.com")
|
||||||
|
self.assertEqual(user.phone, "123456789")
|
||||||
|
|
||||||
|
def test_authenticate_with_existing_user_no_name(self):
|
||||||
|
"""Test that authenticate updates an existing user if it finds one.
|
||||||
|
For this test, given_name and family_name are not supplied"""
|
||||||
|
# Create an existing user with the same username and with first and last names
|
||||||
|
existing_user = User.objects.create_user(username="test_user", first_name="John", last_name="Doe")
|
||||||
|
|
||||||
|
# Remove given_name and family_name from the input, self.kwargs
|
||||||
|
self.kwargs.pop("given_name", None)
|
||||||
|
self.kwargs.pop("family_name", None)
|
||||||
|
|
||||||
|
# Ensure that the authenticate method updates the existing user
|
||||||
|
# and preserves existing first and last names
|
||||||
|
user = self.backend.authenticate(request=None, **self.kwargs)
|
||||||
|
self.assertIsNotNone(user)
|
||||||
|
self.assertIsInstance(user, User)
|
||||||
|
self.assertEqual(user, existing_user) # The same user instance should be returned
|
||||||
|
|
||||||
|
# Verify that user fields are correctly updated
|
||||||
|
self.assertEqual(user.first_name, "John")
|
||||||
|
self.assertEqual(user.last_name, "Doe")
|
||||||
|
self.assertEqual(user.email, "john.doe@example.com")
|
||||||
|
self.assertEqual(user.phone, "123456789")
|
||||||
|
|
||||||
|
def test_authenticate_with_existing_user_different_name(self):
|
||||||
|
"""Test that authenticate updates an existing user if it finds one.
|
||||||
|
For this test, given_name and family_name are supplied and overwrite"""
|
||||||
|
# Create an existing user with the same username and with first and last names
|
||||||
|
existing_user = User.objects.create_user(username="test_user", first_name="WillBe", last_name="Replaced")
|
||||||
|
|
||||||
|
# Ensure that the authenticate method updates the existing user
|
||||||
|
# and preserves existing first and last names
|
||||||
|
user = self.backend.authenticate(request=None, **self.kwargs)
|
||||||
|
self.assertIsNotNone(user)
|
||||||
|
self.assertIsInstance(user, User)
|
||||||
|
self.assertEqual(user, existing_user) # The same user instance should be returned
|
||||||
|
|
||||||
|
# Verify that user fields are correctly updated
|
||||||
|
self.assertEqual(user.first_name, "John")
|
||||||
|
self.assertEqual(user.last_name, "Doe")
|
||||||
|
self.assertEqual(user.email, "john.doe@example.com")
|
||||||
|
self.assertEqual(user.phone, "123456789")
|
||||||
|
|
||||||
|
def test_authenticate_with_unknown_user(self):
|
||||||
|
"""Test that authenticate returns None when no kwargs are supplied"""
|
||||||
|
# Ensure that the authenticate method handles the case when the user is not found
|
||||||
|
user = self.backend.authenticate(request=None, **{})
|
||||||
|
self.assertIsNone(user)
|
|
@ -0,0 +1,235 @@
|
||||||
|
import logging
|
||||||
|
import argparse
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from django.core.management import BaseCommand
|
||||||
|
|
||||||
|
from registrar.management.commands.utility.terminal_helper import (
|
||||||
|
TerminalColors,
|
||||||
|
TerminalHelper,
|
||||||
|
)
|
||||||
|
from registrar.models.contact import Contact
|
||||||
|
from registrar.models.user import User
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class Command(BaseCommand):
|
||||||
|
help = """Copy first and last names from a contact to
|
||||||
|
a related user if it exists and if its first and last name
|
||||||
|
properties are null or blank strings."""
|
||||||
|
|
||||||
|
# ======================================================
|
||||||
|
# ===================== ARGUMENTS =====================
|
||||||
|
# ======================================================
|
||||||
|
def add_arguments(self, parser):
|
||||||
|
parser.add_argument("--debug", action=argparse.BooleanOptionalAction)
|
||||||
|
|
||||||
|
# ======================================================
|
||||||
|
# ===================== PRINTING ======================
|
||||||
|
# ======================================================
|
||||||
|
def print_debug_mode_statements(self, debug_on: bool):
|
||||||
|
"""Prints additional terminal statements to indicate if --debug
|
||||||
|
or --limitParse are in use"""
|
||||||
|
TerminalHelper.print_conditional(
|
||||||
|
debug_on,
|
||||||
|
f"""{TerminalColors.OKCYAN}
|
||||||
|
----------DEBUG MODE ON----------
|
||||||
|
Detailed print statements activated.
|
||||||
|
{TerminalColors.ENDC}
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
def print_summary_of_findings(
|
||||||
|
self,
|
||||||
|
skipped_contacts,
|
||||||
|
eligible_users,
|
||||||
|
processed_users,
|
||||||
|
debug_on,
|
||||||
|
):
|
||||||
|
"""Prints to terminal a summary of findings from
|
||||||
|
copying first and last names from contacts to users"""
|
||||||
|
|
||||||
|
total_eligible_users = len(eligible_users)
|
||||||
|
total_skipped_contacts = len(skipped_contacts)
|
||||||
|
total_processed_users = len(processed_users)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"""{TerminalColors.OKGREEN}
|
||||||
|
============= FINISHED ===============
|
||||||
|
Skipped {total_skipped_contacts} contacts
|
||||||
|
Found {total_eligible_users} users linked to contacts
|
||||||
|
Processed {total_processed_users} users
|
||||||
|
{TerminalColors.ENDC}
|
||||||
|
""" # noqa
|
||||||
|
)
|
||||||
|
|
||||||
|
# DEBUG:
|
||||||
|
TerminalHelper.print_conditional(
|
||||||
|
debug_on,
|
||||||
|
f"""{TerminalColors.YELLOW}
|
||||||
|
======= DEBUG OUTPUT =======
|
||||||
|
Users who have a linked contact:
|
||||||
|
{eligible_users}
|
||||||
|
|
||||||
|
Processed users (users who have a linked contact and a missing first or last name):
|
||||||
|
{processed_users}
|
||||||
|
|
||||||
|
===== SKIPPED CONTACTS =====
|
||||||
|
{skipped_contacts}
|
||||||
|
|
||||||
|
{TerminalColors.ENDC}
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
# ======================================================
|
||||||
|
# =================== USER =====================
|
||||||
|
# ======================================================
|
||||||
|
def update_user(self, contact: Contact, debug_on: bool):
|
||||||
|
"""Given a contact with a first_name and last_name, find & update an existing
|
||||||
|
corresponding user if her first_name and last_name are null.
|
||||||
|
|
||||||
|
Returns tuple of eligible (is linked to the contact) and processed
|
||||||
|
(first and last are blank) users.
|
||||||
|
"""
|
||||||
|
|
||||||
|
user_exists = User.objects.filter(contact=contact).exists()
|
||||||
|
if user_exists:
|
||||||
|
try:
|
||||||
|
# ----------------------- UPDATE USER -----------------------
|
||||||
|
# ---- GET THE USER
|
||||||
|
eligible_user = User.objects.get(contact=contact)
|
||||||
|
processed_user = None
|
||||||
|
# DEBUG:
|
||||||
|
TerminalHelper.print_conditional(
|
||||||
|
debug_on,
|
||||||
|
f"""{TerminalColors.YELLOW}
|
||||||
|
> Found linked user for contact:
|
||||||
|
{contact} {contact.email} {contact.first_name} {contact.last_name}
|
||||||
|
> The linked user is {eligible_user} {eligible_user.username}
|
||||||
|
{TerminalColors.ENDC}""", # noqa
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- UPDATE THE USER IF IT DOES NOT HAVE A FIRST AND LAST NAMES
|
||||||
|
# ---- LET'S KEEP A LIGHT TOUCH
|
||||||
|
if not eligible_user.first_name and not eligible_user.last_name:
|
||||||
|
# (expression has type "str | None", variable has type "str | int | Combinable")
|
||||||
|
# so we'll ignore type
|
||||||
|
eligible_user.first_name = contact.first_name # type: ignore
|
||||||
|
eligible_user.last_name = contact.last_name # type: ignore
|
||||||
|
eligible_user.save()
|
||||||
|
processed_user = eligible_user
|
||||||
|
|
||||||
|
return (
|
||||||
|
eligible_user,
|
||||||
|
processed_user,
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as error:
|
||||||
|
logger.warning(
|
||||||
|
f"""
|
||||||
|
{TerminalColors.FAIL}
|
||||||
|
!!! ERROR: An exception occured in the
|
||||||
|
User table for the following user:
|
||||||
|
{contact.email} {contact.first_name} {contact.last_name}
|
||||||
|
|
||||||
|
Exception is: {error}
|
||||||
|
----------TERMINATING----------"""
|
||||||
|
)
|
||||||
|
sys.exit()
|
||||||
|
else:
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
# ======================================================
|
||||||
|
# ================= PROCESS CONTACTS ==================
|
||||||
|
# ======================================================
|
||||||
|
|
||||||
|
def process_contacts(
|
||||||
|
self,
|
||||||
|
debug_on,
|
||||||
|
skipped_contacts=[],
|
||||||
|
eligible_users=[],
|
||||||
|
processed_users=[],
|
||||||
|
):
|
||||||
|
for contact in Contact.objects.all():
|
||||||
|
TerminalHelper.print_conditional(
|
||||||
|
debug_on,
|
||||||
|
f"{TerminalColors.OKCYAN}"
|
||||||
|
"Processing Contact: "
|
||||||
|
f"{contact.email},"
|
||||||
|
f" {contact.first_name},"
|
||||||
|
f" {contact.last_name}"
|
||||||
|
f"{TerminalColors.ENDC}",
|
||||||
|
)
|
||||||
|
|
||||||
|
# ======================================================
|
||||||
|
# ====================== USER =======================
|
||||||
|
(eligible_user, processed_user) = self.update_user(contact, debug_on)
|
||||||
|
|
||||||
|
debug_string = ""
|
||||||
|
if eligible_user:
|
||||||
|
# ---------------- UPDATED ----------------
|
||||||
|
eligible_users.append(contact.email)
|
||||||
|
debug_string = f"eligible user: {eligible_user}"
|
||||||
|
if processed_user:
|
||||||
|
processed_users.append(contact.email)
|
||||||
|
debug_string = f"processed user: {processed_user}"
|
||||||
|
else:
|
||||||
|
skipped_contacts.append(contact.email)
|
||||||
|
debug_string = f"skipped user: {contact.email}"
|
||||||
|
|
||||||
|
# DEBUG:
|
||||||
|
TerminalHelper.print_conditional(
|
||||||
|
debug_on,
|
||||||
|
(f"{TerminalColors.OKCYAN} {debug_string} {TerminalColors.ENDC}"),
|
||||||
|
)
|
||||||
|
|
||||||
|
return (
|
||||||
|
skipped_contacts,
|
||||||
|
eligible_users,
|
||||||
|
processed_users,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ======================================================
|
||||||
|
# ===================== HANDLE ========================
|
||||||
|
# ======================================================
|
||||||
|
def handle(
|
||||||
|
self,
|
||||||
|
**options,
|
||||||
|
):
|
||||||
|
"""Parse entries in Contact table
|
||||||
|
and update valid corresponding entries in the
|
||||||
|
User table."""
|
||||||
|
|
||||||
|
# grab command line arguments and store locally...
|
||||||
|
debug_on = options.get("debug")
|
||||||
|
|
||||||
|
self.print_debug_mode_statements(debug_on)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"""{TerminalColors.OKCYAN}
|
||||||
|
==========================
|
||||||
|
Beginning Data Transfer
|
||||||
|
==========================
|
||||||
|
{TerminalColors.ENDC}"""
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"""{TerminalColors.OKCYAN}
|
||||||
|
========= Adding Domains and Domain Invitations =========
|
||||||
|
{TerminalColors.ENDC}"""
|
||||||
|
)
|
||||||
|
(
|
||||||
|
skipped_contacts,
|
||||||
|
eligible_users,
|
||||||
|
processed_users,
|
||||||
|
) = self.process_contacts(
|
||||||
|
debug_on,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.print_summary_of_findings(
|
||||||
|
skipped_contacts,
|
||||||
|
eligible_users,
|
||||||
|
processed_users,
|
||||||
|
debug_on,
|
||||||
|
)
|
|
@ -59,6 +59,16 @@ class Contact(TimeStampedModel):
|
||||||
names = [n for n in [self.first_name, self.middle_name, self.last_name] if n]
|
names = [n for n in [self.first_name, self.middle_name, self.last_name] if n]
|
||||||
return " ".join(names) if names else "Unknown"
|
return " ".join(names) if names else "Unknown"
|
||||||
|
|
||||||
|
def save(self, *args, **kwargs):
|
||||||
|
# Call the parent class's save method to perform the actual save
|
||||||
|
super().save(*args, **kwargs)
|
||||||
|
|
||||||
|
# Update the related User object's first_name and last_name
|
||||||
|
if self.user:
|
||||||
|
self.user.first_name = self.first_name
|
||||||
|
self.user.last_name = self.last_name
|
||||||
|
self.user.save()
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
if self.first_name or self.last_name:
|
if self.first_name or self.last_name:
|
||||||
return self.get_formatted_name()
|
return self.get_formatted_name()
|
||||||
|
|
|
@ -5,6 +5,7 @@ import logging
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
import random
|
import random
|
||||||
from string import ascii_uppercase
|
from string import ascii_uppercase
|
||||||
|
import uuid
|
||||||
from django.test import TestCase
|
from django.test import TestCase
|
||||||
from unittest.mock import MagicMock, Mock, patch
|
from unittest.mock import MagicMock, Mock, patch
|
||||||
from typing import List, Dict
|
from typing import List, Dict
|
||||||
|
@ -161,7 +162,7 @@ class AuditedAdminMockData:
|
||||||
user = User.objects.get_or_create(
|
user = User.objects.get_or_create(
|
||||||
first_name="{} first_name:{}".format(item_name, short_hand),
|
first_name="{} first_name:{}".format(item_name, short_hand),
|
||||||
last_name="{} last_name:{}".format(item_name, short_hand),
|
last_name="{} last_name:{}".format(item_name, short_hand),
|
||||||
username="{} username:{}".format(item_name, short_hand),
|
username="{} username:{}".format(item_name + str(uuid.uuid4())[:8], short_hand),
|
||||||
)[0]
|
)[0]
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
@ -405,8 +406,8 @@ def mock_user():
|
||||||
"""A simple user."""
|
"""A simple user."""
|
||||||
user_kwargs = dict(
|
user_kwargs = dict(
|
||||||
id=4,
|
id=4,
|
||||||
first_name="Rachid",
|
first_name="Jeff",
|
||||||
last_name="Mrad",
|
last_name="Lebowski",
|
||||||
)
|
)
|
||||||
mock_user, _ = User.objects.get_or_create(**user_kwargs)
|
mock_user, _ = User.objects.get_or_create(**user_kwargs)
|
||||||
return mock_user
|
return mock_user
|
||||||
|
|
|
@ -1023,7 +1023,7 @@ class ListHeaderAdminTest(TestCase):
|
||||||
# Set the GET parameters for testing
|
# Set the GET parameters for testing
|
||||||
request.GET = {
|
request.GET = {
|
||||||
"status": "started",
|
"status": "started",
|
||||||
"investigator": "Rachid Mrad",
|
"investigator": "Jeff Lebowski",
|
||||||
"q": "search_value",
|
"q": "search_value",
|
||||||
}
|
}
|
||||||
# Call the get_filters method
|
# Call the get_filters method
|
||||||
|
@ -1034,7 +1034,7 @@ class ListHeaderAdminTest(TestCase):
|
||||||
filters,
|
filters,
|
||||||
[
|
[
|
||||||
{"parameter_name": "status", "parameter_value": "started"},
|
{"parameter_name": "status", "parameter_value": "started"},
|
||||||
{"parameter_name": "investigator", "parameter_value": "Rachid Mrad"},
|
{"parameter_name": "investigator", "parameter_value": "Jeff Lebowski"},
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1110,8 +1110,8 @@ class AuditedAdminTest(TestCase):
|
||||||
tested_fields = [
|
tested_fields = [
|
||||||
DomainApplication.authorizing_official.field,
|
DomainApplication.authorizing_official.field,
|
||||||
DomainApplication.submitter.field,
|
DomainApplication.submitter.field,
|
||||||
DomainApplication.investigator.field,
|
# DomainApplication.investigator.field,
|
||||||
DomainApplication.creator.field,
|
# DomainApplication.creator.field,
|
||||||
DomainApplication.requested_domain.field,
|
DomainApplication.requested_domain.field,
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -1166,7 +1166,7 @@ class AuditedAdminTest(TestCase):
|
||||||
tested_fields = [
|
tested_fields = [
|
||||||
DomainInformation.authorizing_official.field,
|
DomainInformation.authorizing_official.field,
|
||||||
DomainInformation.submitter.field,
|
DomainInformation.submitter.field,
|
||||||
DomainInformation.creator.field,
|
# DomainInformation.creator.field,
|
||||||
(DomainInformation.domain.field, ["name"]),
|
(DomainInformation.domain.field, ["name"]),
|
||||||
(DomainInformation.domain_application.field, ["requested_domain__name"]),
|
(DomainInformation.domain_application.field, ["requested_domain__name"]),
|
||||||
]
|
]
|
||||||
|
|
|
@ -0,0 +1,98 @@
|
||||||
|
from django.test import TestCase
|
||||||
|
|
||||||
|
from registrar.models import (
|
||||||
|
User,
|
||||||
|
Contact,
|
||||||
|
)
|
||||||
|
|
||||||
|
from registrar.management.commands.copy_names_from_contacts_to_users import Command
|
||||||
|
|
||||||
|
|
||||||
|
class TestDataUpdates(TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
"""We cannot setup the user details because contacts will override the first and last names in its save method
|
||||||
|
so we will initiate the users, setup the contacts and link them, and leave the rest of the setup to the test(s).
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.user1 = User.objects.create(username="user1")
|
||||||
|
self.user2 = User.objects.create(username="user2")
|
||||||
|
self.user3 = User.objects.create(username="user3")
|
||||||
|
self.user4 = User.objects.create(username="user4")
|
||||||
|
# The last user created triggers the creation of a contact and attaches itself to it. @Neil wth is going on?
|
||||||
|
# This bs_user defuses that situation so we can test the code.
|
||||||
|
self.bs_user = User.objects.create()
|
||||||
|
|
||||||
|
self.contact1 = Contact.objects.create(
|
||||||
|
user=self.user1, email="email1@igorville.gov", first_name="first1", last_name="last1"
|
||||||
|
)
|
||||||
|
self.contact2 = Contact.objects.create(
|
||||||
|
user=self.user2, email="email2@igorville.gov", first_name="first2", last_name="last2"
|
||||||
|
)
|
||||||
|
self.contact3 = Contact.objects.create(
|
||||||
|
user=self.user3, email="email3@igorville.gov", first_name="first3", last_name="last3"
|
||||||
|
)
|
||||||
|
self.contact4 = Contact.objects.create(email="email4@igorville.gov", first_name="first4", last_name="last4")
|
||||||
|
|
||||||
|
self.command = Command()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
"""Clean up"""
|
||||||
|
# Delete users and contacts
|
||||||
|
User.objects.all().delete()
|
||||||
|
Contact.objects.all().delete()
|
||||||
|
|
||||||
|
def test_script_updates_linked_users(self):
|
||||||
|
"""Test the script that copies contacts' first and last names into associated users that
|
||||||
|
are eligible (first or last are blank or undefined)"""
|
||||||
|
|
||||||
|
# Set up the users' first and last names here so
|
||||||
|
# they that they don't get overwritten by Contact's save()
|
||||||
|
# User with no first or last names
|
||||||
|
self.user1.first_name = ""
|
||||||
|
self.user1.last_name = ""
|
||||||
|
self.user1.save()
|
||||||
|
|
||||||
|
# User with a first name but no last name
|
||||||
|
self.user2.first_name = "First name but no last name"
|
||||||
|
self.user2.last_name = ""
|
||||||
|
self.user2.save()
|
||||||
|
|
||||||
|
# User with a first and last name
|
||||||
|
self.user3.first_name = "An existing first name"
|
||||||
|
self.user3.last_name = "An existing last name"
|
||||||
|
self.user3.save()
|
||||||
|
|
||||||
|
# Call the parent method the same way we do it in the script
|
||||||
|
skipped_contacts = []
|
||||||
|
eligible_users = []
|
||||||
|
processed_users = []
|
||||||
|
(
|
||||||
|
skipped_contacts,
|
||||||
|
eligible_users,
|
||||||
|
processed_users,
|
||||||
|
) = self.command.process_contacts(
|
||||||
|
# Set debugging to False
|
||||||
|
False,
|
||||||
|
skipped_contacts,
|
||||||
|
eligible_users,
|
||||||
|
processed_users,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Trigger DB refresh
|
||||||
|
self.user1.refresh_from_db()
|
||||||
|
self.user2.refresh_from_db()
|
||||||
|
self.user3.refresh_from_db()
|
||||||
|
|
||||||
|
# Asserts
|
||||||
|
# The user that has no first and last names will get them from the contact
|
||||||
|
self.assertEqual(self.user1.first_name, "first1")
|
||||||
|
self.assertEqual(self.user1.last_name, "last1")
|
||||||
|
# The user that has a first but no last will be left alone
|
||||||
|
self.assertEqual(self.user2.first_name, "First name but no last name")
|
||||||
|
self.assertEqual(self.user2.last_name, "")
|
||||||
|
# The user that has a first and a last will be left alone
|
||||||
|
self.assertEqual(self.user3.first_name, "An existing first name")
|
||||||
|
self.assertEqual(self.user3.last_name, "An existing last name")
|
||||||
|
# The unlinked user will be left alone
|
||||||
|
self.assertEqual(self.user4.first_name, "")
|
||||||
|
self.assertEqual(self.user4.last_name, "")
|
|
@ -668,3 +668,48 @@ class TestUser(TestCase):
|
||||||
# if check_domain_invitations_on_login properly matches exactly one
|
# if check_domain_invitations_on_login properly matches exactly one
|
||||||
# Domain Invitation, then save routine should be called exactly once
|
# Domain Invitation, then save routine should be called exactly once
|
||||||
save_mock.assert_called_once()
|
save_mock.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
class TestContact(TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.email = "mayor@igorville.gov"
|
||||||
|
self.user, _ = User.objects.get_or_create(email=self.email, first_name="Jeff", last_name="Lebowski")
|
||||||
|
self.contact, _ = Contact.objects.get_or_create(user=self.user)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super().tearDown()
|
||||||
|
Contact.objects.all().delete()
|
||||||
|
User.objects.all().delete()
|
||||||
|
|
||||||
|
def test_saving_contact_updates_user_first_last_names(self):
|
||||||
|
"""When a contact is updated, we propagate the changes to the linked user if it exists."""
|
||||||
|
# User and Contact are created and linked as expected
|
||||||
|
self.assertEqual(self.contact.first_name, "Jeff")
|
||||||
|
self.assertEqual(self.contact.last_name, "Lebowski")
|
||||||
|
self.assertEqual(self.user.first_name, "Jeff")
|
||||||
|
self.assertEqual(self.user.last_name, "Lebowski")
|
||||||
|
|
||||||
|
self.contact.first_name = "Joey"
|
||||||
|
self.contact.last_name = "Baloney"
|
||||||
|
self.contact.save()
|
||||||
|
|
||||||
|
# Refresh the user object to reflect the changes made in the database
|
||||||
|
self.user.refresh_from_db()
|
||||||
|
|
||||||
|
# Updating the contact's first and last names propagate to the user
|
||||||
|
self.assertEqual(self.contact.first_name, "Joey")
|
||||||
|
self.assertEqual(self.contact.last_name, "Baloney")
|
||||||
|
self.assertEqual(self.user.first_name, "Joey")
|
||||||
|
self.assertEqual(self.user.last_name, "Baloney")
|
||||||
|
|
||||||
|
def test_saving_contact_does_not_update_user_email(self):
|
||||||
|
"""When a contact's email is updated, the change is not propagated to the lined user."""
|
||||||
|
self.contact.email = "joey.baloney@diaperville.com"
|
||||||
|
self.contact.save()
|
||||||
|
|
||||||
|
# Refresh the user object to reflect the changes made in the database
|
||||||
|
self.user.refresh_from_db()
|
||||||
|
|
||||||
|
# Updating the contact's email does not propagate
|
||||||
|
self.assertEqual(self.contact.email, "joey.baloney@diaperville.com")
|
||||||
|
self.assertEqual(self.user.email, "mayor@igorville.gov")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue