Merge branch 'main' into za/1530-warning-messages-changing-statuses

This commit is contained in:
zandercymatics 2024-03-12 16:45:21 -06:00
commit 7b773c16b7
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
9 changed files with 648 additions and 189 deletions

View file

@ -16,6 +16,7 @@ from dateutil.relativedelta import relativedelta # type: ignore
from epplibwrapper.errors import ErrorCode, RegistryError from epplibwrapper.errors import ErrorCode, RegistryError
from registrar.models import Contact, Domain, DomainRequest, DraftDomain, User, Website from registrar.models import Contact, Domain, DomainRequest, DraftDomain, User, Website
from registrar.utility import csv_export from registrar.utility import csv_export
from registrar.utility.errors import FSMApplicationError, FSMErrorCodes
from registrar.views.utility.mixins import OrderableFieldsMixin from registrar.views.utility.mixins import OrderableFieldsMixin
from django.contrib.admin.views.main import ORDER_VAR from django.contrib.admin.views.main import ORDER_VAR
from registrar.widgets import NoAutocompleteFilteredSelectMultiple from registrar.widgets import NoAutocompleteFilteredSelectMultiple
@ -92,9 +93,14 @@ class DomainRequestAdminForm(forms.ModelForm):
# first option in status transitions is current state # first option in status transitions is current state
available_transitions = [(current_state, domain_request.get_status_display())] available_transitions = [(current_state, domain_request.get_status_display())]
transitions = get_available_FIELD_transitions( if domain_request.investigator is not None:
domain_request, models.DomainRequest._meta.get_field("status") transitions = get_available_FIELD_transitions(
) domain_request, models.DomainRequest._meta.get_field("status")
)
else:
transitions = self.get_custom_field_transitions(
domain_request, models.DomainRequest._meta.get_field("status")
)
for transition in transitions: for transition in transitions:
available_transitions.append((transition.target, transition.target.label)) available_transitions.append((transition.target, transition.target.label))
@ -105,6 +111,73 @@ class DomainRequestAdminForm(forms.ModelForm):
if not domain_request.creator.is_restricted(): if not domain_request.creator.is_restricted():
self.fields["status"].widget.choices = available_transitions self.fields["status"].widget.choices = available_transitions
def get_custom_field_transitions(self, instance, field):
"""Custom implementation of get_available_FIELD_transitions
in the FSM. Allows us to still display fields filtered out by a condition."""
curr_state = field.get_state(instance)
transitions = field.transitions[instance.__class__]
for name, transition in transitions.items():
meta = transition._django_fsm
if meta.has_transition(curr_state):
yield meta.get_transition(curr_state)
def clean(self):
"""
Override of the default clean on the form.
This is so we can inject custom form-level error messages.
"""
# clean is called from clean_forms, which is called from is_valid
# after clean_fields. it is used to determine form level errors.
# is_valid is typically called from view during a post
cleaned_data = super().clean()
status = cleaned_data.get("status")
investigator = cleaned_data.get("investigator")
# Get the old status
initial_status = self.initial.get("status", None)
# We only care about investigator when in these statuses
checked_statuses = [
DomainRequest.DomainRequestStatus.APPROVED,
DomainRequest.DomainRequestStatus.IN_REVIEW,
DomainRequest.DomainRequestStatus.ACTION_NEEDED,
DomainRequest.DomainRequestStatus.REJECTED,
DomainRequest.DomainRequestStatus.INELIGIBLE,
]
# If a status change occured, check for validity
if status != initial_status and status in checked_statuses:
# Checks the "investigators" field for validity.
# That field must obey certain conditions when an domain request is approved.
# Will call "add_error" if any issues are found.
self._check_for_valid_investigator(investigator)
return cleaned_data
def _check_for_valid_investigator(self, investigator) -> bool:
"""
Checks if the investigator field is not none, and is staff.
Adds form errors on failure.
"""
is_valid = False
# Check if an investigator is assigned. No approval is possible without one.
error_message = None
if investigator is None:
# Lets grab the error message from a common location
error_message = FSMApplicationError.get_error_message(FSMErrorCodes.NO_INVESTIGATOR)
elif not investigator.is_staff:
error_message = FSMApplicationError.get_error_message(FSMErrorCodes.INVESTIGATOR_NOT_STAFF)
else:
is_valid = True
if error_message is not None:
self.add_error("investigator", error_message)
return is_valid
# Based off of this excellent example: https://djangosnippets.org/snippets/10471/ # Based off of this excellent example: https://djangosnippets.org/snippets/10471/
class MultiFieldSortableChangeList(admin.views.main.ChangeList): class MultiFieldSortableChangeList(admin.views.main.ChangeList):
@ -1056,72 +1129,24 @@ class DomainRequestAdmin(ListHeaderAdmin):
# Trigger action when a fieldset is changed # Trigger action when a fieldset is changed
def save_model(self, request, obj, form, change): def save_model(self, request, obj, form, change):
if obj and obj.creator.status != models.User.RESTRICTED: """Custom save_model definition that handles edge cases"""
if change: # Check if the domain request is being edited
# Get the original domain request from the database
original_obj = models.DomainRequest.objects.get(pk=obj.pk)
if ( # == Check that the obj is in a valid state == #
obj
and original_obj.status == models.DomainRequest.DomainRequestStatus.APPROVED
and obj.status != models.DomainRequest.DomainRequestStatus.APPROVED
and not obj.domain_is_not_active()
):
# If an admin tried to set an approved domain request to
# another status and the related domain is already
# active, shortcut the action and throw a friendly
# error message. This action would still not go through
# shortcut or not as the rules are duplicated on the model,
# but the error would be an ugly Django error screen.
# Clear the success message # If obj is none, something went very wrong.
messages.set_level(request, messages.ERROR) # The form should have blocked this, so lets forbid it.
if not obj:
logger.error(f"Invalid value for obj ({obj})")
messages.set_level(request, messages.ERROR)
messages.error(
request,
"Could not save DomainRequest. Something went wrong.",
)
return None
messages.error( # If the user is restricted or we're saving an invalid model,
request, # forbid this action.
"This action is not permitted. The domain is already active.", if not obj or obj.creator.status == models.User.RESTRICTED:
)
elif (
obj and obj.status == models.DomainRequest.DomainRequestStatus.REJECTED and not obj.rejection_reason
):
# This condition should never be triggered.
# The opposite of this condition is acceptable (rejected -> other status and rejection_reason)
# because we clean up the rejection reason in the transition in the model.
# Clear the success message
messages.set_level(request, messages.ERROR)
messages.error(
request,
"A rejection reason is required.",
)
else:
if obj.status != original_obj.status:
status_method_mapping = {
models.DomainRequest.DomainRequestStatus.STARTED: None,
models.DomainRequest.DomainRequestStatus.SUBMITTED: obj.submit,
models.DomainRequest.DomainRequestStatus.IN_REVIEW: obj.in_review,
models.DomainRequest.DomainRequestStatus.ACTION_NEEDED: obj.action_needed,
models.DomainRequest.DomainRequestStatus.APPROVED: obj.approve,
models.DomainRequest.DomainRequestStatus.WITHDRAWN: obj.withdraw,
models.DomainRequest.DomainRequestStatus.REJECTED: obj.reject,
models.DomainRequest.DomainRequestStatus.INELIGIBLE: (obj.reject_with_prejudice),
}
selected_method = status_method_mapping.get(obj.status)
if selected_method is None:
logger.warning("Unknown status selected in django admin")
else:
# This is an fsm in model which will throw an error if the
# transition condition is violated, so we roll back the
# status to what it was before the admin user changed it and
# let the fsm method set it.
obj.status = original_obj.status
selected_method()
super().save_model(request, obj, form, change)
else:
# Clear the success message # Clear the success message
messages.set_level(request, messages.ERROR) messages.set_level(request, messages.ERROR)
@ -1130,6 +1155,117 @@ class DomainRequestAdmin(ListHeaderAdmin):
"This action is not permitted for domain requests with a restricted creator.", "This action is not permitted for domain requests with a restricted creator.",
) )
return None
# == Check if we're making a change or not == #
# If we're not making a change (adding a record), run save model as we do normally
if not change:
return super().save_model(request, obj, form, change)
# == Handle non-status changes == #
# Get the original domain request from the database.
original_obj = models.DomainRequest.objects.get(pk=obj.pk)
if obj.status == original_obj.status:
# If the status hasn't changed, let the base function take care of it
return super().save_model(request, obj, form, change)
# == Handle status changes == #
# Run some checks on the current object for invalid status changes
obj, should_save = self._handle_status_change(request, obj, original_obj)
# We should only save if we don't display any errors in the step above.
if should_save:
return super().save_model(request, obj, form, change)
def _handle_status_change(self, request, obj, original_obj):
"""
Checks for various conditions when a status change is triggered.
In the event that it is valid, the status will be mapped to
the appropriate method.
In the event that we should not status change, an error message
will be displayed.
Returns a tuple: (obj: DomainRequest, should_proceed: bool)
"""
should_proceed = True
error_message = None
# Get the method that should be run given the status
selected_method = self.get_status_method_mapping(obj)
if selected_method is None:
logger.warning("Unknown status selected in django admin")
# If the status is not mapped properly, saving could cause
# weird issues down the line. Instead, we should block this.
should_proceed = False
return should_proceed
request_is_not_approved = obj.status != models.DomainRequest.DomainRequestStatus.APPROVED
if request_is_not_approved and not obj.domain_is_not_active():
# If an admin tried to set an approved domain request to
# another status and the related domain is already
# active, shortcut the action and throw a friendly
# error message. This action would still not go through
# shortcut or not as the rules are duplicated on the model,
# but the error would be an ugly Django error screen.
error_message = "This action is not permitted. The domain is already active."
elif obj.status == models.DomainRequest.DomainRequestStatus.REJECTED and not obj.rejection_reason:
# This condition should never be triggered.
# The opposite of this condition is acceptable (rejected -> other status and rejection_reason)
# because we clean up the rejection reason in the transition in the model.
error_message = "A rejection reason is required."
else:
# This is an fsm in model which will throw an error if the
# transition condition is violated, so we roll back the
# status to what it was before the admin user changed it and
# let the fsm method set it.
obj.status = original_obj.status
# Try to perform the status change.
# Catch FSMApplicationError's and return the message,
# as these are typically user errors.
try:
selected_method()
except FSMApplicationError as err:
logger.warning(f"An error encountered when trying to change status: {err}")
error_message = err.message
if error_message is not None:
# Clear the success message
messages.set_level(request, messages.ERROR)
# Display the error
messages.error(
request,
error_message,
)
# If an error message exists, we shouldn't proceed
should_proceed = False
return (obj, should_proceed)
def get_status_method_mapping(self, domain_request):
"""Returns what method should be ran given an domain request object"""
# Define a per-object mapping
status_method_mapping = {
models.DomainRequest.DomainRequestStatus.STARTED: None,
models.DomainRequest.DomainRequestStatus.SUBMITTED: domain_request.submit,
models.DomainRequest.DomainRequestStatus.IN_REVIEW: domain_request.in_review,
models.DomainRequest.DomainRequestStatus.ACTION_NEEDED: domain_request.action_needed,
models.DomainRequest.DomainRequestStatus.APPROVED: domain_request.approve,
models.DomainRequest.DomainRequestStatus.WITHDRAWN: domain_request.withdraw,
models.DomainRequest.DomainRequestStatus.REJECTED: domain_request.reject,
models.DomainRequest.DomainRequestStatus.INELIGIBLE: (domain_request.reject_with_prejudice),
}
# Grab the method
return status_method_mapping.get(domain_request.status, None)
def get_readonly_fields(self, request, obj=None): def get_readonly_fields(self, request, obj=None):
"""Set the read-only state on form elements. """Set the read-only state on form elements.
We have 2 conditions that determine which fields are read-only: We have 2 conditions that determine which fields are read-only:

