diff --git a/src/registrar/admin.py b/src/registrar/admin.py index e6f0f7929..cdcc0400e 100644 --- a/src/registrar/admin.py +++ b/src/registrar/admin.py @@ -1393,6 +1393,7 @@ class UserDomainRoleAdmin(ListHeaderAdmin, ImportExportModelAdmin): return super().changeform_view(request, object_id, form_url, extra_context=extra_context) + class BaseInvitationAdmin(ListHeaderAdmin): """Base class for admin classes which will customize save_model and send email invitations on model adds, and require custom handling of forms and form errors.""" @@ -1542,6 +1543,7 @@ class DomainInvitationAdmin(BaseInvitationAdmin): portfolio=domain_org, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], ) + # if user exists for email, immediately retrieve portfolio invitation upon creation if requested_user is not None: portfolio_invitation.retrieve() portfolio_invitation.save() @@ -1552,7 +1554,7 @@ class DomainInvitationAdmin(BaseInvitationAdmin): requestor=requestor, domains=domain, is_member_of_different_org=member_of_a_different_org, - requested_user=requested_user + requested_user=requested_user, ) if requested_user is not None: # Domain Invitation creation for an existing User @@ -1639,6 +1641,7 @@ class PortfolioInvitationAdmin(BaseInvitationAdmin): if not permission_exists: # if permission does not exist for a user with requested_email, send email send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=portfolio) + # if user exists for email, immediately retrieve portfolio invitation upon creation if requested_user is not None: obj.retrieve() messages.success(request, f"{requested_email} has been invited.") diff --git a/src/registrar/utility/email_invitations.py b/src/registrar/utility/email_invitations.py index 55a0bdf14..1491b65a5 100644 --- a/src/registrar/utility/email_invitations.py +++ b/src/registrar/utility/email_invitations.py @@ -1,7 +1,6 @@ from django.conf import settings from registrar.models import DomainInvitation from registrar.models.domain import Domain -from registrar.models.user import User from registrar.utility.errors import ( AlreadyDomainInvitedError, AlreadyDomainManagerError, @@ -15,12 +14,12 @@ import logging logger = logging.getLogger(__name__) -def send_domain_invitation_email(email: str, requestor, domains: Domain | list[Domain], is_member_of_different_org, requested_user=None): +def send_domain_invitation_email( + email: str, requestor, domains: Domain | list[Domain], is_member_of_different_org, requested_user=None +): """ Sends a domain invitation email to the specified address. - Raises exceptions for validation or email-sending issues. - Args: email (str): Email address of the recipient. requestor (User): The user initiating the invitation. @@ -35,45 +34,66 @@ def send_domain_invitation_email(email: str, requestor, domains: Domain | list[D OutsideOrgMemberError: If the requested_user is part of a different organization. EmailSendingError: If there is an error while sending the email. """ - # Normalize domains - if isinstance(domains, Domain): - domains = [domains] + domains = normalize_domains(domains) + requestor_email = get_requestor_email(requestor, domains) - # Default email address for staff - requestor_email = settings.DEFAULT_FROM_EMAIL + validate_invitation(email, domains, requestor, is_member_of_different_org) - # Check if the requestor is staff and has an email - if not requestor.is_staff: - if not requestor.email or requestor.email.strip() == "": - domain_names = ", ".join([domain.name for domain in domains]) - raise MissingEmailError(email=email, domain=domain_names) - else: - requestor_email = requestor.email + send_invitation_email(email, requestor_email, domains, requested_user) - # Check if the recipient is part of a different organization - # COMMENT: this does not account for multiple_portfolios flag being active + +def normalize_domains(domains): + """Ensures domains is always a list.""" + return [domains] if isinstance(domains, Domain) else domains + + +def get_requestor_email(requestor, domains): + """Get the requestor's email or raise an error if it's missing.""" + if requestor.is_staff: + return settings.DEFAULT_FROM_EMAIL + + if not requestor.email or requestor.email.strip() == "": + domain_names = ", ".join([domain.name for domain in domains]) + raise MissingEmailError(email=requestor.email, domain=domain_names) + + return requestor.email + + +def validate_invitation(email, domains, requestor, is_member_of_different_org): + """Validate the invitation conditions.""" + check_outside_org_membership(email, requestor, is_member_of_different_org) + + for domain in domains: + validate_existing_invitation(email, domain) + + +def check_outside_org_membership(email, requestor, is_member_of_different_org): + """Raise an error if the email belongs to a different organization.""" if ( flag_is_active_for_user(requestor, "organization_feature") and not flag_is_active_for_user(requestor, "multiple_portfolios") and is_member_of_different_org ): - raise OutsideOrgMemberError + raise OutsideOrgMemberError(email=email) - # Check for an existing invitation for each domain - for domain in domains: - try: - invite = DomainInvitation.objects.get(email=email, domain=domain) - if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED: - raise AlreadyDomainManagerError(email) - elif invite.status == DomainInvitation.DomainInvitationStatus.CANCELED: - invite.update_cancellation_status() - invite.save() - else: - raise AlreadyDomainInvitedError(email) - except DomainInvitation.DoesNotExist: - pass - # Send the email +def validate_existing_invitation(email, domain): + """Check for existing invitations and handle their status.""" + try: + invite = DomainInvitation.objects.get(email=email, domain=domain) + if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED: + raise AlreadyDomainManagerError(email) + elif invite.status == DomainInvitation.DomainInvitationStatus.CANCELED: + invite.update_cancellation_status() + invite.save() + else: + raise AlreadyDomainInvitedError(email) + except DomainInvitation.DoesNotExist: + pass + + +def send_invitation_email(email, requestor_email, domains, requested_user): + """Send the invitation email.""" try: send_templated_email( "emails/domain_invitation.txt", @@ -88,9 +108,7 @@ def send_domain_invitation_email(email: str, requestor, domains: Domain | list[D ) except EmailSendingError as err: domain_names = ", ".join([domain.name for domain in domains]) - raise EmailSendingError( - f"Could not send email invitation to {email} for domains: {domain_names}" - ) from err + raise EmailSendingError(f"Could not send email invitation to {email} for domains: {domain_names}") from err def send_portfolio_invitation_email(email: str, requestor, portfolio): diff --git a/src/registrar/views/domain.py b/src/registrar/views/domain.py index b16e3681d..2c5553a92 100644 --- a/src/registrar/views/domain.py +++ b/src/registrar/views/domain.py @@ -1186,6 +1186,7 @@ class DomainAddUserView(DomainFormBaseView): portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create( email=requested_email, portfolio=domain_org, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER] ) + # if user exists for email, immediately retrieve portfolio invitation upon creation if requested_user is not None: portfolio_invitation.retrieve() portfolio_invitation.save() diff --git a/src/registrar/views/portfolios.py b/src/registrar/views/portfolios.py index 60a96bba6..b9249ca6d 100644 --- a/src/registrar/views/portfolios.py +++ b/src/registrar/views/portfolios.py @@ -296,7 +296,7 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V Processes added domains by bulk creating UserDomainRole instances. """ if added_domain_ids: - print('_process_added_domains') + # get added_domains from ids to pass to send email method and bulk create added_domains = Domain.objects.filter(id__in=added_domain_ids) member_of_a_different_org, _ = get_org_membership(portfolio, member.email, member) send_domain_invitation_email( @@ -514,14 +514,15 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission or creating new ones. """ if added_domain_ids: + # get added_domains from ids to pass to send email method and bulk create added_domains = Domain.objects.filter(id__in=added_domain_ids) member_of_a_different_org, _ = get_org_membership(portfolio, email, None) send_domain_invitation_email( - email=email, - requestor=requestor, - domains=added_domains, - is_member_of_different_org=member_of_a_different_org, - ) + email=email, + requestor=requestor, + domains=added_domains, + is_member_of_different_org=member_of_a_different_org, + ) # Update existing invitations from CANCELED to INVITED existing_invitations = DomainInvitation.objects.filter(domain__in=added_domains, email=email) @@ -542,7 +543,7 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission for domain_id in new_domain_ids ] ) - + def _process_removed_domains(self, removed_domain_ids, email): """ Processes removed domain invitations by updating their status to CANCELED. @@ -777,6 +778,7 @@ class PortfolioAddMemberView(PortfolioMembersPermissionView, FormMixin): if not requested_user or not permission_exists: send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=portfolio) portfolio_invitation = form.save() + # if user exists for email, immediately retrieve portfolio invitation upon creation if requested_user is not None: portfolio_invitation.retrieve() portfolio_invitation.save() diff --git a/src/registrar/views/utility/invitation_helper.py b/src/registrar/views/utility/invitation_helper.py index 2cdde6a5e..771300406 100644 --- a/src/registrar/views/utility/invitation_helper.py +++ b/src/registrar/views/utility/invitation_helper.py @@ -19,6 +19,7 @@ logger = logging.getLogger(__name__) # when creating invitations and sending associated emails. These can be reused in # any view, and were initially developed for domain.py, portfolios.py and admin.py + def get_org_membership(requestor_org, requested_email, requested_user): """ Verifies if an email belongs to a different organization as a member or invited member.