Implement VIP table and fix 401 login bug

This commit is contained in:
Rachid Mrad 2024-01-18 19:58:53 -05:00
parent 7077519837
commit 510da21934
No known key found for this signature in database
GPG key ID: EF38E4CEC4A8F3CF
11 changed files with 175 additions and 1 deletions

View file

@ -33,6 +33,10 @@ class AuthenticationFailed(OIDCException):
friendly_message = "This login attempt didn't work." friendly_message = "This login attempt didn't work."
class NoStateDefined(OIDCException):
friendly_message = "The session state is None."
class InternalError(OIDCException): class InternalError(OIDCException):
status = status.INTERNAL_SERVER_ERROR status = status.INTERNAL_SERVER_ERROR
friendly_message = "The system broke while trying to log you in." friendly_message = "The system broke while trying to log you in."

View file

@ -183,6 +183,8 @@ class Client(oic.Client):
if authn_response["state"] != session.get("state", None): if authn_response["state"] != session.get("state", None):
# this most likely means the user's Django session vanished # this most likely means the user's Django session vanished
logger.error("Received state not the same as expected for %s" % state) logger.error("Received state not the same as expected for %s" % state)
if session.get("state", None) is None:
raise o_e.NoStateDefined()
raise o_e.AuthenticationFailed(locator=state) raise o_e.AuthenticationFailed(locator=state)
if self.behaviour.get("response_type") == "code": if self.behaviour.get("response_type") == "code":
@ -272,6 +274,11 @@ class Client(oic.Client):
super(Client, self).store_response(resp, info) super(Client, self).store_response(resp, info)
def get_default_acr_value(self):
"""returns the acr_value from settings
this helper function is called from djangooidc views"""
return self.behaviour.get("acr_value")
def get_step_up_acr_value(self): def get_step_up_acr_value(self):
"""returns the step_up_acr_value from settings """returns the step_up_acr_value from settings
this helper function is called from djangooidc views""" this helper function is called from djangooidc views"""

View file

