first refactor attempt, moving send_domain_invitation_email to a helper

This commit is contained in:
David Kennedy 2024-12-13 06:48:47 -05:00
parent d766aa9b2a
commit d2d44bfea1
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
4 changed files with 150 additions and 120 deletions

View file

@ -7,6 +7,7 @@ from typing import Optional
from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore
from django.db import models
from django.urls import reverse
from django.utils import timezone
from typing import Any
from registrar.models.host import Host
@ -1087,6 +1088,13 @@ class Domain(TimeStampedModel, DomainHelper):
help_text="Record of the last change event for ds data",
)
def get_absolute_url(self):
"""
Returns the absolute URL for the domain instance.
This is the standard implementation for Django models.
"""
return reverse("domain", kwargs={"pk": self.pk})
def isActive(self):
return self.state == Domain.State.CREATED

View file

@ -0,0 +1,97 @@
from django.conf import settings
from registrar.models import DomainInvitation
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.utility.errors import (
MissingEmailError,
AlreadyManagerError,
AlreadyInvitedError,
OutsideOrgMemberError,
)
from registrar.utility.waffle import flag_is_active_for_user
from registrar.utility.email import send_templated_email, EmailSendingError
import logging
logger = logging.getLogger(__name__)
def _is_member_of_different_org(email, requestor, requested_user):
"""Verifies if an email belongs to a different organization as a member or invited member."""
# Check if requested_user is a already member of a different organization than the requestor's org
requestor_org = UserPortfolioPermission.objects.filter(user=requestor).first().portfolio
existing_org_permission = UserPortfolioPermission.objects.filter(user=requested_user).first()
existing_org_invitation = PortfolioInvitation.objects.filter(email=email).first()
return (existing_org_permission and existing_org_permission.portfolio != requestor_org) or (
existing_org_invitation and existing_org_invitation.portfolio != requestor_org
)
def send_domain_invitation_email(email: str, requestor, domain, requested_user=None):
"""
Sends a domain invitation email to the specified address.
Raises exceptions for validation or email-sending issues.
Args:
email (str): Email address of the recipient.
requestor (User): The user initiating the invitation.
domain (Domain): The domain object for which the invitation is being sent.
requested_user (User): The user of the recipient, if exists; defaults to None
Raises:
MissingEmailError: If the requestor has no email associated with their account.
AlreadyManagerError: If the email corresponds to an existing domain manager.
AlreadyInvitedError: If an invitation has already been sent.
OutsideOrgMemberError: If the requested_user is part of a different organization.
EmailSendingError: If there is an error while sending the email.
"""
# Default email address for staff
requestor_email = settings.DEFAULT_FROM_EMAIL
# Check if the requestor is staff and has an email
if not requestor.is_staff:
if not requestor.email or requestor.email.strip() == "":
raise MissingEmailError(requestor.username)
else:
requestor_email = requestor.email
# Check if the recipient is part of a different organization
if flag_is_active_for_user(requestor, "organization_feature") and _is_member_of_different_org(
email, requestor, requested_user
):
raise OutsideOrgMemberError
# Check for an existing invitation
try:
invite = DomainInvitation.objects.get(email=email, domain=domain)
if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED:
raise AlreadyManagerError(email)
elif invite.status == DomainInvitation.DomainInvitationStatus.CANCELED:
invite.update_cancellation_status()
invite.save()
else:
raise AlreadyInvitedError(email)
except DomainInvitation.DoesNotExist:
pass
# Send the email
try:
send_templated_email(
"emails/domain_invitation.txt",
"emails/domain_invitation_subject.txt",
to_address=email,
context={
"domain_url": domain.get_absolute_url(),
"domain": domain,
"requestor_email": requestor_email,
},
)
except EmailSendingError as exc:
logger.warning(
"Could not send email invitation to %s for domain %s",
email,
domain,
exc_info=True,
)
raise EmailSendingError("Could not send email invitation.") from exc

View file