View file

@ -277,11 +277,6 @@ h1, h2, h3,
} }
} }
// Hides the "clear" button on autocomplete, as we already have one to use
.select2-selection__clear {
display: none;
}
// Fixes a display issue where the list was entirely white, or had too much whitespace // Fixes a display issue where the list was entirely white, or had too much whitespace
.select2-dropdown { .select2-dropdown {
display: inline-grid !important; display: inline-grid !important;

View file

@ -1,6 +1,7 @@
import logging import logging
import random import random
from faker import Faker from faker import Faker
from django.db import transaction
from registrar.models import ( from registrar.models import (
User, User,
@ -184,6 +185,14 @@ class DomainRequestFixture:
logger.warning(e) logger.warning(e)
return return
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
cls._create_domain_requests(users)
@classmethod
def _create_domain_requests(cls, users):
"""Creates DomainRequests given a list of users"""
for user in users: for user in users:
logger.debug("Loading domain requests for %s" % user) logger.debug("Loading domain requests for %s" % user)
for app in cls.DA: for app in cls.DA:
@ -211,8 +220,16 @@ class DomainFixture(DomainRequestFixture):
logger.warning(e) logger.warning(e)
return return
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
# approve each user associated with `in review` status domains
DomainFixture._approve_domain_requests(users)
@staticmethod
def _approve_domain_requests(users):
"""Approves all provided domain requests if they are in the state in_review"""
for user in users: for user in users:
# approve one of each users in review status domains
domain_request = DomainRequest.objects.filter( domain_request = DomainRequest.objects.filter(
creator=user, status=DomainRequest.DomainRequestStatus.IN_REVIEW creator=user, status=DomainRequest.DomainRequestStatus.IN_REVIEW
).last() ).last()
@ -220,5 +237,13 @@ class DomainFixture(DomainRequestFixture):
# We don't want fixtures sending out real emails to # We don't want fixtures sending out real emails to
# fake email addresses, so we just skip that and log it instead # fake email addresses, so we just skip that and log it instead
# All approvals require an investigator, so if there is none,
# assign one.
if domain_request.investigator is None:
# All "users" in fixtures have admin perms per prior config.
# No need to check for that.
domain_request.investigator = random.choice(users) # nosec
domain_request.approve(send_email=False) domain_request.approve(send_email=False)
domain_request.save() domain_request.save()

View file

@ -1,5 +1,6 @@
import logging import logging
from faker import Faker from faker import Faker
from django.db import transaction
from registrar.models import ( from registrar.models import (
User, User,
@ -186,5 +187,12 @@ class UserFixture:
@classmethod @classmethod
def load(cls): def load(cls):
cls.load_users(cls, cls.ADMINS, "full_access_group") # Lumped under .atomic to ensure we don't make redundant DB calls.
cls.load_users(cls, cls.STAFF, "cisa_analysts_group") # This bundles them all together, and then saves it in a single call.
# This is slightly different then bulk_create or bulk_update, in that
# you still get the same behaviour of .save(), but those incremental
# steps now do not need to close/reopen a db connection,
# instead they share one.
with transaction.atomic():
cls.load_users(cls, cls.ADMINS, "full_access_group")
cls.load_users(cls, cls.STAFF, "cisa_analysts_group")

View file

@ -9,6 +9,7 @@ from django.db import models
from django_fsm import FSMField, transition # type: ignore from django_fsm import FSMField, transition # type: ignore
from django.utils import timezone from django.utils import timezone
from registrar.models.domain import Domain from registrar.models.domain import Domain
from registrar.utility.errors import FSMApplicationError, FSMErrorCodes
from .utility.time_stamped_model import TimeStampedModel from .utility.time_stamped_model import TimeStampedModel
from ..utility.email import send_templated_email, EmailSendingError from ..utility.email import send_templated_email, EmailSendingError
@ -645,6 +646,14 @@ class DomainRequest(TimeStampedModel):
except EmailSendingError: except EmailSendingError:
logger.warning("Failed to send confirmation email", exc_info=True) logger.warning("Failed to send confirmation email", exc_info=True)
def investigator_exists_and_is_staff(self):
"""Checks if the current investigator is in a valid state for a state transition"""
is_valid = True
# Check if an investigator is assigned. No approval is possible without one.
if self.investigator is None or not self.investigator.is_staff:
is_valid = False
return is_valid
@transition( @transition(
field="status", field="status",
source=[ source=[
@ -656,7 +665,7 @@ class DomainRequest(TimeStampedModel):
target=DomainRequestStatus.SUBMITTED, target=DomainRequestStatus.SUBMITTED,
) )
def submit(self): def submit(self):
"""Submit a domain request that is started. """Submit an domain request that is started.
As a side effect, an email notification is sent.""" As a side effect, an email notification is sent."""
@ -664,10 +673,7 @@ class DomainRequest(TimeStampedModel):
# can raise more informative exceptions # can raise more informative exceptions
# requested_domain could be None here # requested_domain could be None here
if not hasattr(self, "requested_domain"): if not hasattr(self, "requested_domain") or self.requested_domain is None:
raise ValueError("Requested domain is missing.")
if self.requested_domain is None:
raise ValueError("Requested domain is missing.") raise ValueError("Requested domain is missing.")
DraftDomain = apps.get_model("registrar.DraftDomain") DraftDomain = apps.get_model("registrar.DraftDomain")
@ -704,10 +710,10 @@ class DomainRequest(TimeStampedModel):
DomainRequestStatus.INELIGIBLE, DomainRequestStatus.INELIGIBLE,
], ],
target=DomainRequestStatus.IN_REVIEW, target=DomainRequestStatus.IN_REVIEW,
conditions=[domain_is_not_active], conditions=[domain_is_not_active, investigator_exists_and_is_staff],
) )
def in_review(self): def in_review(self):
"""Investigate a domain request that has been submitted. """Investigate an domain request that has been submitted.
This action is logged. This action is logged.
@ -736,10 +742,10 @@ class DomainRequest(TimeStampedModel):
DomainRequestStatus.INELIGIBLE, DomainRequestStatus.INELIGIBLE,
], ],
target=DomainRequestStatus.ACTION_NEEDED, target=DomainRequestStatus.ACTION_NEEDED,
conditions=[domain_is_not_active], conditions=[domain_is_not_active, investigator_exists_and_is_staff],
) )
def action_needed(self): def action_needed(self):
"""Send back a domain request that is under investigation or rejected. """Send back an domain request that is under investigation or rejected.
This action is logged. This action is logged.
@ -768,9 +774,10 @@ class DomainRequest(TimeStampedModel):
DomainRequestStatus.REJECTED, DomainRequestStatus.REJECTED,
], ],
target=DomainRequestStatus.APPROVED, target=DomainRequestStatus.APPROVED,
conditions=[investigator_exists_and_is_staff],
) )
def approve(self, send_email=True): def approve(self, send_email=True):
"""Approve a domain request that has been submitted. """Approve an domain request that has been submitted.
This action cleans up the rejection status if moving away from rejected. This action cleans up the rejection status if moving away from rejected.
@ -781,8 +788,12 @@ class DomainRequest(TimeStampedModel):
# create the domain # create the domain
Domain = apps.get_model("registrar.Domain") Domain = apps.get_model("registrar.Domain")
# == Check that the domain_request is valid == #
if Domain.objects.filter(name=self.requested_domain.name).exists(): if Domain.objects.filter(name=self.requested_domain.name).exists():
raise ValueError("Cannot approve. Requested domain is already in use.") raise FSMApplicationError(code=FSMErrorCodes.APPROVE_DOMAIN_IN_USE)
# == Create the domain and related components == #
created_domain = Domain.objects.create(name=self.requested_domain.name) created_domain = Domain.objects.create(name=self.requested_domain.name)
self.approved_domain = created_domain self.approved_domain = created_domain
@ -799,6 +810,7 @@ class DomainRequest(TimeStampedModel):
if self.status == self.DomainRequestStatus.REJECTED: if self.status == self.DomainRequestStatus.REJECTED:
self.rejection_reason = None self.rejection_reason = None
# == Send out an email == #
self._send_status_update_email( self._send_status_update_email(
"domain request approved", "domain request approved",
"emails/status_change_approved.txt", "emails/status_change_approved.txt",
@ -812,7 +824,7 @@ class DomainRequest(TimeStampedModel):
target=DomainRequestStatus.WITHDRAWN, target=DomainRequestStatus.WITHDRAWN,
) )
def withdraw(self): def withdraw(self):
"""Withdraw a domain request that has been submitted.""" """Withdraw an domain request that has been submitted."""
self._send_status_update_email( self._send_status_update_email(
"withdraw", "withdraw",
@ -824,10 +836,10 @@ class DomainRequest(TimeStampedModel):
field="status", field="status",
source=[DomainRequestStatus.IN_REVIEW, DomainRequestStatus.ACTION_NEEDED, DomainRequestStatus.APPROVED], source=[DomainRequestStatus.IN_REVIEW, DomainRequestStatus.ACTION_NEEDED, DomainRequestStatus.APPROVED],
target=DomainRequestStatus.REJECTED, target=DomainRequestStatus.REJECTED,
conditions=[domain_is_not_active], conditions=[domain_is_not_active, investigator_exists_and_is_staff],
) )
def reject(self): def reject(self):
"""Reject a domain request that has been submitted. """Reject an domain request that has been submitted.
As side effects this will delete the domain and domain_information As side effects this will delete the domain and domain_information
(will cascade), and send an email notification.""" (will cascade), and send an email notification."""
@ -850,7 +862,7 @@ class DomainRequest(TimeStampedModel):
DomainRequestStatus.REJECTED, DomainRequestStatus.REJECTED,
], ],
target=DomainRequestStatus.INELIGIBLE, target=DomainRequestStatus.INELIGIBLE,
conditions=[domain_is_not_active], conditions=[domain_is_not_active, investigator_exists_and_is_staff],
) )
def reject_with_prejudice(self): def reject_with_prejudice(self):
"""The applicant is a bad actor, reject with prejudice. """The applicant is a bad actor, reject with prejudice.