@ -3,6 +3,8 @@ from unittest.mock import MagicMock, patch
from django.http import HttpResponse from django.http import HttpResponse
from django.test import Client, TestCase, RequestFactory from django.test import Client, TestCase, RequestFactory
from django.urls import reverse from django.urls import reverse
from djangooidc.exceptions import NoStateDefined
from ..views import login_callback from ..views import login_callback
from .common import less_console_noise from .common import less_console_noise
@ -17,6 +19,9 @@ class ViewsTest(TestCase):
def say_hi(*args): def say_hi(*args):
return HttpResponse("Hi") return HttpResponse("Hi")
def create_acr(*args):
return "any string"
def user_info(*args): def user_info(*args):
return { return {
"sub": "TEST", "sub": "TEST",
@ -34,6 +39,7 @@ class ViewsTest(TestCase):
callback_url = reverse("openid_login_callback") callback_url = reverse("openid_login_callback")
# mock # mock
mock_client.create_authn_request.side_effect = self.say_hi mock_client.create_authn_request.side_effect = self.say_hi
mock_client.get_default_acr_value.side_effect = self.create_acr
# test # test
response = self.client.get(reverse("login"), {"next": callback_url}) response = self.client.get(reverse("login"), {"next": callback_url})
# assert # assert
@ -53,6 +59,19 @@ class ViewsTest(TestCase):
self.assertTemplateUsed(response, "500.html") self.assertTemplateUsed(response, "500.html")
self.assertIn("Server error", response.content.decode("utf-8")) self.assertIn("Server error", response.content.decode("utf-8"))
def test_callback_with_no_session_state(self, mock_client):
"""If the local session is None (ie the server restarted while user was logged out),
we do not throw an exception. Rather, we attempt to login again."""
# mock
mock_client.get_default_acr_value.side_effect = self.create_acr
mock_client.callback.side_effect = NoStateDefined()
# test
with less_console_noise():
response = self.client.get(reverse("openid_login_callback"))
# assert
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, "/")
def test_login_callback_reads_next(self, mock_client): def test_login_callback_reads_next(self, mock_client):
# setup # setup
session = self.client.session session = self.client.session

View file

@ -55,6 +55,10 @@ def error_page(request, error):
def openid(request): def openid(request):
"""Redirect the user to an authentication provider (OP).""" """Redirect the user to an authentication provider (OP)."""
# If the session reset because of a server restart, attempt to login again
request.session["acr_value"] = CLIENT.get_default_acr_value()
request.session["next"] = request.GET.get("next", "/") request.session["next"] = request.GET.get("next", "/")
try: try:
@ -78,9 +82,13 @@ def login_callback(request):
if user: if user:
login(request, user) login(request, user)
logger.info("Successfully logged in user %s" % user) logger.info("Successfully logged in user %s" % user)
# Double login bug?
return redirect(request.session.get("next", "/")) return redirect(request.session.get("next", "/"))
else: else:
raise o_e.BannedUser() raise o_e.BannedUser()
except o_e.NoStateDefined as nsd_err:
logger.debug(f"No State Defined: {nsd_err}")
return redirect(request.session.get("next", "/"))
except Exception as err: except Exception as err:
return error_page(request, err) return error_page(request, err)

View file

@ -1239,6 +1239,28 @@ class DraftDomainAdmin(ListHeaderAdmin):
search_help_text = "Search by draft domain name." search_help_text = "Search by draft domain name."
class VeryImportantPersonAdmin(ListHeaderAdmin):
list_display = ("email", "custom_user_label", "notes", "created_at")
search_fields = ["email"]
search_help_text = "Search by email."
list_filter = [
"user",
]
readonly_fields = [
"user",
]
def custom_user_label(self, obj):
return obj.user
custom_user_label.short_description = "Requestor" # type: ignore
def save_model(self, request, obj, form, change):
# Set the user field to the current admin user
obj.user = request.user if request.user.is_authenticated else None
super().save_model(request, obj, form, change)
admin.site.unregister(LogEntry) # Unregister the default registration admin.site.unregister(LogEntry) # Unregister the default registration
admin.site.register(LogEntry, CustomLogEntryAdmin) admin.site.register(LogEntry, CustomLogEntryAdmin)
admin.site.register(models.User, MyUserAdmin) admin.site.register(models.User, MyUserAdmin)
@ -1259,3 +1281,4 @@ admin.site.register(models.Website, WebsiteAdmin)
admin.site.register(models.PublicContact, AuditedAdmin) admin.site.register(models.PublicContact, AuditedAdmin)
admin.site.register(models.DomainApplication, DomainApplicationAdmin) admin.site.register(models.DomainApplication, DomainApplicationAdmin)
admin.site.register(models.TransitionDomain, TransitionDomainAdmin) admin.site.register(models.TransitionDomain, TransitionDomainAdmin)
admin.site.register(models.VeryImportantPerson, VeryImportantPersonAdmin)

View file

@ -0,0 +1,37 @@
# Generated by Django 4.2.7 on 2024-01-19 00:18
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
("registrar", "0062_alter_host_name"),
]
operations = [
migrations.CreateModel(
name="VeryImportantPerson",
fields=[
("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("created_at", models.DateTimeField(auto_now_add=True)),
("updated_at", models.DateTimeField(auto_now=True)),
("email", models.EmailField(blank=True, db_index=True, help_text="Email", max_length=254, null=True)),
("notes", models.TextField(blank=True, help_text="Notes", null=True)),
(
"user",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="verifiedby_user",
to=settings.AUTH_USER_MODEL,
),
),
],
options={
"abstract": False,
},
),
]

View file

