formatted for code readability

This commit is contained in:
David Kennedy 2025-01-09 08:36:44 -05:00
parent e06a63e321
commit 7e2930f15d
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B
5 changed files with 69 additions and 44 deletions

View file

@ -1393,6 +1393,7 @@ class UserDomainRoleAdmin(ListHeaderAdmin, ImportExportModelAdmin):
return super().changeform_view(request, object_id, form_url, extra_context=extra_context)
class BaseInvitationAdmin(ListHeaderAdmin):
"""Base class for admin classes which will customize save_model and send email invitations
on model adds, and require custom handling of forms and form errors."""
@ -1542,6 +1543,7 @@ class DomainInvitationAdmin(BaseInvitationAdmin):
portfolio=domain_org,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
# if user exists for email, immediately retrieve portfolio invitation upon creation
if requested_user is not None:
portfolio_invitation.retrieve()
portfolio_invitation.save()
@ -1552,7 +1554,7 @@ class DomainInvitationAdmin(BaseInvitationAdmin):
requestor=requestor,
domains=domain,
is_member_of_different_org=member_of_a_different_org,
requested_user=requested_user
requested_user=requested_user,
)
if requested_user is not None:
# Domain Invitation creation for an existing User
@ -1639,6 +1641,7 @@ class PortfolioInvitationAdmin(BaseInvitationAdmin):
if not permission_exists:
# if permission does not exist for a user with requested_email, send email
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=portfolio)
# if user exists for email, immediately retrieve portfolio invitation upon creation
if requested_user is not None:
obj.retrieve()
messages.success(request, f"{requested_email} has been invited.")

View file