View file

@ -549,6 +549,7 @@ def completed_domain_request(
user=False, user=False,
submitter=False, submitter=False,
name="city.gov", name="city.gov",
investigator=None,
): ):
"""A completed domain request.""" """A completed domain request."""
if not user: if not user:
@ -578,6 +579,13 @@ def completed_domain_request(
email="testy2@town.com", email="testy2@town.com",
phone="(555) 555 5557", phone="(555) 555 5557",
) )
if not investigator:
investigator, _ = User.objects.get_or_create(
username="incrediblyfakeinvestigator",
first_name="Joe",
last_name="Bob",
is_staff=True,
)
domain_request_kwargs = dict( domain_request_kwargs = dict(
organization_type="federal", organization_type="federal",
federal_type="executive", federal_type="executive",
@ -593,6 +601,7 @@ def completed_domain_request(
submitter=submitter, submitter=submitter,
creator=user, creator=user,
status=status, status=status,
investigator=investigator,
) )
if has_about_your_organization: if has_about_your_organization:
domain_request_kwargs["about_your_organization"] = "e-Government" domain_request_kwargs["about_your_organization"] = "e-Government"
@ -611,6 +620,13 @@ def completed_domain_request(
return domain_request return domain_request
def set_domain_request_investigators(domain_request_list: list[DomainRequest], investigator_user: User):
"""Helper method that sets the investigator field of all provided domain requests"""
for request in domain_request_list:
request.investigator = investigator_user
request.save()
def multiple_unalphabetical_domain_objects( def multiple_unalphabetical_domain_objects(
domain_type=AuditedAdminMockData.DOMAIN_REQUEST, domain_type=AuditedAdminMockData.DOMAIN_REQUEST,
): ):

View file

@ -17,7 +17,7 @@ from registrar.models import (
import boto3_mocking import boto3_mocking
from registrar.models.transition_domain import TransitionDomain from registrar.models.transition_domain import TransitionDomain
from registrar.models.verified_by_staff import VerifiedByStaff # type: ignore from registrar.models.verified_by_staff import VerifiedByStaff # type: ignore
from .common import MockSESClient, less_console_noise, completed_domain_request from .common import MockSESClient, less_console_noise, completed_domain_request, set_domain_request_investigators
from django_fsm import TransitionNotAllowed from django_fsm import TransitionNotAllowed
@ -52,6 +52,18 @@ class TestDomainRequest(TestCase):
status=DomainRequest.DomainRequestStatus.INELIGIBLE, name="ineligible.gov" status=DomainRequest.DomainRequestStatus.INELIGIBLE, name="ineligible.gov"
) )
# Store all domain request statuses in a variable for ease of use
self.all_domain_requests = [
self.started_domain_request,
self.submitted_domain_request,
self.in_review_domain_request,
self.action_needed_domain_request,
self.approved_domain_request,
self.withdrawn_domain_request,
self.rejected_domain_request,
self.ineligible_domain_request,
]
self.mock_client = MockSESClient() self.mock_client = MockSESClient()
def tearDown(self): def tearDown(self):
@ -219,6 +231,65 @@ class TestDomainRequest(TestCase):
domain_request = completed_domain_request(status=DomainRequest.DomainRequestStatus.APPROVED) domain_request = completed_domain_request(status=DomainRequest.DomainRequestStatus.APPROVED)
self.check_email_sent(domain_request, msg, "reject_with_prejudice", 0) self.check_email_sent(domain_request, msg, "reject_with_prejudice", 0)
def assert_fsm_transition_raises_error(self, test_cases, method_to_run):
"""Given a list of test cases, check if each transition throws the intended error"""
with boto3_mocking.clients.handler_for("sesv2", self.mock_client), less_console_noise():
for domain_request, exception_type in test_cases:
with self.subTest(domain_request=domain_request, exception_type=exception_type):
with self.assertRaises(exception_type):
# Retrieve the method by name from the domain_request object and call it
method = getattr(domain_request, method_to_run)
# Call the method
method()
def assert_fsm_transition_does_not_raise_error(self, test_cases, method_to_run):
"""Given a list of test cases, ensure that none of them throw transition errors"""
with boto3_mocking.clients.handler_for("sesv2", self.mock_client), less_console_noise():
for domain_request, exception_type in test_cases:
with self.subTest(domain_request=domain_request, exception_type=exception_type):
try:
# Retrieve the method by name from the DomainRequest object and call it
method = getattr(domain_request, method_to_run)
# Call the method
method()
except exception_type:
self.fail(f"{exception_type} was raised, but it was not expected.")
def test_submit_transition_allowed_with_no_investigator(self):
"""
Tests for attempting to transition without an investigator.
For submit, this should be valid in all cases.
"""
test_cases = [
(self.started_domain_request, TransitionNotAllowed),
(self.in_review_domain_request, TransitionNotAllowed),
(self.action_needed_domain_request, TransitionNotAllowed),
(self.withdrawn_domain_request, TransitionNotAllowed),
]
# Set all investigators to none
set_domain_request_investigators(self.all_domain_requests, None)
self.assert_fsm_transition_does_not_raise_error(test_cases, "submit")
def test_submit_transition_allowed_with_investigator_not_staff(self):
"""
Tests for attempting to transition with an investigator user that is not staff.
For submit, this should be valid in all cases.
"""
test_cases = [
(self.in_review_domain_request, TransitionNotAllowed),
(self.action_needed_domain_request, TransitionNotAllowed),
]
# Set all investigators to a user with no staff privs
user, _ = User.objects.get_or_create(username="pancakesyrup", is_staff=False)
set_domain_request_investigators(self.all_domain_requests, user)
self.assert_fsm_transition_does_not_raise_error(test_cases, "submit")
def test_submit_transition_allowed(self): def test_submit_transition_allowed(self):
""" """
Test that calling submit from allowable statuses does raises TransitionNotAllowed. Test that calling submit from allowable statuses does raises TransitionNotAllowed.
@ -230,14 +301,27 @@ class TestDomainRequest(TestCase):
(self.withdrawn_domain_request, TransitionNotAllowed), (self.withdrawn_domain_request, TransitionNotAllowed),
] ]
self.assert_fsm_transition_does_not_raise_error(test_cases, "submit")
def test_submit_transition_allowed_twice(self):
"""
Test that rotating between submit and in_review doesn't throw an error
"""
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): with boto3_mocking.clients.handler_for("sesv2", self.mock_client):
with less_console_noise(): with less_console_noise():
for domain_request, exception_type in test_cases: try:
with self.subTest(domain_request=domain_request, exception_type=exception_type): # Make a submission
try: self.in_review_domain_request.submit()
domain_request.submit()
except TransitionNotAllowed: # Rerun the old method to get back to the original state
self.fail("TransitionNotAllowed was raised, but it was not expected.") self.in_review_domain_request.in_review()
# Make another submission
self.in_review_domain_request.submit()
except TransitionNotAllowed:
self.fail("TransitionNotAllowed was raised, but it was not expected.")
self.assertEqual(self.in_review_domain_request.status, DomainRequest.DomainRequestStatus.SUBMITTED)
def test_submit_transition_not_allowed(self): def test_submit_transition_not_allowed(self):
""" """
@ -250,12 +334,7 @@ class TestDomainRequest(TestCase):
(self.ineligible_domain_request, TransitionNotAllowed), (self.ineligible_domain_request, TransitionNotAllowed),
] ]
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): self.assert_fsm_transition_raises_error(test_cases, "submit")
with less_console_noise():
for domain_request, exception_type in test_cases:
with self.subTest(domain_request=domain_request, exception_type=exception_type):
with self.assertRaises(exception_type):
domain_request.submit()
def test_in_review_transition_allowed(self): def test_in_review_transition_allowed(self):
""" """
@ -269,14 +348,43 @@ class TestDomainRequest(TestCase):
(self.ineligible_domain_request, TransitionNotAllowed), (self.ineligible_domain_request, TransitionNotAllowed),
] ]
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): self.assert_fsm_transition_does_not_raise_error(test_cases, "in_review")
with less_console_noise():
for domain_request, exception_type in test_cases: def test_in_review_transition_not_allowed_with_no_investigator(self):
with self.subTest(domain_request=domain_request, exception_type=exception_type): """
try: Tests for attempting to transition without an investigator
domain_request.in_review() """
except TransitionNotAllowed:
self.fail("TransitionNotAllowed was raised, but it was not expected.") test_cases = [
(self.action_needed_domain_request, TransitionNotAllowed),
(self.approved_domain_request, TransitionNotAllowed),
(self.rejected_domain_request, TransitionNotAllowed),
(self.ineligible_domain_request, TransitionNotAllowed),
]
# Set all investigators to none
set_domain_request_investigators(self.all_domain_requests, None)
self.assert_fsm_transition_raises_error(test_cases, "in_review")
def test_in_review_transition_not_allowed_with_investigator_not_staff(self):
"""
Tests for attempting to transition with an investigator that is not staff.
This should throw an exception.
"""
test_cases = [
(self.action_needed_domain_request, TransitionNotAllowed),
(self.approved_domain_request, TransitionNotAllowed),
(self.rejected_domain_request, TransitionNotAllowed),
(self.ineligible_domain_request, TransitionNotAllowed),
]
# Set all investigators to a user with no staff privs
user, _ = User.objects.get_or_create(username="pancakesyrup", is_staff=False)
set_domain_request_investigators(self.all_domain_requests, user)
self.assert_fsm_transition_raises_error(test_cases, "in_review")
def test_in_review_transition_not_allowed(self): def test_in_review_transition_not_allowed(self):
""" """
@ -288,12 +396,7 @@ class TestDomainRequest(TestCase):
(self.withdrawn_domain_request, TransitionNotAllowed), (self.withdrawn_domain_request, TransitionNotAllowed),
] ]
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): self.assert_fsm_transition_raises_error(test_cases, "in_review")
with less_console_noise():
for domain_request, exception_type in test_cases:
with self.subTest(domain_request=domain_request, exception_type=exception_type):
with self.assertRaises(exception_type):
domain_request.in_review()
def test_action_needed_transition_allowed(self): def test_action_needed_transition_allowed(self):
""" """
@ -305,13 +408,43 @@ class TestDomainRequest(TestCase):
(self.rejected_domain_request, TransitionNotAllowed), (self.rejected_domain_request, TransitionNotAllowed),
(self.ineligible_domain_request, TransitionNotAllowed), (self.ineligible_domain_request, TransitionNotAllowed),
] ]
with less_console_noise():
for domain_request, exception_type in test_cases: self.assert_fsm_transition_does_not_raise_error(test_cases, "action_needed")
with self.subTest(domain_request=domain_request, exception_type=exception_type):
try: def test_action_needed_transition_not_allowed_with_no_investigator(self):
domain_request.action_needed() """
except TransitionNotAllowed: Tests for attempting to transition without an investigator
self.fail("TransitionNotAllowed was raised, but it was not expected.") """
test_cases = [
(self.in_review_domain_request, TransitionNotAllowed),
(self.approved_domain_request, TransitionNotAllowed),
(self.rejected_domain_request, TransitionNotAllowed),
(self.ineligible_domain_request, TransitionNotAllowed),
]
# Set all investigators to none
set_domain_request_investigators(self.all_domain_requests, None)
self.assert_fsm_transition_raises_error(test_cases, "action_needed")
def test_action_needed_transition_not_allowed_with_investigator_not_staff(self):
"""
Tests for attempting to transition with an investigator that is not staff
"""
test_cases = [
(self.in_review_domain_request, TransitionNotAllowed),
(self.approved_domain_request, TransitionNotAllowed),
(self.rejected_domain_request, TransitionNotAllowed),
(self.ineligible_domain_request, TransitionNotAllowed),
]
# Set all investigators to a user with no staff privs
user, _ = User.objects.get_or_create(username="pancakesyrup", is_staff=False)
set_domain_request_investigators(self.all_domain_requests, user)
self.assert_fsm_transition_raises_error(test_cases, "action_needed")
def test_action_needed_transition_not_allowed(self): def test_action_needed_transition_not_allowed(self):
""" """
@ -323,11 +456,8 @@ class TestDomainRequest(TestCase):
(self.action_needed_domain_request, TransitionNotAllowed), (self.action_needed_domain_request, TransitionNotAllowed),
(self.withdrawn_domain_request, TransitionNotAllowed), (self.withdrawn_domain_request, TransitionNotAllowed),
] ]
with less_console_noise():
for domain_request, exception_type in test_cases: self.assert_fsm_transition_raises_error(test_cases, "action_needed")
with self.subTest(domain_request=domain_request, exception_type=exception_type):
with self.assertRaises(exception_type):
domain_request.action_needed()
def test_approved_transition_allowed(self): def test_approved_transition_allowed(self):
""" """
@ -340,14 +470,40 @@ class TestDomainRequest(TestCase):
(self.rejected_domain_request, TransitionNotAllowed), (self.rejected_domain_request, TransitionNotAllowed),
] ]
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): self.assert_fsm_transition_does_not_raise_error(test_cases, "approve")
with less_console_noise():
for domain_request, exception_type in test_cases: def test_approved_transition_not_allowed_with_no_investigator(self):
with self.subTest(domain_request=domain_request, exception_type=exception_type): """
try: Tests for attempting to transition without an investigator
domain_request.approve() """
except TransitionNotAllowed:
self.fail("TransitionNotAllowed was raised, but it was not expected.") test_cases = [
(self.in_review_domain_request, TransitionNotAllowed),
(self.action_needed_domain_request, TransitionNotAllowed),
(self.rejected_domain_request, TransitionNotAllowed),
]
# Set all investigators to none
set_domain_request_investigators(self.all_domain_requests, None)
self.assert_fsm_transition_raises_error(test_cases, "approve")
def test_approved_transition_not_allowed_with_investigator_not_staff(self):
"""
Tests for attempting to transition with an investigator that is not staff
"""
test_cases = [
(self.in_review_domain_request, TransitionNotAllowed),
(self.action_needed_domain_request, TransitionNotAllowed),
(self.rejected_domain_request, TransitionNotAllowed),
]
# Set all investigators to a user with no staff privs
user, _ = User.objects.get_or_create(username="pancakesyrup", is_staff=False)
set_domain_request_investigators(self.all_domain_requests, user)
self.assert_fsm_transition_raises_error(test_cases, "approve")
def test_approved_skips_sending_email(self): def test_approved_skips_sending_email(self):
""" """
@ -372,13 +528,7 @@ class TestDomainRequest(TestCase):
(self.withdrawn_domain_request, TransitionNotAllowed), (self.withdrawn_domain_request, TransitionNotAllowed),
(self.ineligible_domain_request, TransitionNotAllowed), (self.ineligible_domain_request, TransitionNotAllowed),
] ]
self.assert_fsm_transition_raises_error(test_cases, "approve")
with boto3_mocking.clients.handler_for("sesv2", self.mock_client):
with less_console_noise():
for domain_request, exception_type in test_cases:
with self.subTest(domain_request=domain_request, exception_type=exception_type):
with self.assertRaises(exception_type):
domain_request.approve()
def test_withdraw_transition_allowed(self): def test_withdraw_transition_allowed(self):
""" """
@ -390,14 +540,42 @@ class TestDomainRequest(TestCase):
(self.action_needed_domain_request, TransitionNotAllowed), (self.action_needed_domain_request, TransitionNotAllowed),
] ]
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): self.assert_fsm_transition_does_not_raise_error(test_cases, "withdraw")
with less_console_noise():
for domain_request, exception_type in test_cases: def test_withdraw_transition_allowed_with_no_investigator(self):
with self.subTest(domain_request=domain_request, exception_type=exception_type): """
try: Tests for attempting to transition without an investigator.
domain_request.withdraw() For withdraw, this should be valid in all cases.
except TransitionNotAllowed: """
self.fail("TransitionNotAllowed was raised, but it was not expected.")
test_cases = [
(self.submitted_domain_request, TransitionNotAllowed),
(self.in_review_domain_request, TransitionNotAllowed),
(self.action_needed_domain_request, TransitionNotAllowed),
]
# Set all investigators to none
set_domain_request_investigators(self.all_domain_requests, None)
self.assert_fsm_transition_does_not_raise_error(test_cases, "withdraw")
def test_withdraw_transition_allowed_with_investigator_not_staff(self):
"""
Tests for attempting to transition when investigator is not staff.
For withdraw, this should be valid in all cases.
"""
test_cases = [
(self.submitted_domain_request, TransitionNotAllowed),
(self.in_review_domain_request, TransitionNotAllowed),
(self.action_needed_domain_request, TransitionNotAllowed),
]
# Set all investigators to a user with no staff privs
user, _ = User.objects.get_or_create(username="pancakesyrup", is_staff=False)
set_domain_request_investigators(self.all_domain_requests, user)
self.assert_fsm_transition_does_not_raise_error(test_cases, "withdraw")
def test_withdraw_transition_not_allowed(self): def test_withdraw_transition_not_allowed(self):
""" """
@ -411,12 +589,7 @@ class TestDomainRequest(TestCase):
(self.ineligible_domain_request, TransitionNotAllowed), (self.ineligible_domain_request, TransitionNotAllowed),
] ]
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): self.assert_fsm_transition_raises_error(test_cases, "withdraw")
with less_console_noise():
for domain_request, exception_type in test_cases:
with self.subTest(domain_request=domain_request, exception_type=exception_type):
with self.assertRaises(exception_type):
domain_request.withdraw()
def test_reject_transition_allowed(self): def test_reject_transition_allowed(self):
""" """
@ -428,14 +601,40 @@ class TestDomainRequest(TestCase):
(self.approved_domain_request, TransitionNotAllowed), (self.approved_domain_request, TransitionNotAllowed),
] ]
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): self.assert_fsm_transition_does_not_raise_error(test_cases, "reject")
with less_console_noise():
for domain_request, exception_type in test_cases: def test_reject_transition_not_allowed_with_no_investigator(self):
with self.subTest(domain_request=domain_request, exception_type=exception_type): """
try: Tests for attempting to transition without an investigator
domain_request.reject() """
except TransitionNotAllowed:
self.fail("TransitionNotAllowed was raised, but it was not expected.") test_cases = [
(self.in_review_domain_request, TransitionNotAllowed),
(self.action_needed_domain_request, TransitionNotAllowed),
(self.approved_domain_request, TransitionNotAllowed),
]
# Set all investigators to none
set_domain_request_investigators(self.all_domain_requests, None)
self.assert_fsm_transition_raises_error(test_cases, "reject")
def test_reject_transition_not_allowed_with_investigator_not_staff(self):
"""
Tests for attempting to transition when investigator is not staff
"""
test_cases = [
(self.in_review_domain_request, TransitionNotAllowed),
(self.action_needed_domain_request, TransitionNotAllowed),
(self.approved_domain_request, TransitionNotAllowed),
]
# Set all investigators to a user with no staff privs
user, _ = User.objects.get_or_create(username="pancakesyrup", is_staff=False)
set_domain_request_investigators(self.all_domain_requests, user)
self.assert_fsm_transition_raises_error(test_cases, "reject")
def test_reject_transition_not_allowed(self): def test_reject_transition_not_allowed(self):
""" """
@ -449,12 +648,7 @@ class TestDomainRequest(TestCase):
(self.ineligible_domain_request, TransitionNotAllowed), (self.ineligible_domain_request, TransitionNotAllowed),
] ]
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): self.assert_fsm_transition_raises_error(test_cases, "reject")
with less_console_noise():
for domain_request, exception_type in test_cases:
with self.subTest(domain_request=domain_request, exception_type=exception_type):
with self.assertRaises(exception_type):
domain_request.reject()
def test_reject_with_prejudice_transition_allowed(self): def test_reject_with_prejudice_transition_allowed(self):
""" """
@ -467,14 +661,42 @@ class TestDomainRequest(TestCase):
(self.rejected_domain_request, TransitionNotAllowed), (self.rejected_domain_request, TransitionNotAllowed),
] ]
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): self.assert_fsm_transition_does_not_raise_error(test_cases, "reject_with_prejudice")
with less_console_noise():
for domain_request, exception_type in test_cases: def test_reject_with_prejudice_transition_not_allowed_with_no_investigator(self):
with self.subTest(domain_request=domain_request, exception_type=exception_type): """
try: Tests for attempting to transition without an investigator
domain_request.reject_with_prejudice() """
except TransitionNotAllowed:
self.fail("TransitionNotAllowed was raised, but it was not expected.") test_cases = [
(self.in_review_domain_request, TransitionNotAllowed),
(self.action_needed_domain_request, TransitionNotAllowed),
(self.approved_domain_request, TransitionNotAllowed),
(self.rejected_domain_request, TransitionNotAllowed),
]
# Set all investigators to none
set_domain_request_investigators(self.all_domain_requests, None)
self.assert_fsm_transition_raises_error(test_cases, "reject_with_prejudice")
def test_reject_with_prejudice_not_allowed_with_investigator_not_staff(self):
"""
Tests for attempting to transition when investigator is not staff
"""
test_cases = [
(self.in_review_domain_request, TransitionNotAllowed),
(self.action_needed_domain_request, TransitionNotAllowed),
(self.approved_domain_request, TransitionNotAllowed),
(self.rejected_domain_request, TransitionNotAllowed),
]
# Set all investigators to a user with no staff privs
user, _ = User.objects.get_or_create(username="pancakesyrup", is_staff=False)
set_domain_request_investigators(self.all_domain_requests, user)
self.assert_fsm_transition_raises_error(test_cases, "reject_with_prejudice")
def test_reject_with_prejudice_transition_not_allowed(self): def test_reject_with_prejudice_transition_not_allowed(self):
""" """
@ -487,12 +709,7 @@ class TestDomainRequest(TestCase):
(self.ineligible_domain_request, TransitionNotAllowed), (self.ineligible_domain_request, TransitionNotAllowed),
] ]
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): self.assert_fsm_transition_raises_error(test_cases, "reject_with_prejudice")
with less_console_noise():
for domain_request, exception_type in test_cases:
with self.subTest(domain_request=domain_request, exception_type=exception_type):
with self.assertRaises(exception_type):
domain_request.reject_with_prejudice()
def test_transition_not_allowed_approved_in_review_when_domain_is_active(self): def test_transition_not_allowed_approved_in_review_when_domain_is_active(self):
"""Create a domain request with status approved, create a matching domain that """Create a domain request with status approved, create a matching domain that
@ -666,7 +883,10 @@ class TestPermissions(TestCase):
def test_approval_creates_role(self): def test_approval_creates_role(self):
draft_domain, _ = DraftDomain.objects.get_or_create(name="igorville.gov") draft_domain, _ = DraftDomain.objects.get_or_create(name="igorville.gov")
user, _ = User.objects.get_or_create() user, _ = User.objects.get_or_create()
domain_request = DomainRequest.objects.create(creator=user, requested_domain=draft_domain) investigator, _ = User.objects.get_or_create(username="frenchtoast", is_staff=True)
domain_request = DomainRequest.objects.create(
creator=user, requested_domain=draft_domain, investigator=investigator
)
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): with boto3_mocking.clients.handler_for("sesv2", self.mock_client):
with less_console_noise(): with less_console_noise():
@ -697,10 +917,12 @@ class TestDomainInformation(TestCase):
@boto3_mocking.patching @boto3_mocking.patching
def test_approval_creates_info(self): def test_approval_creates_info(self):
self.maxDiff = None
draft_domain, _ = DraftDomain.objects.get_or_create(name="igorville.gov") draft_domain, _ = DraftDomain.objects.get_or_create(name="igorville.gov")
user, _ = User.objects.get_or_create() user, _ = User.objects.get_or_create()
domain_request = DomainRequest.objects.create(creator=user, requested_domain=draft_domain, notes="test notes") investigator, _ = User.objects.get_or_create(username="frenchtoast", is_staff=True)
domain_request = DomainRequest.objects.create(
creator=user, requested_domain=draft_domain, notes="test notes", investigator=investigator
)
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): with boto3_mocking.clients.handler_for("sesv2", self.mock_client):
with less_console_noise(): with less_console_noise():

