refactored _send_domain_invitation_email in domain.py view

This commit is contained in:
David Kennedy 2024-12-12 20:23:30 -05:00
parent a2e6238b18
commit d766aa9b2a
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B

View file

@ -69,6 +69,34 @@ from django import forms
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class InvitationError(Exception):
"""Base exception for invitation-related errors."""
pass
class AlreadyManagerError(InvitationError):
"""Raised when the user is already a manager for the domain."""
def __init__(self, email):
super().__init__(f"{email} is already a manager for this domain.")
class AlreadyInvitedError(InvitationError):
"""Raised when the user has already been invited to the domain."""
def __init__(self, email):
super().__init__(f"{email} has already been invited to this domain.")
class MissingEmailError(InvitationError):
"""Raised when the requestor has no email associated with their account."""
def __init__(self, username):
super().__init__(f"Can't send invitation email. No email is associated with the account for '{username}'.")
self.username = username
class DomainBaseView(DomainPermissionView): class DomainBaseView(DomainPermissionView):
""" """
Base View for the Domain. Handles getting and setting the domain Base View for the Domain. Handles getting and setting the domain
@ -1184,13 +1212,11 @@ class DomainAddUserView(DomainFormBaseView):
return reverse("domain-users", kwargs={"pk": self.object.pk}) return reverse("domain-users", kwargs={"pk": self.object.pk})
def _domain_abs_url(self): def _domain_abs_url(self):
"""Get an absolute URL for this domain. """Get an absolute URL for this domain."""
Used by the email helper."""
return self.request.build_absolute_uri(reverse("domain", kwargs={"pk": self.object.id})) return self.request.build_absolute_uri(reverse("domain", kwargs={"pk": self.object.id}))
def _is_member_of_different_org(self, email, requestor, requested_user): def _is_member_of_different_org(self, email, requestor, requested_user):
"""Verifies if an email belongs to a different organization as a member or invited member. """Verifies if an email belongs to a different organization as a member or invited member."""
Used by the email helper."""
# Check if user is a already member of a different organization than the requestor's org # Check if user is a already member of a different organization than the requestor's org
requestor_org = UserPortfolioPermission.objects.filter(user=requestor).first().portfolio requestor_org = UserPortfolioPermission.objects.filter(user=requestor).first().portfolio
existing_org_permission = UserPortfolioPermission.objects.filter(user=requested_user).first() existing_org_permission = UserPortfolioPermission.objects.filter(user=requested_user).first()
@ -1200,67 +1226,53 @@ class DomainAddUserView(DomainFormBaseView):
existing_org_invitation and existing_org_invitation.portfolio != requestor_org existing_org_invitation and existing_org_invitation.portfolio != requestor_org
) )
def _check_invite_status(self, invite, email): def _check_existing_invitation_status(self, email, domain):
"""Check if invitation status is canceled or retrieved, and gives the appropiate response""" """Check if existing invitation exists; checks its status for canceled, invited or retrieved, and gives the appropiate response.
if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED:
messages.warning(
self.request,
f"{email} is already a manager for this domain.",
)
return False
elif invite.status == DomainInvitation.DomainInvitationStatus.CANCELED:
invite.update_cancellation_status()
invite.save()
return True
else:
# else if it has been sent but not accepted
messages.warning(self.request, f"{email} has already been invited to this domain")
return False
def _send_domain_invitation_email(self, email: str, requestor: User, requested_user=None, add_success=True): Exceptions raised for RETRIEVED or INVITED. Existing CANCELED invitations updated to INVITED.
Existing CANCELED invitations do not raise an exception."""
try:
invite = DomainInvitation.objects.get(email=email, domain=domain)
except Exception as err:
# No existing invitation, do nothing
return
else:
if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED:
raise AlreadyManagerError(email)
elif invite.status == DomainInvitation.DomainInvitationStatus.CANCELED:
invite.update_cancellation_status()
invite.save()
else:
# Status is INVITED
raise AlreadyInvitedError(email)
def _send_domain_invitation_email(self, email: str, requestor: User, requested_user=None):
"""Performs the sending of the domain invitation email, """Performs the sending of the domain invitation email,
does not make a domain information object does not make a domain information object
email: string- email to send to email: string- email to send to
add_success: bool- default True indicates:
adding a success message to the view if the email sending succeeds
raises EmailSendingError raises EmailSendingError
""" """
print('_send_domain_invitation_email') # Set a default email address to send to for staff
# FROM email for staff
requestor_email = settings.DEFAULT_FROM_EMAIL requestor_email = settings.DEFAULT_FROM_EMAIL
# FROM email for users # refactor below to raise an error
# if the user is not a staff member
if not requestor.is_staff: if not requestor.is_staff:
if requestor.email is not None and requestor.email.strip() != "": if not requestor.email or requestor.email.strip() == "":
requestor_email = requestor.email raise MissingEmailError(requestor.username)
else: else:
# The user is not staff and does not have an email requestor_email = requestor.email
logger.error(
f"Can't send email to '{email}' on domain '{self.object}'."
f"No email exists for the requestor '{requestor.username}'.",
exc_info=True,
)
# The None returned here will trigger a specific error alert
return None
# Check is user is a member or invited member of a different org from this domain's org
# Check is user is a member or invited member of a different org
if flag_is_active_for_user(requestor, "organization_feature") and self._is_member_of_different_org( if flag_is_active_for_user(requestor, "organization_feature") and self._is_member_of_different_org(
email, requestor, requested_user email, requestor, requested_user
): ):
add_success = False
raise OutsideOrgMemberError raise OutsideOrgMemberError
# Check to see if an invite has already been sent # Check to see if an invite has already been sent
try: self._check_existing_invitation_status(email=email, domain=self.object)
invite = DomainInvitation.objects.get(email=email, domain=self.object)
# check if the invite has already been accepted or has a canceled invite
add_success = self._check_invite_status(invite, email)
except Exception as exeption:
logger.error(f"Invite does not exist: {exeption}")
try: try:
send_templated_email( send_templated_email(
@ -1282,95 +1294,74 @@ class DomainAddUserView(DomainFormBaseView):
) )
logger.info(exc) logger.info(exc)
raise EmailSendingError("Could not send email invitation.") from exc raise EmailSendingError("Could not send email invitation.") from exc
# else:
# if add_success:
# messages.success(self.request, f"{email} has been invited to this domain.")
return add_success
def _make_invitation(self, email_address: str, requestor: User):
"""Make a Domain invitation for this email and redirect with a message."""
try:
print('_make_invitation')
add_success = self._send_domain_invitation_email(email=email_address, requestor=requestor)
if add_success is None:
messages.error(self.request, "Can't send invitation email. No email is associated with your account.")
elif add_success == True:
messages.success(self.request, f"{email_address} has been invited to this domain.")
except EmailSendingError:
messages.warning(self.request, "Could not send email invitation.")
else:
# (NOTE: only create a domainInvitation if the e-mail sends correctly)
DomainInvitation.objects.get_or_create(email=email_address, domain=self.object)
return redirect(self.get_success_url())
def form_valid(self, form): def form_valid(self, form):
"""Add the specified user on this domain. """Add the specified user on this domain.
Throws EmailSendingError.""" Throws EmailSendingError."""
requested_email = form.cleaned_data["email"] requested_email = form.cleaned_data["email"]
requestor = self.request.user requestor = self.request.user
should_add_user_domain_role = False
# look up a user with that email # look up a user with that email
requested_user = None
try: try:
requested_user = User.objects.get(email=requested_email) requested_user = User.objects.get(email=requested_email)
except User.DoesNotExist: except User.DoesNotExist:
# no matching user, go make an invitation pass
print('User.DoesNotExist')
return self._make_invitation(requested_email, requestor)
else:
# if user already exists then just send an email
try:
print('else block after User.DoesNotExist - user already exists then just send an email')
add_success = self._send_domain_invitation_email(
requested_email, requestor, requested_user=requested_user, add_success=False
)
if add_success is None: try:
messages.error(self.request, "Can't send invitation email. No email is associated with your account.") if requested_user is None:
elif add_success == True: # no user exists, send an email and make an invitation
messages.success(self.request, f"{requested_email} has been invited to this domain.") self._send_domain_invitation_email(email=requested_email, requestor=requestor)
DomainInvitation.objects.get_or_create(email=requested_email, domain=self.object)
should_add_user_domain_role = True messages.success(self.request, f"{requested_email} has been invited to this domain.")
except EmailSendingError: else:
logger.warn( # user exists, send email and make user domain role
"Could not send email invitation (EmailSendingError)", self._send_domain_invitation_email(
self.object, email=requested_email, requestor=requestor, requested_user=requested_user
exc_info=True,
) )
messages.warning(self.request, "Could not send email invitation.")
should_add_user_domain_role = True
except OutsideOrgMemberError:
logger.warn(
"Could not send email. Can not invite member of a .gov organization to a different organization.",
self.object,
exc_info=True,
)
messages.error(
self.request,
f"{requested_email} is already a member of another .gov organization.",
)
except Exception:
logger.warn(
"Could not send email invitation (Other Exception)",
self.object,
exc_info=True,
)
messages.warning(self.request, "Could not send email invitation.")
if should_add_user_domain_role:
try:
print('will add domain role')
UserDomainRole.objects.create( UserDomainRole.objects.create(
user=requested_user, user=requested_user,
domain=self.object, domain=self.object,
role=UserDomainRole.Roles.MANAGER, role=UserDomainRole.Roles.MANAGER,
) )
messages.success(self.request, f"Added user {requested_email}.") messages.success(self.request, f"Added user {requested_email}.")
except IntegrityError: except EmailSendingError:
messages.warning(self.request, f"{requested_email} is already a manager for this domain") logger.warn(
"Could not send email invitation (EmailSendingError)",
self.object,
exc_info=True,
)
messages.warning(self.request, "Could not send email invitation.")
except OutsideOrgMemberError:
logger.warn(
"Could not send email. Can not invite member of a .gov organization to a different organization.",
self.object,
exc_info=True,
)
messages.error(
self.request,
f"{requested_email} is already a member of another .gov organization.",
)
except AlreadyManagerError as e:
messages.warning(self.request, str(e))
except AlreadyInvitedError as e:
messages.warning(self.request, str(e))
except MissingEmailError as e:
messages.error(self.request, str(e))
logger.error(
f"Can't send email to '{requested_email}' on domain '{self.object}'. "
f"No email exists for the requestor '{e.username}'.",
exc_info=True,
)
except Exception:
logger.warn(
"Could not send email invitation (Other Exception)",
self.object,
exc_info=True,
)
messages.warning(self.request, "Could not send email invitation.")
except IntegrityError:
messages.warning(self.request, f"{requested_email} is already a manager for this domain")
return redirect(self.get_success_url()) return redirect(self.get_success_url())