domain invitations now send email from django admin for both domain and portfolio invitations, consolidated error handling

This commit is contained in:
David Kennedy 2025-01-07 17:04:02 -05:00
parent ab491d1507
commit c2d17442f8
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
6 changed files with 264 additions and 179 deletions

View file

@ -14,6 +14,7 @@ from django.db.models import (
from django.db.models.functions import Concat, Coalesce from django.db.models.functions import Concat, Coalesce
from django.http import HttpResponseRedirect from django.http import HttpResponseRedirect
from registrar.models.federal_agency import FederalAgency from registrar.models.federal_agency import FederalAgency
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.utility.admin_helpers import ( from registrar.utility.admin_helpers import (
AutocompleteSelectWithPlaceholder, AutocompleteSelectWithPlaceholder,
get_action_needed_reason_default_email, get_action_needed_reason_default_email,
@ -27,8 +28,12 @@ from django.shortcuts import redirect
from django_fsm import get_available_FIELD_transitions, FSMField from django_fsm import get_available_FIELD_transitions, FSMField
from registrar.models import DomainInformation, Portfolio, UserPortfolioPermission, DomainInvitation from registrar.models import DomainInformation, Portfolio, UserPortfolioPermission, DomainInvitation
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from registrar.utility.email import EmailSendingError from registrar.utility.email_invitations import send_domain_invitation_email, send_portfolio_invitation_email
from registrar.utility.email_invitations import send_portfolio_invitation_email from registrar.views.utility.portfolio_helper import (
get_org_membership,
get_requested_user,
handle_invitation_exceptions,
)
from waffle.decorators import flag_is_active from waffle.decorators import flag_is_active
from django.contrib import admin, messages from django.contrib import admin, messages
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
@ -41,7 +46,7 @@ from waffle.admin import FlagAdmin
from waffle.models import Sample, Switch from waffle.models import Sample, Switch
from registrar.models import Contact, Domain, DomainRequest, DraftDomain, User, Website, SeniorOfficial from registrar.models import Contact, Domain, DomainRequest, DraftDomain, User, Website, SeniorOfficial
from registrar.utility.constants import BranchChoices from registrar.utility.constants import BranchChoices
from registrar.utility.errors import FSMDomainRequestError, FSMErrorCodes, MissingEmailError from registrar.utility.errors import FSMDomainRequestError, FSMErrorCodes
from registrar.utility.waffle import flag_is_active_for_user from registrar.utility.waffle import flag_is_active_for_user
from registrar.views.utility.mixins import OrderableFieldsMixin from registrar.views.utility.mixins import OrderableFieldsMixin
from django.contrib.admin.views.main import ORDER_VAR from django.contrib.admin.views.main import ORDER_VAR
@ -1442,11 +1447,108 @@ class DomainInvitationAdmin(ListHeaderAdmin):
which will be successful if a single User exists for that email; otherwise, will which will be successful if a single User exists for that email; otherwise, will
just continue to create the invitation. just continue to create the invitation.
""" """
if not change and User.objects.filter(email=obj.email).count() == 1: if not change:
# Domain Invitation creation for an existing User domain = obj.domain
obj.retrieve() domain_org = domain.domain_info.portfolio
# Call the parent save method to save the object requested_email = obj.email
super().save_model(request, obj, form, change) # Look up a user with that email
requested_user = get_requested_user(requested_email)
requestor = request.user
member_of_a_different_org, member_of_this_org = get_org_membership(
domain_org, requested_email, requested_user
)
try:
if (
flag_is_active(request, "organization_feature")
and not flag_is_active(request, "multiple_portfolios")
and domain_org is not None
and not member_of_this_org
):
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=domain_org)
PortfolioInvitation.objects.get_or_create(
email=requested_email,
portfolio=domain_org,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
messages.success(request, f"{requested_email} has been invited to the organization: {domain_org}")
send_domain_invitation_email(
email=requested_email,
requestor=requestor,
domain=domain,
is_member_of_different_org=member_of_a_different_org,
)
if requested_user is not None:
# Domain Invitation creation for an existing User
obj.retrieve()
# Call the parent save method to save the object
super().save_model(request, obj, form, change)
messages.success(request, f"{requested_email} has been invited to the domain: {domain}")
except Exception as e:
handle_invitation_exceptions(request, e, requested_email)
return
else:
# Call the parent save method to save the object
super().save_model(request, obj, form, change)
def response_add(self, request, obj, post_url_continue=None):
"""
Override response_add to handle rendering when exceptions are raised during add model.
Normal flow on successful save_model on add is to redirect to changelist_view.
If there are errors, flow is modified to instead render change form.
"""
# Check if there are any error or warning messages in the `messages` framework
storage = get_messages(request)
has_errors = any(message.level_tag in ["error", "warning"] for message in storage)
if has_errors:
# Re-render the change form if there are errors or warnings
# Prepare context for rendering the change form
# Get the model form
ModelForm = self.get_form(request, obj=obj)
form = ModelForm(instance=obj)
# Create an AdminForm instance
admin_form = AdminForm(
form,
list(self.get_fieldsets(request, obj)),
self.get_prepopulated_fields(request, obj),
self.get_readonly_fields(request, obj),
model_admin=self,
)
media = self.media + form.media
opts = obj._meta
change_form_context = {
**self.admin_site.each_context(request), # Add admin context
"title": f"Add {opts.verbose_name}",
"opts": opts,
"original": obj,
"save_as": self.save_as,
"has_change_permission": self.has_change_permission(request, obj),
"add": True, # Indicate this is an "Add" form
"change": False, # Indicate this is not a "Change" form
"is_popup": False,
"inline_admin_formsets": [],
"save_on_top": self.save_on_top,
"show_delete": self.has_delete_permission(request, obj),
"obj": obj,
"adminform": admin_form, # Pass the AdminForm instance
"media": media,
"errors": None,
}
return self.render_change_form(
request,
context=change_form_context,
add=True,
change=False,
obj=obj,
)
return super().response_add(request, obj, post_url_continue)
class PortfolioInvitationAdmin(ListHeaderAdmin): class PortfolioInvitationAdmin(ListHeaderAdmin):
@ -1523,36 +1625,11 @@ class PortfolioInvitationAdmin(ListHeaderAdmin):
messages.warning(request, "User is already a member of this portfolio.") messages.warning(request, "User is already a member of this portfolio.")
except Exception as e: except Exception as e:
# when exception is raised, handle and do not save the model # when exception is raised, handle and do not save the model
self._handle_exceptions(e, request, obj) handle_invitation_exceptions(request, e, requested_email)
return return
# Call the parent save method to save the object # Call the parent save method to save the object
super().save_model(request, obj, form, change) super().save_model(request, obj, form, change)
def _handle_exceptions(self, exception, request, obj):
"""Handle exceptions raised during the process.
Log warnings / errors, and message errors to the user.
"""
if isinstance(exception, EmailSendingError):
logger.warning(
"Could not sent email invitation to %s for portfolio %s (EmailSendingError)",
obj.email,
obj.portfolio,
exc_info=True,
)
messages.error(request, "Could not send email invitation. Portfolio invitation not saved.")
elif isinstance(exception, MissingEmailError):
messages.error(request, str(exception))
logger.error(
f"Can't send email to '{obj.email}' for portfolio '{obj.portfolio}'. "
f"No email exists for the requestor.",
exc_info=True,
)
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.error(request, "Could not send email invitation. Portfolio invitation not saved.")
def response_add(self, request, obj, post_url_continue=None): def response_add(self, request, obj, post_url_continue=None):
""" """
Override response_add to handle rendering when exceptions are raised during add model. Override response_add to handle rendering when exceptions are raised during add model.

View file

@ -56,6 +56,8 @@ class DomainInvitation(TimeStampedModel):
Raises: Raises:
RuntimeError if no matching user can be found. RuntimeError if no matching user can be found.
""" """
# NOTE: this is currently not accounting for scenario when User.objects.get matches
# multiple user accounts with the same email address
# get a user with this email address # get a user with this email address
User = get_user_model() User = get_user_model()

View file

@ -7,7 +7,7 @@ from registrar.utility.errors import (
OutsideOrgMemberError, OutsideOrgMemberError,
) )
from registrar.utility.waffle import flag_is_active_for_user from registrar.utility.waffle import flag_is_active_for_user
from registrar.utility.email import send_templated_email from registrar.utility.email import EmailSendingError, send_templated_email
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -38,7 +38,7 @@ def send_domain_invitation_email(email: str, requestor, domain, is_member_of_dif
# Check if the requestor is staff and has an email # Check if the requestor is staff and has an email
if not requestor.is_staff: if not requestor.is_staff:
if not requestor.email or requestor.email.strip() == "": if not requestor.email or requestor.email.strip() == "":
raise MissingEmailError raise MissingEmailError(email=email, domain=domain)
else: else:
requestor_email = requestor.email requestor_email = requestor.email
@ -65,15 +65,18 @@ def send_domain_invitation_email(email: str, requestor, domain, is_member_of_dif
pass pass
# Send the email # Send the email
send_templated_email( try:
"emails/domain_invitation.txt", send_templated_email(
"emails/domain_invitation_subject.txt", "emails/domain_invitation.txt",
to_address=email, "emails/domain_invitation_subject.txt",
context={ to_address=email,
"domain": domain, context={
"requestor_email": requestor_email, "domain": domain,
}, "requestor_email": requestor_email,
) },
)
except EmailSendingError as err:
raise EmailSendingError(f"Could not send email invitation to {email} for domain {domain}.") from err
def send_portfolio_invitation_email(email: str, requestor, portfolio): def send_portfolio_invitation_email(email: str, requestor, portfolio):
@ -98,17 +101,22 @@ def send_portfolio_invitation_email(email: str, requestor, portfolio):
# Check if the requestor is staff and has an email # Check if the requestor is staff and has an email
if not requestor.is_staff: if not requestor.is_staff:
if not requestor.email or requestor.email.strip() == "": if not requestor.email or requestor.email.strip() == "":
raise MissingEmailError raise MissingEmailError(email=email, portfolio=portfolio)
else: else:
requestor_email = requestor.email requestor_email = requestor.email
send_templated_email( try:
"emails/portfolio_invitation.txt", send_templated_email(
"emails/portfolio_invitation_subject.txt", "emails/portfolio_invitation.txt",
to_address=email, "emails/portfolio_invitation_subject.txt",
context={ to_address=email,
"portfolio": portfolio, context={
"requestor_email": requestor_email, "portfolio": portfolio,
"email": email, "requestor_email": requestor_email,
}, "email": email,
) },
)
except EmailSendingError as err:
raise EmailSendingError(
f"Could not sent email invitation to {email} for portfolio {portfolio}. Portfolio invitation not saved."
) from err

View file

@ -46,8 +46,17 @@ class AlreadyDomainInvitedError(InvitationError):
class MissingEmailError(InvitationError): class MissingEmailError(InvitationError):
"""Raised when the requestor has no email associated with their account.""" """Raised when the requestor has no email associated with their account."""
def __init__(self): def __init__(self, email=None, domain=None, portfolio=None):
super().__init__("Can't send invitation email. No email is associated with your user account.") # Default message if no additional info is provided
message = "Can't send invitation email. No email is associated with your user account."
# Customize message based on provided arguments
if email and domain:
message = f"Can't send email to '{email}' on domain '{domain}'. No email exists for the requestor."
elif email and portfolio:
message = f"Can't send email to '{email}' for portfolio '{portfolio}'. No email exists for the requestor."
super().__init__(message)
class OutsideOrgMemberError(ValueError): class OutsideOrgMemberError(ValueError):

View file

@ -10,7 +10,6 @@ import logging
import requests import requests
from django.contrib import messages from django.contrib import messages
from django.contrib.messages.views import SuccessMessageMixin from django.contrib.messages.views import SuccessMessageMixin
from django.db import IntegrityError
from django.http import HttpResponseRedirect from django.http import HttpResponseRedirect
from django.shortcuts import redirect from django.shortcuts import redirect
from django.urls import reverse from django.urls import reverse
@ -31,22 +30,23 @@ from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
from registrar.utility.enums import DefaultEmail from registrar.utility.enums import DefaultEmail
from registrar.utility.errors import ( from registrar.utility.errors import (
AlreadyDomainInvitedError,
AlreadyDomainManagerError,
GenericError, GenericError,
GenericErrorCodes, GenericErrorCodes,
MissingEmailError,
NameserverError, NameserverError,
NameserverErrorCodes as nsErrorCodes, NameserverErrorCodes as nsErrorCodes,
DsDataError, DsDataError,
DsDataErrorCodes, DsDataErrorCodes,
SecurityEmailError, SecurityEmailError,
SecurityEmailErrorCodes, SecurityEmailErrorCodes,
OutsideOrgMemberError,
) )
from registrar.models.utility.contact_error import ContactError from registrar.models.utility.contact_error import ContactError
from registrar.views.utility.permission_views import UserDomainRolePermissionDeleteView from registrar.views.utility.permission_views import UserDomainRolePermissionDeleteView
from registrar.utility.waffle import flag_is_active_for_user from registrar.utility.waffle import flag_is_active_for_user
from registrar.views.utility.portfolio_helper import (
get_org_membership,
get_requested_user,
handle_invitation_exceptions,
)
from ..forms import ( from ..forms import (
SeniorOfficialContactForm, SeniorOfficialContactForm,
@ -1149,43 +1149,13 @@ class DomainAddUserView(DomainFormBaseView):
def get_success_url(self): def get_success_url(self):
return reverse("domain-users", kwargs={"pk": self.object.pk}) return reverse("domain-users", kwargs={"pk": self.object.pk})
def _get_org_membership(self, requestor_org, requested_email, requested_user):
"""
Verifies if an email belongs to a different organization as a member or invited member.
Verifies if an email belongs to this organization as a member or invited member.
User does not belong to any org can be deduced from the tuple returned.
Returns a tuple (member_of_a_different_org, member_of_this_org).
"""
# COMMENT: this code does not take into account multiple portfolios flag
# COMMENT: shouldn't this code be based on the organization of the domain, not the org
# of the requestor? requestor could have multiple portfolios
# Check for existing permissions or invitations for the requested user
existing_org_permission = UserPortfolioPermission.objects.filter(user=requested_user).first()
existing_org_invitation = PortfolioInvitation.objects.filter(email=requested_email).first()
# Determine membership in a different organization
member_of_a_different_org = (
existing_org_permission and existing_org_permission.portfolio != requestor_org
) or (existing_org_invitation and existing_org_invitation.portfolio != requestor_org)
# Determine membership in the same organization
member_of_this_org = (existing_org_permission and existing_org_permission.portfolio == requestor_org) or (
existing_org_invitation and existing_org_invitation.portfolio == requestor_org
)
return member_of_a_different_org, member_of_this_org
def form_valid(self, form): def form_valid(self, form):
"""Add the specified user to this domain.""" """Add the specified user to this domain."""
requested_email = form.cleaned_data["email"] requested_email = form.cleaned_data["email"]
requestor = self.request.user requestor = self.request.user
# Look up a user with that email # Look up a user with that email
requested_user = self._get_requested_user(requested_email) requested_user = get_requested_user(requested_email)
# NOTE: This does not account for multiple portfolios flag being set to True # NOTE: This does not account for multiple portfolios flag being set to True
domain_org = self.object.domain_info.portfolio domain_org = self.object.domain_info.portfolio
@ -1196,49 +1166,36 @@ class DomainAddUserView(DomainFormBaseView):
or requestor.is_staff or requestor.is_staff
) )
member_of_a_different_org, member_of_this_org = self._get_org_membership( member_of_a_different_org, member_of_this_org = get_org_membership(domain_org, requested_email, requested_user)
domain_org, requested_email, requested_user
)
# determine portfolio of the domain (code currently is looking at requestor's portfolio)
# if requested_email/user is not member or invited member of this portfolio
# COMMENT: this code does not take into account multiple portfolios flag
# send portfolio invitation email
# create portfolio invitation
# create message to view
if (
flag_is_active_for_user(requestor, "organization_feature")
and not flag_is_active_for_user(requestor, "multiple_portfolios")
and domain_org is not None
and requestor_can_update_portfolio
and not member_of_this_org
):
try:
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=domain_org)
PortfolioInvitation.objects.get_or_create(email=requested_email, portfolio=domain_org)
messages.success(self.request, f"{requested_email} has been invited to the organization: {domain_org}")
except Exception as e:
self._handle_portfolio_exceptions(e, requested_email, domain_org)
# If that first invite does not succeed take an early exit
return redirect(self.get_success_url())
try: try:
# determine portfolio of the domain (code currently is looking at requestor's portfolio)
# if requested_email/user is not member or invited member of this portfolio
# COMMENT: this code does not take into account multiple portfolios flag
# send portfolio invitation email
# create portfolio invitation
# create message to view
if (
flag_is_active_for_user(requestor, "organization_feature")
and not flag_is_active_for_user(requestor, "multiple_portfolios")
and domain_org is not None
and requestor_can_update_portfolio
and not member_of_this_org
):
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=domain_org)
PortfolioInvitation.objects.get_or_create(
email=requested_email, portfolio=domain_org, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER]
)
messages.success(self.request, f"{requested_email} has been invited to the organization: {domain_org}")
if requested_user is None: if requested_user is None:
self._handle_new_user_invitation(requested_email, requestor, member_of_a_different_org) self._handle_new_user_invitation(requested_email, requestor, member_of_a_different_org)
else: else:
self._handle_existing_user(requested_email, requestor, requested_user, member_of_a_different_org) self._handle_existing_user(requested_email, requestor, requested_user, member_of_a_different_org)
except Exception as e: except Exception as e:
self._handle_exceptions(e, requested_email) handle_invitation_exceptions(self.request, e, requested_email)
return redirect(self.get_success_url()) return redirect(self.get_success_url())
def _get_requested_user(self, email):
"""Retrieve a user by email or return None if the user doesn't exist."""
try:
return User.objects.get(email=email)
except User.DoesNotExist:
return None
def _handle_new_user_invitation(self, email, requestor, member_of_different_org): def _handle_new_user_invitation(self, email, requestor, member_of_different_org):
"""Handle invitation for a new user who does not exist in the system.""" """Handle invitation for a new user who does not exist in the system."""
send_domain_invitation_email( send_domain_invitation_email(
@ -1265,57 +1222,6 @@ class DomainAddUserView(DomainFormBaseView):
) )
messages.success(self.request, f"Added user {email}.") messages.success(self.request, f"Added user {email}.")
def _handle_exceptions(self, exception, email):
"""Handle exceptions raised during the process."""
if isinstance(exception, EmailSendingError):
logger.warning(
"Could not send email invitation to %s for domain %s (EmailSendingError)",
email,
self.object,
exc_info=True,
)
messages.warning(self.request, "Could not send email invitation.")
elif isinstance(exception, OutsideOrgMemberError):
logger.warning(
"Could not send email. Can not invite member of a .gov organization to a different organization.",
self.object,
exc_info=True,
)
messages.error(
self.request,
f"{email} is already a member of another .gov organization.",
)
elif isinstance(exception, AlreadyDomainManagerError):
messages.warning(self.request, str(exception))
elif isinstance(exception, AlreadyDomainInvitedError):
messages.warning(self.request, str(exception))
elif isinstance(exception, MissingEmailError):
messages.error(self.request, str(exception))
logger.error(
f"Can't send email to '{email}' on domain '{self.object}'. No email exists for the requestor.",
exc_info=True,
)
elif isinstance(exception, IntegrityError):
messages.warning(self.request, f"{email} is already a manager for this domain")
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.warning(self.request, "Could not send email invitation.")
def _handle_portfolio_exceptions(self, exception, email, portfolio):
"""Handle exceptions raised during the process."""
if isinstance(exception, EmailSendingError):
logger.warning("Could not send email invitation (EmailSendingError)", exc_info=True)
messages.warning(self.request, "Could not send email invitation.")
elif isinstance(exception, MissingEmailError):
messages.error(self.request, str(exception))
logger.error(
f"Can't send email to '{email}' for portfolio '{portfolio}'. No email exists for the requestor.",
exc_info=True,
)
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.warning(self.request, "Could not send email invitation.")
class DomainInvitationCancelView(SuccessMessageMixin, DomainInvitationPermissionCancelView): class DomainInvitationCancelView(SuccessMessageMixin, DomainInvitationPermissionCancelView):
object: DomainInvitation object: DomainInvitation

View file

@ -0,0 +1,83 @@
from django.contrib import messages
from django.db import IntegrityError
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user import User
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.utility.email import EmailSendingError
import logging
from registrar.utility.errors import (
AlreadyDomainInvitedError,
AlreadyDomainManagerError,
MissingEmailError,
OutsideOrgMemberError,
)
logger = logging.getLogger(__name__)
def get_org_membership(requestor_org, requested_email, requested_user):
"""
Verifies if an email belongs to a different organization as a member or invited member.
Verifies if an email belongs to this organization as a member or invited member.
User does not belong to any org can be deduced from the tuple returned.
Returns a tuple (member_of_a_different_org, member_of_this_org).
"""
# COMMENT: this code does not take into account multiple portfolios flag
# COMMENT: shouldn't this code be based on the organization of the domain, not the org
# of the requestor? requestor could have multiple portfolios
# Check for existing permissions or invitations for the requested user
existing_org_permission = UserPortfolioPermission.objects.filter(user=requested_user).first()
existing_org_invitation = PortfolioInvitation.objects.filter(email=requested_email).first()
# Determine membership in a different organization
member_of_a_different_org = (existing_org_permission and existing_org_permission.portfolio != requestor_org) or (
existing_org_invitation and existing_org_invitation.portfolio != requestor_org
)
# Determine membership in the same organization
member_of_this_org = (existing_org_permission and existing_org_permission.portfolio == requestor_org) or (
existing_org_invitation and existing_org_invitation.portfolio == requestor_org
)
return member_of_a_different_org, member_of_this_org
def get_requested_user(email):
"""Retrieve a user by email or return None if the user doesn't exist."""
try:
return User.objects.get(email=email)
except User.DoesNotExist:
return None
def handle_invitation_exceptions(request, exception, email):
"""Handle exceptions raised during the process."""
if isinstance(exception, EmailSendingError):
logger.warning(str(exception), exc_info=True)
messages.error(request, str(exception))
elif isinstance(exception, MissingEmailError):
messages.error(request, str(exception))
logger.error(str(exception), exc_info=True)
elif isinstance(exception, OutsideOrgMemberError):
logger.warning(
"Could not send email. Can not invite member of a .gov organization to a different organization.",
exc_info=True,
)
messages.error(
request,
f"{email} is already a member of another .gov organization.",
)
elif isinstance(exception, AlreadyDomainManagerError):
messages.warning(request, str(exception))
elif isinstance(exception, AlreadyDomainInvitedError):
messages.warning(request, str(exception))
elif isinstance(exception, IntegrityError):
messages.warning(request, f"{email} is already a manager for this domain")
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.warning(request, "Could not send email invitation.")