View file

@ -324,7 +324,10 @@ class TestDomainCreation(MockEppLib):
with less_console_noise(): with less_console_noise():
draft_domain, _ = DraftDomain.objects.get_or_create(name="igorville.gov") draft_domain, _ = DraftDomain.objects.get_or_create(name="igorville.gov")
user, _ = User.objects.get_or_create() user, _ = User.objects.get_or_create()
domain_request = DomainRequest.objects.create(creator=user, requested_domain=draft_domain) investigator, _ = User.objects.get_or_create(username="frenchtoast", is_staff=True)
domain_request = DomainRequest.objects.create(
creator=user, requested_domain=draft_domain, investigator=investigator
)
mock_client = MockSESClient() mock_client = MockSESClient()
with boto3_mocking.clients.handler_for("sesv2", mock_client): with boto3_mocking.clients.handler_for("sesv2", mock_client):

View file

@ -71,6 +71,48 @@ class GenericError(Exception):
return self._error_mapping.get(code) return self._error_mapping.get(code)
class FSMErrorCodes(IntEnum):
"""Used when doing FSM transitions.
Overview of generic error codes:
- 1 APPROVE_DOMAIN_IN_USE The domain is already in use
- 2 NO_INVESTIGATOR No investigator is assigned
- 3 INVESTIGATOR_NOT_STAFF Investigator is a non-staff user
- 4 INVESTIGATOR_NOT_SUBMITTER The form submitter is not the investigator
"""
APPROVE_DOMAIN_IN_USE = 1
NO_INVESTIGATOR = 2
INVESTIGATOR_NOT_STAFF = 3
INVESTIGATOR_NOT_SUBMITTER = 4
class FSMApplicationError(Exception):
"""
Used to raise exceptions when doing FSM Transitions.
Uses `FSMErrorCodes` as an enum.
"""
_error_mapping = {
FSMErrorCodes.APPROVE_DOMAIN_IN_USE: ("Cannot approve. Requested domain is already in use."),
FSMErrorCodes.NO_INVESTIGATOR: ("Investigator is required for this status."),
FSMErrorCodes.INVESTIGATOR_NOT_STAFF: ("Investigator is not a staff user."),
FSMErrorCodes.INVESTIGATOR_NOT_SUBMITTER: ("Only the assigned investigator can make this change."),
}
def __init__(self, *args, code=None, **kwargs):
super().__init__(*args, **kwargs)
self.code = code
if self.code in self._error_mapping:
self.message = self._error_mapping.get(self.code)
def __str__(self):
return f"{self.message}"
@classmethod
def get_error_message(cls, code=None):
return cls._error_mapping.get(code)
class NameserverErrorCodes(IntEnum): class NameserverErrorCodes(IntEnum):
"""Used in the NameserverError class for """Used in the NameserverError class for
error mapping. error mapping.