Merge branch 'main' of https://github.com/cisagov/manage.get.gov into rh/2258-update-ao-to-so

This commit is contained in:
Rebecca Hsieh 2024-06-28 09:51:34 -07:00
commit dcc29dbcc7
No known key found for this signature in database
22 changed files with 536 additions and 84 deletions

View file

@ -426,6 +426,10 @@ function initializeWidgetOnList(list, parentId) {
let statusSelect = document.getElementById('id_status');
function moveStatusChangelog(actionNeededReasonFormGroup, statusSelect) {
if (!actionNeededReasonFormGroup || !statusSelect) {
return;
}
let flexContainer = actionNeededReasonFormGroup.querySelector('.flex-container');
let statusChangelog = document.getElementById('dja-status-changelog');

View file

@ -12,12 +12,11 @@ class Command(BaseCommand, PopulateScriptTemplate):
def handle(self, **kwargs):
"""Loops through each valid User object and updates its verification_type value"""
filter_condition = {"verification_type__isnull": True}
self.mass_populate_field(User, filter_condition, ["verification_type"])
self.mass_update_records(User, filter_condition, ["verification_type"])
def populate_field(self, field_to_update):
def update_record(self, record: User):
"""Defines how we update the verification_type field"""
field_to_update.set_user_verification_type()
record.set_user_verification_type()
logger.info(
f"{TerminalColors.OKCYAN}Updating {field_to_update} => "
f"{field_to_update.verification_type}{TerminalColors.OKCYAN}"
f"{TerminalColors.OKCYAN}Updating {record} => " f"{record.verification_type}{TerminalColors.OKCYAN}"
)

View file

@ -0,0 +1,94 @@
import logging
from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import PopulateScriptTemplate, TerminalColors
from registrar.models import FederalAgency, DomainInformation
from registrar.utility.constants import BranchChoices
logger = logging.getLogger(__name__)
class Command(BaseCommand, PopulateScriptTemplate):
"""
This command uses the PopulateScriptTemplate,
which provides reusable logging and bulk updating functions for mass-updating fields.
"""
help = "Loops through each valid User object and updates its verification_type value"
prompt_title = "Do you wish to update all Federal Agencies?"
def handle(self, **kwargs):
"""Loops through each valid User object and updates the value of its verification_type field"""
# These are federal agencies that we don't have any data on.
# Independent agencies are considered "EXECUTIVE" here.
self.missing_records = {
"Christopher Columbus Fellowship Foundation": BranchChoices.EXECUTIVE,
"Commission for the Preservation of America's Heritage Abroad": BranchChoices.EXECUTIVE,
"Commission of Fine Arts": BranchChoices.EXECUTIVE,
"Committee for Purchase From People Who Are Blind or Severely Disabled": BranchChoices.EXECUTIVE,
"DC Court Services and Offender Supervision Agency": BranchChoices.EXECUTIVE,
"DC Pre-trial Services": BranchChoices.EXECUTIVE,
"Department of Agriculture": BranchChoices.EXECUTIVE,
"Dwight D. Eisenhower Memorial Commission": BranchChoices.LEGISLATIVE,
"Farm Credit System Insurance Corporation": BranchChoices.EXECUTIVE,
"Federal Financial Institutions Examination Council": BranchChoices.EXECUTIVE,
"Federal Judiciary": BranchChoices.JUDICIAL,
"Institute of Peace": BranchChoices.EXECUTIVE,
"International Boundary and Water Commission: United States and Mexico": BranchChoices.EXECUTIVE,
"International Boundary Commission: United States and Canada": BranchChoices.EXECUTIVE,
"International Joint Commission: United States and Canada": BranchChoices.EXECUTIVE,
"Legislative Branch": BranchChoices.LEGISLATIVE,
"National Foundation on the Arts and the Humanities": BranchChoices.EXECUTIVE,
"Nuclear Safety Oversight Committee": BranchChoices.EXECUTIVE,
"Office of Compliance": BranchChoices.LEGISLATIVE,
"Overseas Private Investment Corporation": BranchChoices.EXECUTIVE,
"Public Defender Service for the District of Columbia": BranchChoices.EXECUTIVE,
"The Executive Office of the President": BranchChoices.EXECUTIVE,
"U.S. Access Board": BranchChoices.EXECUTIVE,
"U.S. Agency for Global Media": BranchChoices.EXECUTIVE,
"U.S. China Economic and Security Review Commission": BranchChoices.LEGISLATIVE,
"U.S. Interagency Council on Homelessness": BranchChoices.EXECUTIVE,
"U.S. International Trade Commission": BranchChoices.EXECUTIVE,
"U.S. Postal Service": BranchChoices.EXECUTIVE,
"U.S. Trade and Development Agency": BranchChoices.EXECUTIVE,
"Udall Foundation": BranchChoices.EXECUTIVE,
"United States Arctic Research Commission": BranchChoices.EXECUTIVE,
"Utah Reclamation Mitigation and Conservation Commission": BranchChoices.EXECUTIVE,
"Vietnam Education Foundation": BranchChoices.EXECUTIVE,
"Woodrow Wilson International Center for Scholars": BranchChoices.EXECUTIVE,
"World War I Centennial Commission": BranchChoices.EXECUTIVE,
}
# Get all existing domain requests. Select_related allows us to skip doing db queries.
self.all_domain_infos = DomainInformation.objects.select_related("federal_agency")
self.mass_update_records(
FederalAgency, filter_conditions={"agency__isnull": False}, fields_to_update=["federal_type"]
)
def update_record(self, record: FederalAgency):
"""Defines how we update the federal_type field on each record."""
request = self.all_domain_infos.filter(federal_agency__agency=record.agency).first()
if request:
record.federal_type = request.federal_type
elif not request and record.agency in self.missing_records:
record.federal_type = self.missing_records.get(record.agency)
logger.info(f"{TerminalColors.OKCYAN}Updating {str(record)} => {record.federal_type}{TerminalColors.ENDC}")
def should_skip_record(self, record) -> bool: # noqa
"""Defines the conditions in which we should skip updating a record."""
requests = self.all_domain_infos.filter(federal_agency__agency=record.agency, federal_type__isnull=False)
# Check if all federal_type values are the same. Skip the record otherwise.
distinct_federal_types = requests.values("federal_type").distinct()
should_skip = distinct_federal_types.count() != 1
if should_skip and record.agency not in self.missing_records:
logger.info(
f"{TerminalColors.YELLOW}Skipping update for {str(record)} => count is "
f"{distinct_federal_types.count()} and records are {distinct_federal_types}{TerminalColors.ENDC}"
)
elif record.agency in self.missing_records:
logger.info(
f"{TerminalColors.MAGENTA}Missing data on {str(record)} - "
f"swapping to manual mapping{TerminalColors.ENDC}"
)
should_skip = False
return should_skip

View file

@ -61,56 +61,96 @@ class ScriptDataHelper:
class PopulateScriptTemplate(ABC):
"""
Contains an ABC for generic populate scripts
Contains an ABC for generic populate scripts.
This template provides reusable logging and bulk updating functions for
mass-updating fields.
"""
def mass_populate_field(self, sender, filter_conditions, fields_to_update):
"""Loops through each valid "sender" object - specified by filter_conditions - and
updates fields defined by fields_to_update using populate_function.
# Optional script-global config variables. For the most part, you can leave these untouched.
# Defines what prompt_for_execution displays as its header when you first start the script
prompt_title: str = "Do you wish to proceed?"
You must define populate_field before you can use this function.
# The header when printing the script run summary (after the script finishes)
run_summary_header = None
@abstractmethod
def update_record(self, record):
"""Defines how we update each field. Must be defined before using mass_update_records."""
raise NotImplementedError
def mass_update_records(self, object_class, filter_conditions, fields_to_update, debug=True):
"""Loops through each valid "object_class" object - specified by filter_conditions - and
updates fields defined by fields_to_update using update_record.
You must define update_record before you can use this function.
"""
objects = sender.objects.filter(**filter_conditions)
records = object_class.objects.filter(**filter_conditions)
readable_class_name = self.get_class_name(object_class)
# Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
info_to_inspect=f"""
==Proposed Changes==
Number of {sender} objects to change: {len(objects)}
Number of {readable_class_name} objects to change: {len(records)}
These fields will be updated on each record: {fields_to_update}
""",
prompt_title="Do you wish to patch this data?",
prompt_title=self.prompt_title,
)
logger.info("Updating...")
to_update: List[sender] = []
failed_to_update: List[sender] = []
for updated_object in objects:
to_update: List[object_class] = []
to_skip: List[object_class] = []
failed_to_update: List[object_class] = []
for record in records:
try:
self.populate_field(updated_object)
to_update.append(updated_object)
if not self.should_skip_record(record):
self.update_record(record)
to_update.append(record)
else:
to_skip.append(record)
except Exception as err:
failed_to_update.append(updated_object)
fail_message = self.get_failure_message(record)
failed_to_update.append(record)
logger.error(err)
logger.error(f"{TerminalColors.FAIL}" f"Failed to update {updated_object}" f"{TerminalColors.ENDC}")
logger.error(fail_message)
# Do a bulk update on the first_ready field
ScriptDataHelper.bulk_update_fields(sender, to_update, fields_to_update)
# Do a bulk update on the desired field
ScriptDataHelper.bulk_update_fields(object_class, to_update, fields_to_update)
# Log what happened
TerminalHelper.log_script_run_summary(to_update, failed_to_update, skipped=[], debug=True)
TerminalHelper.log_script_run_summary(
to_update,
failed_to_update,
to_skip,
debug=debug,
log_header=self.run_summary_header,
display_as_str=True,
)
@abstractmethod
def populate_field(self, field_to_update):
"""Defines how we update each field. Must be defined before using mass_populate_field."""
raise NotImplementedError
def get_class_name(self, sender) -> str:
"""Returns the class name that we want to display for the terminal prompt.
Example: DomainRequest => "Domain Request"
"""
return sender._meta.verbose_name if getattr(sender, "_meta") else sender
def get_failure_message(self, record) -> str:
"""Returns the message that we will display if a record fails to update"""
return f"{TerminalColors.FAIL}" f"Failed to update {record}" f"{TerminalColors.ENDC}"
def should_skip_record(self, record) -> bool: # noqa
"""Defines the condition in which we should skip updating a record. Override as needed."""
# By default - don't skip
return False
class TerminalHelper:
@staticmethod
def log_script_run_summary(to_update, failed_to_update, skipped, debug: bool, log_header=None):
def log_script_run_summary(
to_update, failed_to_update, skipped, debug: bool, log_header=None, display_as_str=False
):
"""Prints success, failed, and skipped counts, as well as
all affected objects."""
update_success_count = len(to_update)
@ -121,20 +161,24 @@ class TerminalHelper:
log_header = "============= FINISHED ==============="
# Prepare debug messages
debug_messages = {
"success": (f"{TerminalColors.OKCYAN}Updated: {to_update}{TerminalColors.ENDC}\n"),
"skipped": (f"{TerminalColors.YELLOW}Skipped: {skipped}{TerminalColors.ENDC}\n"),
"failed": (f"{TerminalColors.FAIL}Failed: {failed_to_update}{TerminalColors.ENDC}\n"),
}
if debug:
updated_display = [str(u) for u in to_update] if display_as_str else to_update
skipped_display = [str(s) for s in skipped] if display_as_str else skipped
failed_display = [str(f) for f in failed_to_update] if display_as_str else failed_to_update
debug_messages = {
"success": (f"{TerminalColors.OKCYAN}Updated: {updated_display}{TerminalColors.ENDC}\n"),
"skipped": (f"{TerminalColors.YELLOW}Skipped: {skipped_display}{TerminalColors.ENDC}\n"),
"failed": (f"{TerminalColors.FAIL}Failed: {failed_display}{TerminalColors.ENDC}\n"),
}
# Print out a list of everything that was changed, if we have any changes to log.
# Otherwise, don't print anything.
TerminalHelper.print_conditional(
debug,
f"{debug_messages.get('success') if update_success_count > 0 else ''}"
f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}"
f"{debug_messages.get('failed') if update_failed_count > 0 else ''}",
)
# Print out a list of everything that was changed, if we have any changes to log.
# Otherwise, don't print anything.
TerminalHelper.print_conditional(
debug,
f"{debug_messages.get('success') if update_success_count > 0 else ''}"
f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}"
f"{debug_messages.get('failed') if update_failed_count > 0 else ''}",
)
if update_failed_count == 0 and update_skipped_count == 0:
logger.info(

View file

@ -7,6 +7,7 @@ from django.conf import settings
from django.db import models
from django_fsm import FSMField, transition # type: ignore
from django.utils import timezone
from waffle import flag_is_active
from registrar.models.domain import Domain
from registrar.models.federal_agency import FederalAgency
from registrar.models.utility.generic_helper import CreateOrUpdateOrganizationTypeHelper
@ -17,8 +18,6 @@ from .utility.time_stamped_model import TimeStampedModel
from ..utility.email import send_templated_email, EmailSendingError
from itertools import chain
from waffle.decorators import flag_is_active
logger = logging.getLogger(__name__)
@ -675,34 +674,50 @@ class DomainRequest(TimeStampedModel):
def _send_status_update_email(
self, new_status, email_template, email_template_subject, send_email=True, bcc_address="", wrap_email=False
):
"""Send a status update email to the submitter.
"""Send a status update email to the creator.
The email goes to the email address that the submitter gave as their
contact information. If there is not submitter information, then do
The email goes to the email address that the creator gave as their
contact information. If there is not creator information, then do
nothing.
If the waffle flag "profile_feature" is active, then this email will be sent to the
domain request creator rather than the submitter
send_email: bool -> Used to bypass the send_templated_email function, in the event
we just want to log that an email would have been sent, rather than actually sending one.
wrap_email: bool -> Wraps emails using `wrap_text_and_preserve_paragraphs` if any given
paragraph exceeds our desired max length (for prettier display).
"""
if self.submitter is None or self.submitter.email is None:
logger.warning(f"Cannot send {new_status} email, no submitter email address.")
recipient = self.creator if flag_is_active(None, "profile_feature") else self.submitter
if recipient is None or recipient.email is None:
logger.warning(
f"Cannot send {new_status} email, no creator email address for domain request with pk: {self.pk}."
f" Name: {self.requested_domain.name}"
if self.requested_domain
else ""
)
return None
if not send_email:
logger.info(f"Email was not sent. Would send {new_status} email: {self.submitter.email}")
logger.info(f"Email was not sent. Would send {new_status} email to: {recipient.email}")
return None
try:
send_templated_email(
email_template,
email_template_subject,
self.submitter.email,
context={"domain_request": self},
recipient.email,
context={
"domain_request": self,
# This is the user that we refer to in the email
"recipient": recipient,
},
bcc_address=bcc_address,
wrap_email=wrap_email,
)
logger.info(f"The {new_status} email sent to: {self.submitter.email}")
logger.info(f"The {new_status} email sent to: {recipient.email}")
except EmailSendingError:
logger.warning("Failed to send confirmation email", exc_info=True)

View file

@ -1,5 +1,5 @@
{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
Hi, {{ domain_request.submitter.first_name }}.
Hi, {{ recipient.first_name }}.
We've identified an action that youll need to complete before we continue reviewing your .gov domain request.

View file

@ -1,5 +1,5 @@
{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
Hi, {{ domain_request.submitter.first_name }}.
Hi, {{ recipient.first_name }}.
We've identified an action that youll need to complete before we continue reviewing your .gov domain request.

View file

@ -1,5 +1,5 @@
{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
Hi, {{ domain_request.submitter.first_name }}.
Hi, {{ recipient.first_name }}.
We've identified an action that youll need to complete before we continue reviewing your .gov domain request.

View file

@ -1,5 +1,5 @@
{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
Hi, {{ domain_request.submitter.first_name }}.
Hi, {{ recipient.first_name }}.
We've identified an action that youll need to complete before we continue reviewing your .gov domain request.

View file

@ -1,5 +1,5 @@
{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
Hi, {{ domain_request.submitter.first_name }}.
Hi, {{ recipient.first_name }}.
Your .gov domain request has been withdrawn and will not be reviewed by our team.

View file

@ -43,7 +43,7 @@ Purpose of your domain:
{{ domain_request.purpose }}
Your contact information:
{% spaceless %}{% include "emails/includes/contact.txt" with contact=domain_request.submitter %}{% endspaceless %}
{% spaceless %}{% include "emails/includes/contact.txt" with contact=recipient %}{% endspaceless %}
Other employees from your organization:{% for other in domain_request.other_contacts.all %}
{% spaceless %}{% include "emails/includes/contact.txt" with contact=other %}{% endspaceless %}

View file

@ -1,5 +1,5 @@
{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
Hi, {{ domain_request.submitter.first_name }}.
Hi, {{ recipient.first_name }}.
Congratulations! Your .gov domain request has been approved.

View file

@ -1,5 +1,5 @@
{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
Hi, {{ domain_request.submitter.first_name }}.
Hi, {{ recipient.first_name }}.
Your .gov domain request has been rejected.

View file

@ -1,5 +1,5 @@
{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
Hi, {{ domain_request.submitter.first_name }}.
Hi, {{ recipient.first_name }}.
We received your .gov domain request.

View file

@ -858,6 +858,7 @@ def completed_domain_request( # noqa
is_election_board=False,
organization_type=None,
federal_agency=None,
federal_type=None,
action_needed_reason=None,
):
"""A completed domain request."""
@ -923,6 +924,9 @@ def completed_domain_request( # noqa
if organization_type:
domain_request_kwargs["organization_type"] = organization_type
if federal_type:
domain_request_kwargs["federal_type"] = federal_type
if action_needed_reason:
domain_request_kwargs["action_needed_reason"] = action_needed_reason

View file

@ -944,7 +944,7 @@ class TestDomainRequestAdminForm(TestCase):
self.assertIn("rejection_reason", form.errors)
rejection_reason = form.errors.get("rejection_reason")
self.assertEqual(rejection_reason, ["A rejection reason is required."])
self.assertEqual(rejection_reason, ["A reason is required for this status."])
def test_form_choices_when_no_instance(self):
with less_console_noise():
@ -1929,7 +1929,7 @@ class TestDomainRequestAdmin(MockEppLib):
messages.error.assert_called_once_with(
request,
"A rejection reason is required.",
"A reason is required for this status.",
)
domain_request.refresh_from_db()

View file

@ -2,6 +2,7 @@ import copy
from datetime import date, datetime, time
from django.core.management import call_command
from django.test import TestCase, override_settings
from registrar.utility.constants import BranchChoices
from django.utils import timezone
from django.utils.module_loading import import_string
import logging
@ -1112,3 +1113,115 @@ class TestImportTables(TestCase):
# Check that logger.error was called with the correct message
mock_logger.error.assert_called_once_with("Zip file tmp/exported_tables.zip does not exist.")
class TestTransferFederalAgencyType(TestCase):
"""Tests for the transfer_federal_agency_type script"""
def setUp(self):
"""Creates a fake domain object"""
super().setUp()
self.amtrak, _ = FederalAgency.objects.get_or_create(agency="AMTRAK")
self.legislative_branch, _ = FederalAgency.objects.get_or_create(agency="Legislative Branch")
self.library_of_congress, _ = FederalAgency.objects.get_or_create(agency="Library of Congress")
self.gov_admin, _ = FederalAgency.objects.get_or_create(agency="gov Administration")
self.domain_request_1 = completed_domain_request(
name="testgov.gov",
federal_agency=self.amtrak,
federal_type=BranchChoices.EXECUTIVE,
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
)
self.domain_request_2 = completed_domain_request(
name="cheesefactory.gov",
federal_agency=self.legislative_branch,
federal_type=BranchChoices.LEGISLATIVE,
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
)
self.domain_request_3 = completed_domain_request(
name="meowardslaw.gov",
federal_agency=self.library_of_congress,
federal_type=BranchChoices.JUDICIAL,
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
)
# Duplicate fields with invalid data - we expect to skip updating these
self.domain_request_4 = completed_domain_request(
name="baddata.gov",
federal_agency=self.gov_admin,
federal_type=BranchChoices.EXECUTIVE,
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
)
self.domain_request_5 = completed_domain_request(
name="worsedata.gov",
federal_agency=self.gov_admin,
federal_type=BranchChoices.JUDICIAL,
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
)
self.domain_request_1.approve()
self.domain_request_2.approve()
self.domain_request_3.approve()
self.domain_request_4.approve()
self.domain_request_5.approve()
def tearDown(self):
"""Deletes all DB objects related to migrations"""
super().tearDown()
# Delete domains and related information
Domain.objects.all().delete()
DomainInformation.objects.all().delete()
DomainRequest.objects.all().delete()
User.objects.all().delete()
Contact.objects.all().delete()
Website.objects.all().delete()
FederalAgency.objects.all().delete()
def run_transfer_federal_agency_type(self):
"""
This method executes the transfer_federal_agency_type command.
The 'call_command' function from Django's management framework is then used to
execute the populate_first_ready command with the specified arguments.
"""
with less_console_noise():
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
return_value=True,
):
call_command("transfer_federal_agency_type")
@less_console_noise_decorator
def test_transfer_federal_agency_type_script(self):
"""
Tests that the transfer_federal_agency_type script updates what we expect, and skips what we expect
"""
# Before proceeding, make sure we don't have any data contamination
tested_agencies = [
self.amtrak,
self.legislative_branch,
self.library_of_congress,
self.gov_admin,
]
for agency in tested_agencies:
self.assertEqual(agency.federal_type, None)
# Run the script
self.run_transfer_federal_agency_type()
# Refresh the local db instance to reflect changes
self.amtrak.refresh_from_db()
self.legislative_branch.refresh_from_db()
self.library_of_congress.refresh_from_db()
self.gov_admin.refresh_from_db()
# Test the values that we expect to be updated
self.assertEqual(self.amtrak.federal_type, BranchChoices.EXECUTIVE)
self.assertEqual(self.legislative_branch.federal_type, BranchChoices.LEGISLATIVE)
self.assertEqual(self.library_of_congress.federal_type, BranchChoices.JUDICIAL)
# We don't expect this field to be updated (as it has duplicate data)
self.assertEqual(self.gov_admin.federal_type, None)

View file

@ -25,6 +25,7 @@ from registrar.utility.constants import BranchChoices
from .common import MockSESClient, less_console_noise, completed_domain_request, set_domain_request_investigators
from django_fsm import TransitionNotAllowed
from waffle.testutils import override_flag
# Test comment for push -- will remove
@ -33,29 +34,44 @@ from django_fsm import TransitionNotAllowed
@boto3_mocking.patching
class TestDomainRequest(TestCase):
def setUp(self):
self.dummy_user, _ = Contact.objects.get_or_create(
email="mayor@igorville.com", first_name="Hello", last_name="World"
)
self.dummy_user_2, _ = User.objects.get_or_create(
username="intern@igorville.com", email="intern@igorville.com", first_name="Lava", last_name="World"
)
self.started_domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.STARTED, name="started.gov"
status=DomainRequest.DomainRequestStatus.STARTED,
name="started.gov",
)
self.submitted_domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.SUBMITTED, name="submitted.gov"
status=DomainRequest.DomainRequestStatus.SUBMITTED,
name="submitted.gov",
)
self.in_review_domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.IN_REVIEW, name="in-review.gov"
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
name="in-review.gov",
)
self.action_needed_domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.ACTION_NEEDED, name="action-needed.gov"
status=DomainRequest.DomainRequestStatus.ACTION_NEEDED,
name="action-needed.gov",
)
self.approved_domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.APPROVED, name="approved.gov"
status=DomainRequest.DomainRequestStatus.APPROVED,
name="approved.gov",
)
self.withdrawn_domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.WITHDRAWN, name="withdrawn.gov"
status=DomainRequest.DomainRequestStatus.WITHDRAWN,
name="withdrawn.gov",
)
self.rejected_domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.REJECTED, name="rejected.gov"
status=DomainRequest.DomainRequestStatus.REJECTED,
name="rejected.gov",
)
self.ineligible_domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.INELIGIBLE, name="ineligible.gov"
status=DomainRequest.DomainRequestStatus.INELIGIBLE,
name="ineligible.gov",
)
# Store all domain request statuses in a variable for ease of use
@ -199,7 +215,9 @@ class TestDomainRequest(TestCase):
domain_request.submit()
self.assertEqual(domain_request.status, domain_request.DomainRequestStatus.SUBMITTED)
def check_email_sent(self, domain_request, msg, action, expected_count):
def check_email_sent(
self, domain_request, msg, action, expected_count, expected_content=None, expected_email="mayor@igorville.com"
):
"""Check if an email was sent after performing an action."""
with self.subTest(msg=msg, action=action):
@ -213,19 +231,35 @@ class TestDomainRequest(TestCase):
sent_emails = [
email
for email in MockSESClient.EMAILS_SENT
if "mayor@igorville.gov" in email["kwargs"]["Destination"]["ToAddresses"]
if expected_email in email["kwargs"]["Destination"]["ToAddresses"]
]
self.assertEqual(len(sent_emails), expected_count)
if expected_content:
email_content = sent_emails[0]["kwargs"]["Content"]["Simple"]["Body"]["Text"]["Data"]
self.assertIn(expected_content, email_content)
@override_flag("profile_feature", active=False)
def test_submit_from_started_sends_email(self):
msg = "Create a domain request and submit it and see if email was sent."
domain_request = completed_domain_request()
self.check_email_sent(domain_request, msg, "submit", 1)
domain_request = completed_domain_request(submitter=self.dummy_user, user=self.dummy_user_2)
self.check_email_sent(domain_request, msg, "submit", 1, expected_content="Hello")
@override_flag("profile_feature", active=True)
def test_submit_from_started_sends_email_to_creator(self):
"""Tests if, when the profile feature flag is on, we send an email to the creator"""
msg = "Create a domain request and submit it and see if email was sent when the feature flag is on."
domain_request = completed_domain_request(submitter=self.dummy_user, user=self.dummy_user_2)
self.check_email_sent(
domain_request, msg, "submit", 1, expected_content="Lava", expected_email="intern@igorville.com"
)
def test_submit_from_withdrawn_sends_email(self):
msg = "Create a withdrawn domain request and submit it and see if email was sent."
domain_request = completed_domain_request(status=DomainRequest.DomainRequestStatus.WITHDRAWN)
self.check_email_sent(domain_request, msg, "submit", 1)
domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.WITHDRAWN, submitter=self.dummy_user
)
self.check_email_sent(domain_request, msg, "submit", 1, expected_content="Hello")
def test_submit_from_action_needed_does_not_send_email(self):
msg = "Create a domain request with ACTION_NEEDED status and submit it, check if email was not sent."
@ -239,18 +273,24 @@ class TestDomainRequest(TestCase):
def test_approve_sends_email(self):
msg = "Create a domain request and approve it and see if email was sent."
domain_request = completed_domain_request(status=DomainRequest.DomainRequestStatus.IN_REVIEW)
self.check_email_sent(domain_request, msg, "approve", 1)
domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.IN_REVIEW, submitter=self.dummy_user
)
self.check_email_sent(domain_request, msg, "approve", 1, expected_content="Hello")
def test_withdraw_sends_email(self):
msg = "Create a domain request and withdraw it and see if email was sent."
domain_request = completed_domain_request(status=DomainRequest.DomainRequestStatus.IN_REVIEW)
self.check_email_sent(domain_request, msg, "withdraw", 1)
domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.IN_REVIEW, submitter=self.dummy_user
)
self.check_email_sent(domain_request, msg, "withdraw", 1, expected_content="Hello")
def test_reject_sends_email(self):
msg = "Create a domain request and reject it and see if email was sent."
domain_request = completed_domain_request(status=DomainRequest.DomainRequestStatus.APPROVED)
self.check_email_sent(domain_request, msg, "reject", 1)
domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.APPROVED, submitter=self.dummy_user
)
self.check_email_sent(domain_request, msg, "reject", 1, expected_content="Hello")
def test_reject_with_prejudice_does_not_send_email(self):
msg = "Create a domain request and reject it with prejudice and see if email was sent."

View file

@ -101,7 +101,7 @@ class FSMDomainRequestError(Exception):
FSMErrorCodes.NO_INVESTIGATOR: ("Investigator is required for this status."),
FSMErrorCodes.INVESTIGATOR_NOT_STAFF: ("Investigator is not a staff user."),
FSMErrorCodes.INVESTIGATOR_NOT_SUBMITTER: ("Only the assigned investigator can make this change."),
FSMErrorCodes.NO_REJECTION_REASON: ("A rejection reason is required."),
FSMErrorCodes.NO_REJECTION_REASON: ("A reason is required for this status."),
FSMErrorCodes.NO_ACTION_NEEDED_REASON: ("A reason is required for this status."),
}