@ -23,6 +23,34 @@ class InvalidDomainError(ValueError):
pass
class InvitationError(Exception):
"""Base exception for invitation-related errors."""
pass
class AlreadyManagerError(InvitationError):
"""Raised when the user is already a manager for the domain."""
def __init__(self, email):
super().__init__(f"{email} is already a manager for this domain.")
class AlreadyInvitedError(InvitationError):
"""Raised when the user has already been invited to the domain."""
def __init__(self, email):
super().__init__(f"{email} has already been invited to this domain.")
class MissingEmailError(InvitationError):
"""Raised when the requestor has no email associated with their account."""
def __init__(self, username):
super().__init__(f"Can't send invitation email. No email is associated with the account for '{username}'.")
self.username = username
class OutsideOrgMemberError(ValueError):
"""
Error raised when an org member tries adding a user from a different .gov org.

View file

@ -25,14 +25,16 @@ from registrar.models import (
PortfolioInvitation,
User,
UserDomainRole,
UserPortfolioPermission,
PublicContact,
)
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
from registrar.utility.enums import DefaultEmail
from registrar.utility.errors import (
AlreadyInvitedError,
AlreadyManagerError,
GenericError,
GenericErrorCodes,
MissingEmailError,
NameserverError,
NameserverErrorCodes as nsErrorCodes,
DsDataError,
@ -63,40 +65,13 @@ from epplibwrapper import (
)
from ..utility.email import send_templated_email, EmailSendingError
from ..utility.email_invitations import send_domain_invitation_email
from .utility import DomainPermissionView, DomainInvitationPermissionCancelView
from django import forms
logger = logging.getLogger(__name__)
class InvitationError(Exception):
"""Base exception for invitation-related errors."""
pass
class AlreadyManagerError(InvitationError):
"""Raised when the user is already a manager for the domain."""
def __init__(self, email):
super().__init__(f"{email} is already a manager for this domain.")
class AlreadyInvitedError(InvitationError):
"""Raised when the user has already been invited to the domain."""
def __init__(self, email):
super().__init__(f"{email} has already been invited to this domain.")
class MissingEmailError(InvitationError):
"""Raised when the requestor has no email associated with their account."""
def __init__(self, username):
super().__init__(f"Can't send invitation email. No email is associated with the account for '{username}'.")
self.username = username
class DomainBaseView(DomainPermissionView):
"""
Base View for the Domain. Handles getting and setting the domain
@ -1211,93 +1186,8 @@ class DomainAddUserView(DomainFormBaseView):
def get_success_url(self):
return reverse("domain-users", kwargs={"pk": self.object.pk})
def _domain_abs_url(self):
"""Get an absolute URL for this domain."""
return self.request.build_absolute_uri(reverse("domain", kwargs={"pk": self.object.id}))
def _is_member_of_different_org(self, email, requestor, requested_user):
"""Verifies if an email belongs to a different organization as a member or invited member."""
# Check if user is a already member of a different organization than the requestor's org
requestor_org = UserPortfolioPermission.objects.filter(user=requestor).first().portfolio
existing_org_permission = UserPortfolioPermission.objects.filter(user=requested_user).first()
existing_org_invitation = PortfolioInvitation.objects.filter(email=email).first()
return (existing_org_permission and existing_org_permission.portfolio != requestor_org) or (
existing_org_invitation and existing_org_invitation.portfolio != requestor_org
)
def _check_existing_invitation_status(self, email, domain):
"""Check if existing invitation exists; checks its status for canceled, invited or retrieved, and gives the appropiate response.
Exceptions raised for RETRIEVED or INVITED. Existing CANCELED invitations updated to INVITED.
Existing CANCELED invitations do not raise an exception."""
try:
invite = DomainInvitation.objects.get(email=email, domain=domain)
except Exception as err:
# No existing invitation, do nothing
return
else:
if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED:
raise AlreadyManagerError(email)
elif invite.status == DomainInvitation.DomainInvitationStatus.CANCELED:
invite.update_cancellation_status()
invite.save()
else:
# Status is INVITED
raise AlreadyInvitedError(email)
def _send_domain_invitation_email(self, email: str, requestor: User, requested_user=None):
"""Performs the sending of the domain invitation email,
does not make a domain information object
email: string- email to send to
raises EmailSendingError
"""
# Set a default email address to send to for staff
requestor_email = settings.DEFAULT_FROM_EMAIL
# refactor below to raise an error
# if the user is not a staff member
if not requestor.is_staff:
if not requestor.email or requestor.email.strip() == "":
raise MissingEmailError(requestor.username)
else:
requestor_email = requestor.email
# Check is user is a member or invited member of a different org from this domain's org
if flag_is_active_for_user(requestor, "organization_feature") and self._is_member_of_different_org(
email, requestor, requested_user
):
raise OutsideOrgMemberError
# Check to see if an invite has already been sent
self._check_existing_invitation_status(email=email, domain=self.object)
try:
send_templated_email(
"emails/domain_invitation.txt",
"emails/domain_invitation_subject.txt",
to_address=email,
context={
"domain_url": self._domain_abs_url(),
"domain": self.object,
"requestor_email": requestor_email,
},
)
except EmailSendingError as exc:
logger.warn(
"Could not sent email invitation to %s for domain %s",
email,
self.object,
exc_info=True,
)
logger.info(exc)
raise EmailSendingError("Could not send email invitation.") from exc
def form_valid(self, form):
"""Add the specified user on this domain.
Throws EmailSendingError."""
"""Add the specified user on this domain."""
requested_email = form.cleaned_data["email"]
requestor = self.request.user
@ -1311,13 +1201,20 @@ class DomainAddUserView(DomainFormBaseView):
try:
if requested_user is None:
# no user exists, send an email and make an invitation
self._send_domain_invitation_email(email=requested_email, requestor=requestor)
send_domain_invitation_email(
email=requested_email,
requestor=requestor,
domain=self.object,
)
DomainInvitation.objects.get_or_create(email=requested_email, domain=self.object)
messages.success(self.request, f"{requested_email} has been invited to this domain.")
else:
# user exists, send email and make user domain role
self._send_domain_invitation_email(
email=requested_email, requestor=requestor, requested_user=requested_user
send_domain_invitation_email(
email=requested_email,
requestor=requestor,
requested_user=requested_user,
domain=self.object,
)
UserDomainRole.objects.create(
user=requested_user,
@ -1353,6 +1250,8 @@ class DomainAddUserView(DomainFormBaseView):
f"No email exists for the requestor '{e.username}'.",
exc_info=True,
)
except IntegrityError:
messages.warning(self.request, f"{requested_email} is already a manager for this domain")
except Exception:
logger.warn(
"Could not send email invitation (Other Exception)",
@ -1360,8 +1259,6 @@ class DomainAddUserView(DomainFormBaseView):
exc_info=True,
)
messages.warning(self.request, "Could not send email invitation.")
except IntegrityError:
messages.warning(self.request, f"{requested_email} is already a manager for this domain")
return redirect(self.get_success_url())