@ -1,7 +1,6 @@
from django.conf import settings
from registrar.models import DomainInvitation
from registrar.models.domain import Domain
from registrar.models.user import User
from registrar.utility.errors import (
AlreadyDomainInvitedError,
AlreadyDomainManagerError,
@ -15,12 +14,12 @@ import logging
logger = logging.getLogger(__name__)
def send_domain_invitation_email(email: str, requestor, domains: Domain | list[Domain], is_member_of_different_org, requested_user=None):
def send_domain_invitation_email(
email: str, requestor, domains: Domain | list[Domain], is_member_of_different_org, requested_user=None
):
"""
Sends a domain invitation email to the specified address.
Raises exceptions for validation or email-sending issues.
Args:
email (str): Email address of the recipient.
requestor (User): The user initiating the invitation.
@ -35,45 +34,66 @@ def send_domain_invitation_email(email: str, requestor, domains: Domain | list[D
OutsideOrgMemberError: If the requested_user is part of a different organization.
EmailSendingError: If there is an error while sending the email.
"""
# Normalize domains
if isinstance(domains, Domain):
domains = [domains]
domains = normalize_domains(domains)
requestor_email = get_requestor_email(requestor, domains)
# Default email address for staff
requestor_email = settings.DEFAULT_FROM_EMAIL
validate_invitation(email, domains, requestor, is_member_of_different_org)
# Check if the requestor is staff and has an email
if not requestor.is_staff:
if not requestor.email or requestor.email.strip() == "":
domain_names = ", ".join([domain.name for domain in domains])
raise MissingEmailError(email=email, domain=domain_names)
else:
requestor_email = requestor.email
send_invitation_email(email, requestor_email, domains, requested_user)
# Check if the recipient is part of a different organization
# COMMENT: this does not account for multiple_portfolios flag being active
def normalize_domains(domains):
"""Ensures domains is always a list."""
return [domains] if isinstance(domains, Domain) else domains
def get_requestor_email(requestor, domains):
"""Get the requestor's email or raise an error if it's missing."""
if requestor.is_staff:
return settings.DEFAULT_FROM_EMAIL
if not requestor.email or requestor.email.strip() == "":
domain_names = ", ".join([domain.name for domain in domains])
raise MissingEmailError(email=requestor.email, domain=domain_names)
return requestor.email
def validate_invitation(email, domains, requestor, is_member_of_different_org):
"""Validate the invitation conditions."""
check_outside_org_membership(email, requestor, is_member_of_different_org)
for domain in domains:
validate_existing_invitation(email, domain)
def check_outside_org_membership(email, requestor, is_member_of_different_org):
"""Raise an error if the email belongs to a different organization."""
if (
flag_is_active_for_user(requestor, "organization_feature")
and not flag_is_active_for_user(requestor, "multiple_portfolios")
and is_member_of_different_org
):
raise OutsideOrgMemberError
raise OutsideOrgMemberError(email=email)
# Check for an existing invitation for each domain
for domain in domains:
try:
invite = DomainInvitation.objects.get(email=email, domain=domain)
if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED:
raise AlreadyDomainManagerError(email)
elif invite.status == DomainInvitation.DomainInvitationStatus.CANCELED:
invite.update_cancellation_status()
invite.save()
else:
raise AlreadyDomainInvitedError(email)
except DomainInvitation.DoesNotExist:
pass
# Send the email
def validate_existing_invitation(email, domain):
"""Check for existing invitations and handle their status."""
try:
invite = DomainInvitation.objects.get(email=email, domain=domain)
if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED:
raise AlreadyDomainManagerError(email)
elif invite.status == DomainInvitation.DomainInvitationStatus.CANCELED:
invite.update_cancellation_status()
invite.save()
else:
raise AlreadyDomainInvitedError(email)
except DomainInvitation.DoesNotExist:
pass
def send_invitation_email(email, requestor_email, domains, requested_user):
"""Send the invitation email."""
try:
send_templated_email(
"emails/domain_invitation.txt",
@ -88,9 +108,7 @@ def send_domain_invitation_email(email: str, requestor, domains: Domain | list[D
)
except EmailSendingError as err:
domain_names = ", ".join([domain.name for domain in domains])
raise EmailSendingError(
f"Could not send email invitation to {email} for domains: {domain_names}"
) from err
raise EmailSendingError(f"Could not send email invitation to {email} for domains: {domain_names}") from err
def send_portfolio_invitation_email(email: str, requestor, portfolio):

View file

@ -1186,6 +1186,7 @@ class DomainAddUserView(DomainFormBaseView):
portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(
email=requested_email, portfolio=domain_org, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER]
)
# if user exists for email, immediately retrieve portfolio invitation upon creation
if requested_user is not None:
portfolio_invitation.retrieve()
portfolio_invitation.save()

View file

@ -296,7 +296,7 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V
Processes added domains by bulk creating UserDomainRole instances.
"""
if added_domain_ids:
print('_process_added_domains')
# get added_domains from ids to pass to send email method and bulk create
added_domains = Domain.objects.filter(id__in=added_domain_ids)
member_of_a_different_org, _ = get_org_membership(portfolio, member.email, member)
send_domain_invitation_email(
@ -514,14 +514,15 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission
or creating new ones.
"""
if added_domain_ids:
# get added_domains from ids to pass to send email method and bulk create
added_domains = Domain.objects.filter(id__in=added_domain_ids)
member_of_a_different_org, _ = get_org_membership(portfolio, email, None)
send_domain_invitation_email(
email=email,
requestor=requestor,
domains=added_domains,
is_member_of_different_org=member_of_a_different_org,
)
email=email,
requestor=requestor,
domains=added_domains,
is_member_of_different_org=member_of_a_different_org,
)
# Update existing invitations from CANCELED to INVITED
existing_invitations = DomainInvitation.objects.filter(domain__in=added_domains, email=email)
@ -542,7 +543,7 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission
for domain_id in new_domain_ids
]
)
def _process_removed_domains(self, removed_domain_ids, email):
"""
Processes removed domain invitations by updating their status to CANCELED.
@ -777,6 +778,7 @@ class PortfolioAddMemberView(PortfolioMembersPermissionView, FormMixin):
if not requested_user or not permission_exists:
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=portfolio)
portfolio_invitation = form.save()
# if user exists for email, immediately retrieve portfolio invitation upon creation
if requested_user is not None:
portfolio_invitation.retrieve()
portfolio_invitation.save()

View file

@ -19,6 +19,7 @@ logger = logging.getLogger(__name__)
# when creating invitations and sending associated emails. These can be reused in
# any view, and were initially developed for domain.py, portfolios.py and admin.py
def get_org_membership(requestor_org, requested_email, requested_user):
"""
Verifies if an email belongs to a different organization as a member or invited member.