moving around logic, prepping for refactor

This commit is contained in:
David Kennedy 2024-12-16 10:59:58 -05:00
parent dd32cb9e5d
commit ddf7d92b4b
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
3 changed files with 117 additions and 65 deletions

View file

@ -17,19 +17,7 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _is_member_of_different_org(email, requestor, requested_user): def send_domain_invitation_email(email: str, requestor, domain, is_member_of_different_org):
"""Verifies if an email belongs to a different organization as a member or invited member."""
# Check if requested_user is a already member of a different organization than the requestor's org
requestor_org = UserPortfolioPermission.objects.filter(user=requestor).first().portfolio
existing_org_permission = UserPortfolioPermission.objects.filter(user=requested_user).first()
existing_org_invitation = PortfolioInvitation.objects.filter(email=email).first()
return (existing_org_permission and existing_org_permission.portfolio != requestor_org) or (
existing_org_invitation and existing_org_invitation.portfolio != requestor_org
)
def send_domain_invitation_email(email: str, requestor, domain, requested_user=None):
""" """
Sends a domain invitation email to the specified address. Sends a domain invitation email to the specified address.
@ -39,7 +27,7 @@ def send_domain_invitation_email(email: str, requestor, domain, requested_user=N
email (str): Email address of the recipient. email (str): Email address of the recipient.
requestor (User): The user initiating the invitation. requestor (User): The user initiating the invitation.
domain (Domain): The domain object for which the invitation is being sent. domain (Domain): The domain object for which the invitation is being sent.
requested_user (User): The user of the recipient, if exists; defaults to None is_member_of_different_org (bool): if an email belongs to a different org
Raises: Raises:
MissingEmailError: If the requestor has no email associated with their account. MissingEmailError: If the requestor has no email associated with their account.
@ -59,8 +47,11 @@ def send_domain_invitation_email(email: str, requestor, domain, requested_user=N
requestor_email = requestor.email requestor_email = requestor.email
# Check if the recipient is part of a different organization # Check if the recipient is part of a different organization
if flag_is_active_for_user(requestor, "organization_feature") and _is_member_of_different_org( # COMMENT: this does not account for multiple_portfolios flag being active
email, requestor, requested_user if (
flag_is_active_for_user(requestor, "organization_feature")
and not flag_is_active_for_user(requestor, "multiple_portfolios")
and is_member_of_different_org
): ):
raise OutsideOrgMemberError raise OutsideOrgMemberError
@ -78,24 +69,15 @@ def send_domain_invitation_email(email: str, requestor, domain, requested_user=N
pass pass
# Send the email # Send the email
try: send_templated_email(
send_templated_email( "emails/domain_invitation.txt",
"emails/domain_invitation.txt", "emails/domain_invitation_subject.txt",
"emails/domain_invitation_subject.txt", to_address=email,
to_address=email, context={
context={ "domain": domain,
"domain": domain, "requestor_email": requestor_email,
"requestor_email": requestor_email, },
}, )
)
except EmailSendingError as exc:
logger.warning(
"Could not send email invitation to %s for domain %s",
email,
domain,
exc_info=True,
)
raise EmailSendingError("Could not send email invitation.") from exc
def send_portfolio_invitation_email(email: str, requestor, portfolio): def send_portfolio_invitation_email(email: str, requestor, portfolio):
@ -136,23 +118,13 @@ def send_portfolio_invitation_email(email: str, requestor, portfolio):
except PortfolioInvitation.DoesNotExist: except PortfolioInvitation.DoesNotExist:
pass pass
try: send_templated_email(
send_templated_email( "emails/portfolio_invitation.txt",
"emails/portfolio_invitation.txt", "emails/portfolio_invitation_subject.txt",
"emails/portfolio_invitation_subject.txt", to_address=email,
to_address=email, context={
context={ "portfolio": portfolio,
"portfolio": portfolio, "requestor_email": requestor_email,
"requestor_email": requestor_email, "email": email,
"email": email, },
}, )
)
except EmailSendingError as exc:
logger.warning(
"Could not sent email invitation to %s for portfolio %s",
email,
portfolio,
exc_info=True,
)
raise EmailSendingError("Could not send email invitation.") from exc

View file

@ -27,11 +27,14 @@ from registrar.models import (
UserDomainRole, UserDomainRole,
PublicContact, PublicContact,
) )
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
from registrar.utility.enums import DefaultEmail from registrar.utility.enums import DefaultEmail
from registrar.utility.errors import ( from registrar.utility.errors import (
AlreadyDomainInvitedError, AlreadyDomainInvitedError,
AlreadyDomainManagerError, AlreadyDomainManagerError,
AlreadyPortfolioInvitedError,
AlreadyPortfolioMemberError,
GenericError, GenericError,
GenericErrorCodes, GenericErrorCodes,
MissingEmailError, MissingEmailError,
@ -65,7 +68,7 @@ from epplibwrapper import (
) )
from ..utility.email import send_templated_email, EmailSendingError from ..utility.email import send_templated_email, EmailSendingError
from ..utility.email_invitations import send_domain_invitation_email from ..utility.email_invitations import send_domain_invitation_email, send_portfolio_invitation_email
from .utility import DomainPermissionView, DomainInvitationPermissionCancelView from .utility import DomainPermissionView, DomainInvitationPermissionCancelView
from django import forms from django import forms
@ -1117,6 +1120,7 @@ class DomainUsersView(DomainBaseView):
# Check if there are any PortfolioInvitations linked to the same portfolio with the ORGANIZATION_ADMIN role # Check if there are any PortfolioInvitations linked to the same portfolio with the ORGANIZATION_ADMIN role
has_admin_flag = False has_admin_flag = False
logger.info(domain_invitation)
# Query PortfolioInvitations linked to the same portfolio and check roles # Query PortfolioInvitations linked to the same portfolio and check roles
portfolio_invitations = PortfolioInvitation.objects.filter( portfolio_invitations = PortfolioInvitation.objects.filter(
portfolio=portfolio, email=domain_invitation.email portfolio=portfolio, email=domain_invitation.email
@ -1124,7 +1128,9 @@ class DomainUsersView(DomainBaseView):
# If any of the PortfolioInvitations have the ORGANIZATION_ADMIN role, set the flag to True # If any of the PortfolioInvitations have the ORGANIZATION_ADMIN role, set the flag to True
for portfolio_invitation in portfolio_invitations: for portfolio_invitation in portfolio_invitations:
if UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_invitation.roles: logger.info(portfolio_invitation)
logger.info(portfolio_invitation.roles)
if portfolio_invitation.roles and UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_invitation.roles:
has_admin_flag = True has_admin_flag = True
break # Once we find one match, no need to check further break # Once we find one match, no need to check further
@ -1186,6 +1192,38 @@ class DomainAddUserView(DomainFormBaseView):
def get_success_url(self): def get_success_url(self):
return reverse("domain-users", kwargs={"pk": self.object.pk}) return reverse("domain-users", kwargs={"pk": self.object.pk})
def _get_org_membership(self, requestor_org, requested_email, requested_user):
"""
Verifies if an email belongs to a different organization as a member or invited member.
Verifies if an email belongs to this organization as a member or invited member.
User does not belong to any org can be deduced from the tuple returned.
Returns a tuple (member_of_a_different_org, member_of_this_org).
"""
# COMMENT: this code does not take into account multiple portfolios flag
# COMMENT: shouldn't this code be based on the organization of the domain, not the org
# of the requestor? requestor could have multiple portfolios
# Check for existing permissions or invitations for the requested user
existing_org_permission = UserPortfolioPermission.objects.filter(user=requested_user).first()
existing_org_invitation = PortfolioInvitation.objects.filter(email=requested_email).first()
# Determine membership in a different organization
member_of_a_different_org = (
(existing_org_permission and existing_org_permission.portfolio != requestor_org) or
(existing_org_invitation and existing_org_invitation.portfolio != requestor_org)
)
# Determine membership in the same organization
member_of_this_org = (
(existing_org_permission and existing_org_permission.portfolio == requestor_org) or
(existing_org_invitation and existing_org_invitation.portfolio == requestor_org)
)
return member_of_a_different_org, member_of_this_org
def form_valid(self, form): def form_valid(self, form):
"""Add the specified user to this domain.""" """Add the specified user to this domain."""
requested_email = form.cleaned_data["email"] requested_email = form.cleaned_data["email"]
@ -1193,12 +1231,34 @@ class DomainAddUserView(DomainFormBaseView):
# Look up a user with that email # Look up a user with that email
requested_user = self._get_requested_user(requested_email) requested_user = self._get_requested_user(requested_email)
# Get the requestor's organization
requestor_org = UserPortfolioPermission.objects.filter(user=requestor).first().portfolio
member_of_a_different_org, member_of_this_org = self._get_org_membership(requestor_org, requested_email, requested_user)
# determine portfolio of the domain (code currently is looking at requestor's portfolio)
# if requested_email/user is not member or invited member of this portfolio
# COMMENT: this code does not take into account multiple portfolios flag
# send portfolio invitation email
# create portfolio invitation
# create message to view
if (
flag_is_active_for_user(requestor, "organization_feature")
and not flag_is_active_for_user(requestor, "multiple_portfolios")
and not member_of_this_org
):
try:
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=requestor_org)
PortfolioInvitation.objects.get_or_create(email=requested_email, portfolio=requestor_org)
messages.success(self.request, f"{requested_email} has been invited.")
except Exception as e:
self._handle_portfolio_exceptions(e, requested_email, requestor_org)
try: try:
if requested_user is None: if requested_user is None:
self._handle_new_user_invitation(requested_email, requestor) self._handle_new_user_invitation(requested_email, requestor, member_of_a_different_org)
else: else:
self._handle_existing_user(requested_email, requestor, requested_user) self._handle_existing_user(requested_email, requestor, requested_user, member_of_a_different_org)
except Exception as e: except Exception as e:
self._handle_exceptions(e, requested_email) self._handle_exceptions(e, requested_email)
@ -1211,23 +1271,25 @@ class DomainAddUserView(DomainFormBaseView):
except User.DoesNotExist: except User.DoesNotExist:
return None return None
def _handle_new_user_invitation(self, email, requestor): def _handle_new_user_invitation(self, email, requestor, member_of_different_org):
"""Handle invitation for a new user who does not exist in the system.""" """Handle invitation for a new user who does not exist in the system."""
send_domain_invitation_email( send_domain_invitation_email(
email=email, email=email,
requestor=requestor, requestor=requestor,
domain=self.object, domain=self.object,
is_member_of_different_org=member_of_different_org,
) )
DomainInvitation.objects.get_or_create(email=email, domain=self.object) DomainInvitation.objects.get_or_create(email=email, domain=self.object)
messages.success(self.request, f"{email} has been invited to this domain.") messages.success(self.request, f"{email} has been invited to this domain.")
def _handle_existing_user(self, email, requestor, requested_user): def _handle_existing_user(self, email, requestor, requested_user, member_of_different_org):
"""Handle adding an existing user to the domain.""" """Handle adding an existing user to the domain."""
send_domain_invitation_email( send_domain_invitation_email(
email=email, email=email,
requestor=requestor, requestor=requestor,
requested_user=requested_user, requested_user=requested_user,
domain=self.object, domain=self.object,
is_member_of_different_org=member_of_different_org,
) )
UserDomainRole.objects.create( UserDomainRole.objects.create(
user=requested_user, user=requested_user,
@ -1239,10 +1301,10 @@ class DomainAddUserView(DomainFormBaseView):
def _handle_exceptions(self, exception, email): def _handle_exceptions(self, exception, email):
"""Handle exceptions raised during the process.""" """Handle exceptions raised during the process."""
if isinstance(exception, EmailSendingError): if isinstance(exception, EmailSendingError):
logger.warn("Could not send email invitation (EmailSendingError)", self.object, exc_info=True) logger.warning("Could not send email invitation to %s for domain %s (EmailSendingError)", email, self.object, exc_info=True)
messages.warning(self.request, "Could not send email invitation.") messages.warning(self.request, "Could not send email invitation.")
elif isinstance(exception, OutsideOrgMemberError): elif isinstance(exception, OutsideOrgMemberError):
logger.warn( logger.warning(
"Could not send email. Can not invite member of a .gov organization to a different organization.", "Could not send email. Can not invite member of a .gov organization to a different organization.",
self.object, self.object,
exc_info=True, exc_info=True,
@ -1264,9 +1326,27 @@ class DomainAddUserView(DomainFormBaseView):
elif isinstance(exception, IntegrityError): elif isinstance(exception, IntegrityError):
messages.warning(self.request, f"{email} is already a manager for this domain") messages.warning(self.request, f"{email} is already a manager for this domain")
else: else:
logger.warn("Could not send email invitation (Other Exception)", self.object, exc_info=True) logger.warning("Could not send email invitation (Other Exception)", self.object, exc_info=True)
messages.warning(self.request, "Could not send email invitation.") messages.warning(self.request, "Could not send email invitation.")
def _handle_portfolio_exceptions(self, exception, email, portfolio):
"""Handle exceptions raised during the process."""
if isinstance(exception, EmailSendingError):
logger.warning("Could not send email invitation (EmailSendingError)", portfolio, exc_info=True)
messages.warning(self.request, "Could not send email invitation.")
elif isinstance(exception, AlreadyPortfolioMemberError):
messages.warning(self.request, str(exception))
elif isinstance(exception, AlreadyPortfolioInvitedError):
messages.warning(self.request, str(exception))
elif isinstance(exception, MissingEmailError):
messages.error(self.request, str(exception))
logger.error(
f"Can't send email to '{email}' for portfolio '{portfolio}'. No email exists for the requestor.",
exc_info=True,
)
else:
logger.warning("Could not send email invitation (Other Exception)", portfolio, exc_info=True)
messages.warning(self.request, "Could not send email invitation.")
class DomainInvitationCancelView(SuccessMessageMixin, DomainInvitationPermissionCancelView): class DomainInvitationCancelView(SuccessMessageMixin, DomainInvitationPermissionCancelView):
object: DomainInvitation object: DomainInvitation

View file

@ -530,10 +530,10 @@ class NewMemberView(PortfolioMembersPermissionView, FormMixin):
requested_user = User.objects.filter(email=requested_email).first() requested_user = User.objects.filter(email=requested_email).first()
permission_exists = UserPortfolioPermission.objects.filter(user=requested_user, portfolio=self.object).exists() permission_exists = UserPortfolioPermission.objects.filter(user=requested_user, portfolio=self.object).exists()
# invitation_exists = PortfolioInvitation.objects.filter(email=requested_email, portfolio=self.object).exists()
try: try:
if not requested_user or not permission_exists: if not requested_user or not permission_exists:
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=self.object) send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=self.object)
## NOTE : this is not yet accounting properly for roles and permissions
PortfolioInvitation.objects.get_or_create(email=requested_email, portfolio=self.object) PortfolioInvitation.objects.get_or_create(email=requested_email, portfolio=self.object)
messages.success(self.request, f"{requested_email} has been invited.") messages.success(self.request, f"{requested_email} has been invited.")
else: else:
@ -546,7 +546,7 @@ class NewMemberView(PortfolioMembersPermissionView, FormMixin):
def _handle_exceptions(self, exception, email): def _handle_exceptions(self, exception, email):
"""Handle exceptions raised during the process.""" """Handle exceptions raised during the process."""
if isinstance(exception, EmailSendingError): if isinstance(exception, EmailSendingError):
logger.warning("Could not send email invitation (EmailSendingError)", self.object, exc_info=True) logger.warning("Could not sent email invitation to %s for portfolio %s (EmailSendingError)", email, self.object, exc_info=True)
messages.warning(self.request, "Could not send email invitation.") messages.warning(self.request, "Could not send email invitation.")
elif isinstance(exception, AlreadyPortfolioMemberError): elif isinstance(exception, AlreadyPortfolioMemberError):
messages.warning(self.request, str(exception)) messages.warning(self.request, str(exception))