@ -13,6 +13,7 @@ from .user import User
from .user_group import UserGroup from .user_group import UserGroup
from .website import Website from .website import Website
from .transition_domain import TransitionDomain from .transition_domain import TransitionDomain
from .very_important_person import VeryImportantPerson
__all__ = [ __all__ = [
"Contact", "Contact",
@ -29,6 +30,7 @@ __all__ = [
"UserGroup", "UserGroup",
"Website", "Website",
"TransitionDomain", "TransitionDomain",
"VeryImportantPerson",
] ]
auditlog.register(Contact) auditlog.register(Contact)
@ -45,3 +47,4 @@ auditlog.register(User, m2m_fields=["user_permissions", "groups"])
auditlog.register(UserGroup, m2m_fields=["permissions"]) auditlog.register(UserGroup, m2m_fields=["permissions"])
auditlog.register(Website) auditlog.register(Website)
auditlog.register(TransitionDomain) auditlog.register(TransitionDomain)
auditlog.register(VeryImportantPerson)

View file

@ -7,6 +7,7 @@ from registrar.models.user_domain_role import UserDomainRole
from .domain_invitation import DomainInvitation from .domain_invitation import DomainInvitation
from .transition_domain import TransitionDomain from .transition_domain import TransitionDomain
from .very_important_person import VeryImportantPerson
from .domain import Domain from .domain import Domain
from phonenumber_field.modelfields import PhoneNumberField # type: ignore from phonenumber_field.modelfields import PhoneNumberField # type: ignore
@ -89,6 +90,10 @@ class User(AbstractUser):
if TransitionDomain.objects.filter(username=email).exists(): if TransitionDomain.objects.filter(username=email).exists():
return False return False
# New users flagged by Staff to bypass ial2
if VeryImportantPerson.objects.filter(email=email).exists():
return False
# A new incoming user who is being invited to be a domain manager (that is, # A new incoming user who is being invited to be a domain manager (that is,
# their email address is in DomainInvitation for an invitation that is not yet "retrieved"). # their email address is in DomainInvitation for an invitation that is not yet "retrieved").
invited = DomainInvitation.DomainInvitationStatus.INVITED invited = DomainInvitation.DomainInvitationStatus.INVITED

View file

@ -0,0 +1,36 @@
from django.db import models
from .utility.time_stamped_model import TimeStampedModel
class VeryImportantPerson(TimeStampedModel):
""""""
email = models.EmailField(
null=True,
blank=True,
help_text="Email",
db_index=True,
)
user = models.ForeignKey(
"registrar.User",
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name="verifiedby_user",
)
notes = models.TextField(
null=True,
blank=True,
help_text="Notes",
)
def __str__(self):
try:
if self.email:
return self.email
except Exception:
return ""

View file

@ -14,9 +14,11 @@ from registrar.admin import (
ContactAdmin, ContactAdmin,
DomainInformationAdmin, DomainInformationAdmin,
UserDomainRoleAdmin, UserDomainRoleAdmin,
VeryImportantPersonAdmin,
) )
from registrar.models import Domain, DomainApplication, DomainInformation, User, DomainInvitation, Contact, Website from registrar.models import Domain, DomainApplication, DomainInformation, User, DomainInvitation, Contact, Website
from registrar.models.user_domain_role import UserDomainRole from registrar.models.user_domain_role import UserDomainRole
from registrar.models.very_important_person import VeryImportantPerson
from .common import ( from .common import (
MockSESClient, MockSESClient,
AuditedAdminMockData, AuditedAdminMockData,
@ -1737,3 +1739,26 @@ class ContactAdminTest(TestCase):
def tearDown(self): def tearDown(self):
User.objects.all().delete() User.objects.all().delete()
class VeryImportantPersonAdminTestCase(TestCase):
def setUp(self):
self.superuser = create_superuser()
self.factory = RequestFactory()
def test_save_model_sets_user_field(self):
# Create an instance of the admin class
admin_instance = VeryImportantPersonAdmin(model=VeryImportantPerson, admin_site=None)
# Create a VeryImportantPerson instance
vip_instance = VeryImportantPerson(email="test@example.com", notes="Test Notes")
# Create a request object
request = self.factory.post("/admin/yourapp/veryimportantperson/add/")
request.user = self.superuser
# Call the save_model method
admin_instance.save_model(request, vip_instance, None, None)
# Check that the user field is set to the request.user
self.assertEqual(vip_instance.user, self.superuser)

View file

@ -15,7 +15,8 @@ from registrar.models import (
) )
import boto3_mocking import boto3_mocking
from registrar.models.transition_domain import TransitionDomain # type: ignore from registrar.models.transition_domain import TransitionDomain
from registrar.models.very_important_person import VeryImportantPerson # type: ignore
from .common import MockSESClient, less_console_noise, completed_application from .common import MockSESClient, less_console_noise, completed_application
from django_fsm import TransitionNotAllowed from django_fsm import TransitionNotAllowed
@ -652,6 +653,12 @@ class TestUser(TestCase):
TransitionDomain.objects.get_or_create(username=self.user.email, domain_name=self.domain_name) TransitionDomain.objects.get_or_create(username=self.user.email, domain_name=self.domain_name)
self.assertFalse(User.needs_identity_verification(self.user.email, self.user.username)) self.assertFalse(User.needs_identity_verification(self.user.email, self.user.username))
def test_identity_verification_with_very_important_person(self):
"""A Very Important Person should return False
when tested with class method needs_identity_verification"""
VeryImportantPerson.objects.get_or_create(email=self.user.email)
self.assertFalse(User.needs_identity_verification(self.user.email, self.user.username))
def test_identity_verification_with_invited_user(self): def test_identity_verification_with_invited_user(self):
"""An invited user should return False when tested with class """An invited user should return False when tested with class
method needs_identity_verification""" method needs_identity_verification"""