diff --git a/src/registrar/admin.py b/src/registrar/admin.py
index 8f36a2125..4b05bbb6d 100644
--- a/src/registrar/admin.py
+++ b/src/registrar/admin.py
@@ -1327,6 +1327,7 @@ class UserPortfolioPermissionAdmin(ListHeaderAdmin):
search_help_text = "Search by first name, last name, email, or portfolio."
change_form_template = "django/admin/user_portfolio_permission_change_form.html"
+ delete_confirmation_template = "django/admin/user_portfolio_permission_delete_confirmation.html"
def get_roles(self, obj):
readable_roles = obj.get_readable_roles()
@@ -1670,6 +1671,7 @@ class PortfolioInvitationAdmin(BaseInvitationAdmin):
autocomplete_fields = ["portfolio"]
change_form_template = "django/admin/portfolio_invitation_change_form.html"
+ delete_confirmation_template = "django/admin/portfolio_invitation_delete_confirmation.html"
# Select portfolio invitations to change -> Portfolio invitations
def changelist_view(self, request, extra_context=None):
@@ -3739,11 +3741,13 @@ class DomainAdmin(ListHeaderAdmin, ImportExportModelAdmin):
# Using variables to get past the linter
message1 = f"Cannot delete Domain when in state {obj.state}"
message2 = f"This subdomain is being used as a hostname on another domain: {err.note}"
+ message3 = f"Command failed with note: {err.note}"
# Human-readable mappings of ErrorCodes. Can be expanded.
error_messages = {
# noqa on these items as black wants to reformat to an invalid length
ErrorCode.OBJECT_STATUS_PROHIBITS_OPERATION: message1,
ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION: message2,
+ ErrorCode.COMMAND_FAILED: message3,
}
message = "Cannot connect to the registry"
diff --git a/src/registrar/management/commands/disclose_security_emails.py b/src/registrar/management/commands/disclose_security_emails.py
index 62989e4c0..62456747a 100644
--- a/src/registrar/management/commands/disclose_security_emails.py
+++ b/src/registrar/management/commands/disclose_security_emails.py
@@ -1,4 +1,4 @@
-""""
+""" "
Converts all ready and DNS needed domains with a non-default public contact
to disclose their public contact. Created for Issue#1535 to resolve
disclose issue of domains with missing security emails.
diff --git a/src/registrar/management/commands/master_domain_migrations.py b/src/registrar/management/commands/master_domain_migrations.py
index 7f702e047..2907add06 100644
--- a/src/registrar/management/commands/master_domain_migrations.py
+++ b/src/registrar/management/commands/master_domain_migrations.py
@@ -1,8 +1,8 @@
"""Data migration:
- 1 - generates a report of data integrity across all
- transition domain related tables
- 2 - allows users to run all migration scripts for
- transition domain data
+1 - generates a report of data integrity across all
+transition domain related tables
+2 - allows users to run all migration scripts for
+transition domain data
"""
import logging
diff --git a/src/registrar/management/commands/populate_domain_updated_federal_agency.py b/src/registrar/management/commands/populate_domain_updated_federal_agency.py
index dd8ceb3b2..3fbf2792c 100644
--- a/src/registrar/management/commands/populate_domain_updated_federal_agency.py
+++ b/src/registrar/management/commands/populate_domain_updated_federal_agency.py
@@ -1,4 +1,4 @@
-""""
+""" "
Data migration: Renaming deprecated Federal Agencies to
their new updated names ie (U.S. Peace Corps to Peace Corps)
within Domain Information and Domain Requests
diff --git a/src/registrar/models/domain.py b/src/registrar/models/domain.py
index 42310c3bb..d3c0ed347 100644
--- a/src/registrar/models/domain.py
+++ b/src/registrar/models/domain.py
@@ -2,6 +2,7 @@ from itertools import zip_longest
import logging
import ipaddress
import re
+import time
from datetime import date, timedelta
from typing import Optional
from django.db import transaction
@@ -750,11 +751,7 @@ class Domain(TimeStampedModel, DomainHelper):
successTotalNameservers = len(oldNameservers) - deleteCount + addToDomainCount
- try:
- self._delete_hosts_if_not_used(hostsToDelete=deleted_values)
- except Exception as e:
- # we don't need this part to succeed in order to continue.
- logger.error("Failed to delete nameserver hosts: %s", e)
+ self._delete_hosts_if_not_used(hostsToDelete=deleted_values)
if successTotalNameservers < 2:
try:
@@ -1038,13 +1035,13 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error(f"registry error removing client hold: {err}")
raise (err)
- def _delete_domain(self):
+ def _delete_domain(self): # noqa
"""This domain should be deleted from the registry
may raises RegistryError, should be caught or handled correctly by caller"""
logger.info("Deleting subdomains for %s", self.name)
# check if any subdomains are in use by another domain
- hosts = Host.objects.filter(name__regex=r".+{}".format(self.name))
+ hosts = Host.objects.filter(name__regex=r".+\.{}".format(self.name))
for host in hosts:
if host.domain != self:
logger.error("Unable to delete host: %s is in use by another domain: %s", host.name, host.domain)
@@ -1052,38 +1049,119 @@ class Domain(TimeStampedModel, DomainHelper):
code=ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION,
note=f"Host {host.name} is in use by {host.domain}",
)
+ try:
+ # set hosts to empty list so nameservers are deleted
+ (
+ deleted_values,
+ updated_values,
+ new_values,
+ oldNameservers,
+ ) = self.getNameserverChanges(hosts=[])
- (
- deleted_values,
- updated_values,
- new_values,
- oldNameservers,
- ) = self.getNameserverChanges(hosts=[])
-
- _ = self._update_host_values(updated_values, oldNameservers) # returns nothing, just need to be run and errors
- addToDomainList, _ = self.createNewHostList(new_values)
- deleteHostList, _ = self.createDeleteHostList(deleted_values)
- responseCode = self.addAndRemoveHostsFromDomain(hostsToAdd=addToDomainList, hostsToDelete=deleteHostList)
-
+ # update the hosts
+ _ = self._update_host_values(
+ updated_values, oldNameservers
+ ) # returns nothing, just need to be run and errors
+ addToDomainList, _ = self.createNewHostList(new_values)
+ deleteHostList, _ = self.createDeleteHostList(deleted_values)
+ responseCode = self.addAndRemoveHostsFromDomain(hostsToAdd=addToDomainList, hostsToDelete=deleteHostList)
+ except RegistryError as e:
+ logger.error(f"Error trying to delete hosts from domain {self}: {e}")
+ raise e
# if unable to update domain raise error and stop
if responseCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
raise NameserverError(code=nsErrorCodes.BAD_DATA)
+ logger.info("Finished removing nameservers from domain")
+
# addAndRemoveHostsFromDomain removes the hosts from the domain object,
# but we still need to delete the object themselves
self._delete_hosts_if_not_used(hostsToDelete=deleted_values)
+ logger.info("Finished _delete_hosts_if_not_used inside _delete_domain()")
+ # delete the non-registrant contacts
logger.debug("Deleting non-registrant contacts for %s", self.name)
contacts = PublicContact.objects.filter(domain=self)
- for contact in contacts:
- if contact.contact_type != PublicContact.ContactTypeChoices.REGISTRANT:
- self._update_domain_with_contact(contact, rem=True)
- request = commands.DeleteContact(contact.registry_id)
- registry.send(request, cleaned=True)
+ logger.info(f"retrieved contacts for domain: {contacts}")
- logger.info("Deleting domain %s", self.name)
+ for contact in contacts:
+ try:
+ if contact.contact_type != PublicContact.ContactTypeChoices.REGISTRANT:
+ logger.info(f"Deleting contact: {contact}")
+ try:
+ self._update_domain_with_contact(contact, rem=True)
+ except Exception as e:
+ logger.error(f"Error while updating domain with contact: {contact}, e: {e}", exc_info=True)
+ request = commands.DeleteContact(contact.registry_id)
+ registry.send(request, cleaned=True)
+ logger.info(f"sent DeleteContact for {contact}")
+ except RegistryError as e:
+ logger.error(f"Error deleting contact: {contact}, {e}", exc_info=True)
+
+ logger.info(f"Finished deleting contacts for {self.name}")
+
+ # delete ds data if it exists
+ if self.dnssecdata:
+ logger.debug("Deleting ds data for %s", self.name)
+ try:
+ # set and unset client hold to be able to change ds data
+ logger.info("removing client hold")
+ self._remove_client_hold()
+ self.dnssecdata = None
+ logger.info("placing client hold")
+ self._place_client_hold()
+ except RegistryError as e:
+ logger.error("Error deleting ds data for %s: %s", self.name, e)
+ e.note = "Error deleting ds data for %s" % self.name
+ raise e
+
+ # check if the domain can be deleted
+ if not self._domain_can_be_deleted():
+ note = "Domain has associated objects that prevent deletion."
+ raise RegistryError(code=ErrorCode.COMMAND_FAILED, note=note)
+
+ # delete the domain
request = commands.DeleteDomain(name=self.name)
- registry.send(request, cleaned=True)
+ try:
+ registry.send(request, cleaned=True)
+ logger.info("Domain %s deleted successfully.", self.name)
+ except RegistryError as e:
+ logger.error("Error deleting domain %s: %s", self.name, e)
+ raise e
+
+ def _domain_can_be_deleted(self, max_attempts=5, wait_interval=2) -> bool:
+ """
+ Polls the registry using InfoDomain calls to confirm that the domain can be deleted.
+ Returns True if the domain can be deleted, False otherwise. Includes a retry mechanism
+ using wait_interval and max_attempts, which may be necessary if subdomains and other
+ associated objects were only recently deleted as the registry may not be immediately updated.
+ """
+ logger.info("Polling registry to confirm deletion pre-conditions for %s", self.name)
+ last_info_error = None
+ for attempt in range(max_attempts):
+ try:
+ info_response = registry.send(commands.InfoDomain(name=self.name), cleaned=True)
+ domain_info = info_response.res_data[0]
+ hosts_associated = getattr(domain_info, "hosts", None)
+ if hosts_associated is None or len(hosts_associated) == 0:
+ logger.info("InfoDomain reports no associated hosts for %s. Proceeding with deletion.", self.name)
+ return True
+ else:
+ logger.info("Attempt %d: Domain %s still has hosts: %s", attempt + 1, self.name, hosts_associated)
+ except RegistryError as info_e:
+ # If the domain is already gone, we can assume deletion already occurred.
+ if info_e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
+ logger.info("InfoDomain check indicates domain %s no longer exists.", self.name)
+ raise info_e
+ logger.warning("Attempt %d: Error during InfoDomain check: %s", attempt + 1, info_e)
+ time.sleep(wait_interval)
+ else:
+ logger.error(
+ "Exceeded max attempts waiting for domain %s to clear associated objects; last error: %s",
+ self.name,
+ last_info_error,
+ )
+ return False
def __str__(self) -> str:
return self.name
@@ -1840,8 +1918,6 @@ class Domain(TimeStampedModel, DomainHelper):
else:
logger.error("Error _delete_hosts_if_not_used, code was %s error was %s" % (e.code, e))
- raise e
-
def _fix_unknown_state(self, cleaned):
"""
_fix_unknown_state: Calls _add_missing_contacts_if_unknown
diff --git a/src/registrar/templates/django/admin/portfolio_invitation_delete_confirmation.html b/src/registrar/templates/django/admin/portfolio_invitation_delete_confirmation.html
new file mode 100644
index 000000000..932822766
--- /dev/null
+++ b/src/registrar/templates/django/admin/portfolio_invitation_delete_confirmation.html
@@ -0,0 +1,17 @@
+{% extends "admin/delete_confirmation.html" %}
+
+{% block content_subtitle %}
+
+
+
+ If you cancel the portfolio invitation here, it won't trigger any emails. It also won't remove the user's
+ portfolio access if they already logged in. Go to the
+
+ User Portfolio Permissions
+
+ table if you want to remove the user from a portfolio.
+
+
+
+ {{ block.super }}
+{% endblock %}
diff --git a/src/registrar/templates/django/admin/user_portfolio_permission_delete_confirmation.html b/src/registrar/templates/django/admin/user_portfolio_permission_delete_confirmation.html
new file mode 100644
index 000000000..71c789a63
--- /dev/null
+++ b/src/registrar/templates/django/admin/user_portfolio_permission_delete_confirmation.html
@@ -0,0 +1,12 @@
+{% extends "admin/delete_confirmation.html" %}
+
+{% block content_subtitle %}
+
+
+
+ If you remove someone from a portfolio here, it will not send any emails when you click "Save".
+
+
+
+ {{ block.super }}
+{% endblock %}
diff --git a/src/registrar/templates/emails/portfolio_removal.txt b/src/registrar/templates/emails/portfolio_removal.txt
new file mode 100644
index 000000000..779f37546
--- /dev/null
+++ b/src/registrar/templates/emails/portfolio_removal.txt
@@ -0,0 +1,21 @@
+{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
+Hi,{% if requested_user and requested_user.first_name %} {{ requested_user.first_name }}.{% endif %}
+
+{{ requestor_email }} has removed you from {{ portfolio.organization_name }}.
+
+You can no longer view this organization or its related domains within the .gov registrar.
+
+
+SOMETHING WRONG?
+If you have questions or concerns, reach out to the person who removed you from the
+organization, or reply to this email.
+
+
+----------------------------------------------------------------
+
+The .gov team
+Contact us:
+Learn about .gov
+
+The .gov registry is a part of the Cybersecurity and Infrastructure Security Agency (CISA)
+{% endautoescape %}
diff --git a/src/registrar/templates/emails/portfolio_removal_subject.txt b/src/registrar/templates/emails/portfolio_removal_subject.txt
new file mode 100644
index 000000000..d60ef9859
--- /dev/null
+++ b/src/registrar/templates/emails/portfolio_removal_subject.txt
@@ -0,0 +1 @@
+You've been removed from a .gov organization
\ No newline at end of file
diff --git a/src/registrar/tests/test_admin.py b/src/registrar/tests/test_admin.py
index aadb85c66..ccd0e6cc2 100644
--- a/src/registrar/tests/test_admin.py
+++ b/src/registrar/tests/test_admin.py
@@ -55,6 +55,7 @@ from .common import (
MockDbForSharedTests,
AuditedAdminMockData,
completed_domain_request,
+ create_test_user,
generic_domain_object,
less_console_noise,
mock_user,
@@ -1135,6 +1136,7 @@ class TestUserPortfolioPermissionAdmin(TestCase):
"""Create a client object"""
self.client = Client(HTTP_HOST="localhost:8080")
self.superuser = create_superuser()
+ self.testuser = create_test_user()
self.portfolio = Portfolio.objects.create(organization_name="Test Portfolio", creator=self.superuser)
def tearDown(self):
@@ -1167,6 +1169,21 @@ class TestUserPortfolioPermissionAdmin(TestCase):
"If you add someone to a portfolio here, it will not trigger an invitation email.",
)
+ @less_console_noise_decorator
+ def test_delete_confirmation_page_contains_static_message(self):
+ """Ensure the custom message appears in the delete confirmation page."""
+ self.client.force_login(self.superuser)
+ # Create a test portfolio permission
+ self.permission = UserPortfolioPermission.objects.create(
+ user=self.testuser, portfolio=self.portfolio, roles=["organization_member"]
+ )
+ delete_url = reverse("admin:registrar_userportfoliopermission_delete", args=[self.permission.pk])
+ response = self.client.get(delete_url)
+
+ # Check if the response contains the expected static message
+ expected_message = "If you remove someone from a portfolio here, it will not send any emails"
+ self.assertIn(expected_message, response.content.decode("utf-8"))
+
class TestPortfolioInvitationAdmin(TestCase):
"""Tests for the PortfolioInvitationAdmin class as super user
@@ -1605,6 +1622,21 @@ class TestPortfolioInvitationAdmin(TestCase):
request, "Could not send email notification to existing organization admins."
)
+ @less_console_noise_decorator
+ def test_delete_confirmation_page_contains_static_message(self):
+ """Ensure the custom message appears in the delete confirmation page."""
+ self.client.force_login(self.superuser)
+ # Create a test portfolio invitation
+ self.invitation = PortfolioInvitation.objects.create(
+ email="testuser@example.com", portfolio=self.portfolio, roles=["organization_member"]
+ )
+ delete_url = reverse("admin:registrar_portfolioinvitation_delete", args=[self.invitation.pk])
+ response = self.client.get(delete_url)
+
+ # Check if the response contains the expected static message
+ expected_message = "If you cancel the portfolio invitation here"
+ self.assertIn(expected_message, response.content.decode("utf-8"))
+
class TestHostAdmin(TestCase):
"""Tests for the HostAdmin class as super user
@@ -3816,7 +3848,7 @@ class TestTransferUser(WebTest):
with self.assertRaises(User.DoesNotExist):
self.user2.refresh_from_db()
- # @less_console_noise_decorator
+ @less_console_noise_decorator
def test_transfer_user_throws_transfer_and_delete_success_messages(self):
"""Test that success messages for data transfer and user deletion are displayed."""
# Ensure the setup for VerifiedByStaff
diff --git a/src/registrar/tests/test_admin_domain.py b/src/registrar/tests/test_admin_domain.py
index 17d3f38bd..969d043d7 100644
--- a/src/registrar/tests/test_admin_domain.py
+++ b/src/registrar/tests/test_admin_domain.py
@@ -178,7 +178,7 @@ class TestDomainAdminAsStaff(MockEppLib):
Then a user-friendly success message is returned for displaying on the web
And `state` is set to `DELETED`
"""
- domain = create_ready_domain()
+ domain, _ = Domain.objects.get_or_create(name="my-nameserver.gov", state=Domain.State.READY)
# Put in client hold
domain.place_client_hold()
# Ensure everything is displaying correctly
@@ -212,7 +212,7 @@ class TestDomainAdminAsStaff(MockEppLib):
mock_add_message.assert_called_once_with(
request,
messages.INFO,
- "Domain city.gov has been deleted. Thanks!",
+ "Domain my-nameserver.gov has been deleted. Thanks!",
extra_tags="",
fail_silently=False,
)
@@ -266,7 +266,7 @@ class TestDomainAdminAsStaff(MockEppLib):
mock_add_message.assert_called_once_with(
request,
messages.ERROR,
- "Error deleting this Domain: This subdomain is being used as a hostname on another domain: ns1.sharedhost.com", # noqa
+ "Error deleting this Domain: Command failed with note: Domain has associated objects that prevent deletion.", # noqa
extra_tags="",
fail_silently=False,
)
@@ -321,7 +321,7 @@ class TestDomainAdminAsStaff(MockEppLib):
Then `commands.DeleteDomain` is sent to the registry
And Domain returns normally without an error dialog
"""
- domain = create_ready_domain()
+ domain, _ = Domain.objects.get_or_create(name="my-nameserver.gov", state=Domain.State.READY)
# Put in client hold
domain.place_client_hold()
# Ensure everything is displaying correctly
@@ -340,12 +340,13 @@ class TestDomainAdminAsStaff(MockEppLib):
)
request.user = self.client
# Delete it once
+
with patch("django.contrib.messages.add_message") as mock_add_message:
self.admin.do_delete_domain(request, domain)
mock_add_message.assert_called_once_with(
request,
messages.INFO,
- "Domain city.gov has been deleted. Thanks!",
+ "Domain my-nameserver.gov has been deleted. Thanks!",
extra_tags="",
fail_silently=False,
)
diff --git a/src/registrar/tests/test_email_invitations.py b/src/registrar/tests/test_email_invitations.py
index 981bca6dd..2331d35e8 100644
--- a/src/registrar/tests/test_email_invitations.py
+++ b/src/registrar/tests/test_email_invitations.py
@@ -3,6 +3,7 @@ from unittest.mock import patch, MagicMock
from datetime import date
from registrar.models.domain import Domain
from registrar.models.portfolio import Portfolio
+from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user import User
from registrar.models.user_domain_role import UserDomainRole
from registrar.models.user_portfolio_permission import UserPortfolioPermission
@@ -16,6 +17,8 @@ from registrar.utility.email_invitations import (
send_portfolio_admin_addition_emails,
send_portfolio_admin_removal_emails,
send_portfolio_invitation_email,
+ send_portfolio_invitation_remove_email,
+ send_portfolio_member_permission_remove_email,
send_portfolio_member_permission_update_email,
)
@@ -963,3 +966,149 @@ class TestSendPortfolioMemberPermissionUpdateEmail(unittest.TestCase):
# Assertions
mock_logger.warning.assert_not_called() # Function should fail before logging email failure
+
+
+class TestSendPortfolioMemberPermissionRemoveEmail(unittest.TestCase):
+ """Unit tests for send_portfolio_member_permission_remove_email function."""
+
+ @patch("registrar.utility.email_invitations.send_templated_email")
+ @patch("registrar.utility.email_invitations._get_requestor_email")
+ def test_send_email_success(self, mock_get_requestor_email, mock_send_email):
+ """Test that the email is sent successfully when there are no errors."""
+ # Mock data
+ requestor = MagicMock()
+ permissions = MagicMock(spec=UserPortfolioPermission)
+ permissions.user.email = "user@example.com"
+ permissions.portfolio.organization_name = "Test Portfolio"
+
+ mock_get_requestor_email.return_value = "requestor@example.com"
+
+ # Call function
+ result = send_portfolio_member_permission_remove_email(requestor, permissions)
+
+ # Assertions
+ mock_get_requestor_email.assert_called_once_with(requestor, portfolio=permissions.portfolio)
+ mock_send_email.assert_called_once_with(
+ "emails/portfolio_removal.txt",
+ "emails/portfolio_removal_subject.txt",
+ to_address="user@example.com",
+ context={
+ "requested_user": permissions.user,
+ "portfolio": permissions.portfolio,
+ "requestor_email": "requestor@example.com",
+ },
+ )
+ self.assertTrue(result)
+
+ @patch("registrar.utility.email_invitations.send_templated_email", side_effect=EmailSendingError("Email failed"))
+ @patch("registrar.utility.email_invitations._get_requestor_email")
+ @patch("registrar.utility.email_invitations.logger")
+ def test_send_email_failure(self, mock_logger, mock_get_requestor_email, mock_send_email):
+ """Test that the function returns False and logs an error when email sending fails."""
+ # Mock data
+ requestor = MagicMock()
+ permissions = MagicMock(spec=UserPortfolioPermission)
+ permissions.user.email = "user@example.com"
+ permissions.portfolio.organization_name = "Test Portfolio"
+
+ mock_get_requestor_email.return_value = "requestor@example.com"
+
+ # Call function
+ result = send_portfolio_member_permission_remove_email(requestor, permissions)
+
+ # Assertions
+ mock_logger.warning.assert_called_once_with(
+ "Could not send email organization member removal notification to %s for portfolio: %s",
+ permissions.user.email,
+ permissions.portfolio.organization_name,
+ exc_info=True,
+ )
+ self.assertFalse(result)
+
+ @patch("registrar.utility.email_invitations._get_requestor_email", side_effect=Exception("Unexpected error"))
+ @patch("registrar.utility.email_invitations.logger")
+ def test_requestor_email_retrieval_failure(self, mock_logger, mock_get_requestor_email):
+ """Test that an exception in retrieving requestor email is logged."""
+ # Mock data
+ requestor = MagicMock()
+ permissions = MagicMock(spec=UserPortfolioPermission)
+
+ # Call function
+ with self.assertRaises(Exception):
+ send_portfolio_member_permission_remove_email(requestor, permissions)
+
+ # Assertions
+ mock_logger.warning.assert_not_called() # Function should fail before logging email failure
+
+
+class TestSendPortfolioInvitationRemoveEmail(unittest.TestCase):
+ """Unit tests for send_portfolio_invitation_remove_email function."""
+
+ @patch("registrar.utility.email_invitations.send_templated_email")
+ @patch("registrar.utility.email_invitations._get_requestor_email")
+ def test_send_email_success(self, mock_get_requestor_email, mock_send_email):
+ """Test that the email is sent successfully when there are no errors."""
+ # Mock data
+ requestor = MagicMock()
+ invitation = MagicMock(spec=PortfolioInvitation)
+ invitation.email = "user@example.com"
+ invitation.portfolio.organization_name = "Test Portfolio"
+
+ mock_get_requestor_email.return_value = "requestor@example.com"
+
+ # Call function
+ result = send_portfolio_invitation_remove_email(requestor, invitation)
+
+ # Assertions
+ mock_get_requestor_email.assert_called_once_with(requestor, portfolio=invitation.portfolio)
+ mock_send_email.assert_called_once_with(
+ "emails/portfolio_removal.txt",
+ "emails/portfolio_removal_subject.txt",
+ to_address="user@example.com",
+ context={
+ "requested_user": None,
+ "portfolio": invitation.portfolio,
+ "requestor_email": "requestor@example.com",
+ },
+ )
+ self.assertTrue(result)
+
+ @patch("registrar.utility.email_invitations.send_templated_email", side_effect=EmailSendingError("Email failed"))
+ @patch("registrar.utility.email_invitations._get_requestor_email")
+ @patch("registrar.utility.email_invitations.logger")
+ def test_send_email_failure(self, mock_logger, mock_get_requestor_email, mock_send_email):
+ """Test that the function returns False and logs an error when email sending fails."""
+ # Mock data
+ requestor = MagicMock()
+ invitation = MagicMock(spec=PortfolioInvitation)
+ invitation.email = "user@example.com"
+ invitation.portfolio.organization_name = "Test Portfolio"
+
+ mock_get_requestor_email.return_value = "requestor@example.com"
+
+ # Call function
+ result = send_portfolio_invitation_remove_email(requestor, invitation)
+
+ # Assertions
+ mock_logger.warning.assert_called_once_with(
+ "Could not send email organization member removal notification to %s for portfolio: %s",
+ invitation.email,
+ invitation.portfolio.organization_name,
+ exc_info=True,
+ )
+ self.assertFalse(result)
+
+ @patch("registrar.utility.email_invitations._get_requestor_email", side_effect=Exception("Unexpected error"))
+ @patch("registrar.utility.email_invitations.logger")
+ def test_requestor_email_retrieval_failure(self, mock_logger, mock_get_requestor_email):
+ """Test that an exception in retrieving requestor email is logged."""
+ # Mock data
+ requestor = MagicMock()
+ invitation = MagicMock(spec=PortfolioInvitation)
+
+ # Call function
+ with self.assertRaises(Exception):
+ send_portfolio_invitation_remove_email(requestor, invitation)
+
+ # Assertions
+ mock_logger.warning.assert_not_called() # Function should fail before logging email failure
diff --git a/src/registrar/tests/test_models_domain.py b/src/registrar/tests/test_models_domain.py
index 083725a55..93072f93b 100644
--- a/src/registrar/tests/test_models_domain.py
+++ b/src/registrar/tests/test_models_domain.py
@@ -35,6 +35,7 @@ from epplibwrapper import (
from .common import MockEppLib, MockSESClient, less_console_noise
import logging
import boto3_mocking # type: ignore
+import copy
logger = logging.getLogger(__name__)
@@ -97,58 +98,59 @@ class TestDomainCache(MockEppLib):
self.mockedSendFunction.assert_has_calls(expectedCalls)
+ # @less_console_noise_decorator
def test_cache_nested_elements_not_subdomain(self):
"""Cache works correctly with the nested objects cache and hosts"""
- with less_console_noise():
- domain, _ = Domain.objects.get_or_create(name="igorville.gov")
- # The contact list will initially contain objects of type 'DomainContact'
- # this is then transformed into PublicContact, and cache should NOT
- # hold onto the DomainContact object
- expectedUnfurledContactsList = [
- common.DomainContact(contact="123", type="security"),
- ]
- expectedContactsDict = {
- PublicContact.ContactTypeChoices.ADMINISTRATIVE: "adminContact",
- PublicContact.ContactTypeChoices.SECURITY: "securityContact",
- PublicContact.ContactTypeChoices.TECHNICAL: "technicalContact",
- }
- expectedHostsDict = {
- "name": self.mockDataInfoDomain.hosts[0],
- "addrs": [], # should return empty bc fake.host.com is not a subdomain of igorville.gov
- "cr_date": self.mockDataInfoHosts.cr_date,
- }
- # this can be changed when the getter for contacts is implemented
- domain._get_property("contacts")
+ domain, _ = Domain.objects.get_or_create(name="igorville.gov")
+ # The contact list will initially contain objects of type 'DomainContact'
+ # this is then transformed into PublicContact, and cache should NOT
+ # hold onto the DomainContact object
+ expectedUnfurledContactsList = [
+ common.DomainContact(contact="123", type="security"),
+ ]
+ expectedContactsDict = {
+ PublicContact.ContactTypeChoices.ADMINISTRATIVE: "adminContact",
+ PublicContact.ContactTypeChoices.SECURITY: "securityContact",
+ PublicContact.ContactTypeChoices.TECHNICAL: "technicalContact",
+ }
+ expectedHostsDict = {
+ "name": self.mockDataInfoDomain.hosts[0],
+ "addrs": [], # should return empty bc fake.host.com is not a subdomain of igorville.gov
+ "cr_date": self.mockDataInfoHosts.cr_date,
+ }
- # check domain info is still correct and not overridden
- self.assertEqual(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info)
- self.assertEqual(domain._cache["cr_date"], self.mockDataInfoDomain.cr_date)
+ # this can be changed when the getter for contacts is implemented
+ domain._get_property("contacts")
- # check contacts
- self.assertEqual(domain._cache["_contacts"], self.mockDataInfoDomain.contacts)
- # The contact list should not contain what is sent by the registry by default,
- # as _fetch_cache will transform the type to PublicContact
- self.assertNotEqual(domain._cache["contacts"], expectedUnfurledContactsList)
+ # check domain info is still correct and not overridden
+ self.assertEqual(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info)
+ self.assertEqual(domain._cache["cr_date"], self.mockDataInfoDomain.cr_date)
- self.assertEqual(domain._cache["contacts"], expectedContactsDict)
+ # check contacts
+ self.assertEqual(domain._cache["_contacts"], self.mockDataInfoDomain.contacts)
+ # The contact list should not contain what is sent by the registry by default,
+ # as _fetch_cache will transform the type to PublicContact
+ self.assertNotEqual(domain._cache["contacts"], expectedUnfurledContactsList)
- # get and check hosts is set correctly
- domain._get_property("hosts")
- self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
- self.assertEqual(domain._cache["contacts"], expectedContactsDict)
- # invalidate cache
- domain._cache = {}
+ self.assertEqual(domain._cache["contacts"], expectedContactsDict)
- # get host
- domain._get_property("hosts")
- # Should return empty bc fake.host.com is not a subdomain of igorville.gov
- self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
+ # get and check hosts is set correctly
+ domain._get_property("hosts")
+ self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
+ self.assertEqual(domain._cache["contacts"], expectedContactsDict)
+ # invalidate cache
+ domain._cache = {}
- # get contacts
- domain._get_property("contacts")
- self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
- self.assertEqual(domain._cache["contacts"], expectedContactsDict)
+ # get host
+ domain._get_property("hosts")
+ # Should return empty bc fake.host.com is not a subdomain of igorville.gov
+ self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
+
+ # get contacts
+ domain._get_property("contacts")
+ self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
+ self.assertEqual(domain._cache["contacts"], expectedContactsDict)
def test_cache_nested_elements_is_subdomain(self):
"""Cache works correctly with the nested objects cache and hosts"""
@@ -1248,6 +1250,13 @@ class TestRegistrantNameservers(MockEppLib):
name="threenameserversDomain.gov", state=Domain.State.READY
)
+ def tearDown(self):
+ PublicContact.objects.all().delete()
+ HostIP.objects.all().delete()
+ Host.objects.all().delete()
+ Domain.objects.all().delete()
+ super().tearDown()
+
def test_get_nameserver_changes_success_deleted_vals(self):
"""Testing only deleting and no other changes"""
with less_console_noise():
@@ -1797,6 +1806,7 @@ class TestRegistrantNameservers(MockEppLib):
mock_host_ip_get_or_create.assert_not_called()
self.assertEqual(mock_host_ip_get_or_create.call_count, 0)
+ # @less_console_noise_decorator
def test_nameservers_stored_on_fetch_cache_not_subdomain_with_ip(self):
"""
Scenario: Nameservers are stored in db when they are retrieved from fetch_cache.
@@ -1808,21 +1818,20 @@ class TestRegistrantNameservers(MockEppLib):
#3: Nameserver is not a subdomain, but it does have an IP address returned
due to how we set up our defaults
"""
- with less_console_noise():
- domain, _ = Domain.objects.get_or_create(name="fake.gov", state=Domain.State.READY)
+ domain, _ = Domain.objects.get_or_create(name="freeman.gov", state=Domain.State.READY)
- with patch.object(Host.objects, "get_or_create") as mock_host_get_or_create, patch.object(
- HostIP.objects, "get_or_create"
- ) as mock_host_ip_get_or_create:
- mock_host_get_or_create.return_value = (Host(domain=domain), True)
- mock_host_ip_get_or_create.return_value = (HostIP(), True)
+ with patch.object(Host.objects, "get_or_create") as mock_host_get_or_create, patch.object(
+ HostIP.objects, "get_or_create"
+ ) as mock_host_ip_get_or_create:
+ mock_host_get_or_create.return_value = (Host(domain=domain), True)
+ mock_host_ip_get_or_create.return_value = (HostIP(), True)
- # force fetch_cache to be called, which will return above documented mocked hosts
- domain.nameservers
+ # force fetch_cache to be called, which will return above documented mocked hosts
+ domain.nameservers
- mock_host_get_or_create.assert_called_once_with(domain=domain, name="fake.host.com")
- mock_host_ip_get_or_create.assert_not_called()
- self.assertEqual(mock_host_ip_get_or_create.call_count, 0)
+ mock_host_get_or_create.assert_called_once_with(domain=domain, name="fake.host.com")
+ mock_host_ip_get_or_create.assert_not_called()
+ self.assertEqual(mock_host_ip_get_or_create.call_count, 0)
def test_nameservers_stored_on_fetch_cache_not_subdomain_without_ip(self):
"""
@@ -1861,12 +1870,6 @@ class TestRegistrantNameservers(MockEppLib):
with self.assertRaises(RegistryError):
domain.nameservers = [("ns1.failednameserver.gov", ["4.5.6"])]
- def tearDown(self):
- HostIP.objects.all().delete()
- Host.objects.all().delete()
- Domain.objects.all().delete()
- return super().tearDown()
-
class TestNameserverValidation(TestCase):
"""Test the isValidDomain method which validates nameservers"""
@@ -1947,8 +1950,6 @@ class TestRegistrantDNSSEC(MockEppLib):
And a domain exists in the registry
"""
super().setUp()
- # for the tests, need a domain in the unknown state
- self.domain, _ = Domain.objects.get_or_create(name="fake.gov")
def tearDown(self):
PublicContact.objects.all().delete()
@@ -2041,6 +2042,7 @@ class TestRegistrantDNSSEC(MockEppLib):
self.assertEquals(dnssecdata_get.dsData, self.dnssecExtensionWithDsData.dsData)
patcher.stop()
+ @less_console_noise_decorator
def test_dnssec_is_idempotent(self):
"""
Scenario: Registrant adds DNS data twice, due to a UI glitch
@@ -2126,6 +2128,7 @@ class TestRegistrantDNSSEC(MockEppLib):
self.assertEquals(dnssecdata_get.dsData, self.dnssecExtensionWithDsData.dsData)
patcher.stop()
+ @less_console_noise_decorator
def test_user_adds_dnssec_data_multiple_dsdata(self):
"""
Scenario: Registrant adds DNSSEC data with multiple DSData.
@@ -2194,6 +2197,7 @@ class TestRegistrantDNSSEC(MockEppLib):
self.assertEquals(dnssecdata_get.dsData, self.dnssecExtensionWithMultDsData.dsData)
patcher.stop()
+ # @less_console_noise_decorator
def test_user_removes_dnssec_data(self):
"""
Scenario: Registrant removes DNSSEC ds data.
@@ -2219,28 +2223,27 @@ class TestRegistrantDNSSEC(MockEppLib):
else:
return MagicMock(res_data=[self.mockDataInfoHosts])
- with less_console_noise():
- patcher = patch("registrar.models.domain.registry.send")
- mocked_send = patcher.start()
+ with patch("registrar.models.domain.registry.send") as mocked_send:
mocked_send.side_effect = side_effect
+
domain, _ = Domain.objects.get_or_create(name="dnssec-dsdata.gov")
- # Initial setting of dnssec data
domain.dnssecdata = self.dnssecExtensionWithDsData
# Check dsdata_last_change is updated
domain = Domain.objects.get(name="dnssec-dsdata.gov")
self.assertIsNotNone(domain.dsdata_last_change)
-
initial_change = domain.dsdata_last_change
+ # Invalidate the cache to force a fresh lookup
+ domain._invalidate_cache()
+
# Remove dnssec data
domain.dnssecdata = self.dnssecExtensionRemovingDsData
# Check that dsdata_last_change is updated again
domain = Domain.objects.get(name="dnssec-dsdata.gov")
self.assertIsNotNone(domain.dsdata_last_change)
-
self.assertNotEqual(domain.dsdata_last_change, initial_change)
# get the DNS SEC extension added to the UpdateDomain command and
@@ -2292,7 +2295,6 @@ class TestRegistrantDNSSEC(MockEppLib):
),
]
)
- patcher.stop()
def test_update_is_unsuccessful(self):
"""
@@ -2697,38 +2699,6 @@ class TestAnalystDelete(MockEppLib):
Domain.objects.all().delete()
super().tearDown()
- @less_console_noise_decorator
- def test_analyst_deletes_domain(self):
- """
- Scenario: Analyst permanently deletes a domain
- When `domain.deletedInEpp()` is called
- Then `commands.DeleteDomain` is sent to the registry
- And `state` is set to `DELETED`
-
- The deleted date is set.
- """
- # Put the domain in client hold
- self.domain.place_client_hold()
- # Delete it...
- self.domain.deletedInEpp()
- self.domain.save()
- self.mockedSendFunction.assert_has_calls(
- [
- call(
- commands.DeleteDomain(name="fake.gov"),
- cleaned=True,
- )
- ]
- )
- # Domain itself should not be deleted
- self.assertNotEqual(self.domain, None)
- # Domain should have the right state
- self.assertEqual(self.domain.state, Domain.State.DELETED)
- # Domain should have a deleted
- self.assertNotEqual(self.domain.deleted, None)
- # Cache should be invalidated
- self.assertEqual(self.domain._cache, {})
-
@less_console_noise_decorator
def test_deletion_is_unsuccessful(self):
"""
@@ -2756,18 +2726,44 @@ class TestAnalystDelete(MockEppLib):
@less_console_noise_decorator
def test_deletion_with_host_and_contacts(self):
"""
- Scenario: Domain with related Host and Contacts is Deleted
- When a contact and host exists that is tied to this domain
- Then all the needed commands are sent to the registry
- And `state` is set to `DELETED`
- """
- # Put the domain in client hold
- self.domain_with_contacts.place_client_hold()
- # Delete it
- self.domain_with_contacts.deletedInEpp()
- self.domain_with_contacts.save()
+ Scenario: Domain with related Host and Contacts is Deleted.
+ When a contact and host exists that is tied to this domain,
+ then all the needed commands are sent to the registry and
+ the domain's state is set to DELETED.
- # Check that the host and contacts are deleted
+ This test now asserts only the commands that are actually issued
+ during the deletion process.
+ """
+ # Put the domain in client hold.
+ self.domain_with_contacts.place_client_hold()
+
+ # Invalidate the cache so that deletion fetches fresh data.
+ self.domain_with_contacts._invalidate_cache()
+
+ # We'll use a mutable counter to simulate different responses if needed.
+ info_domain_call_count = [0]
+
+ # TODO: This is a hack, we should refactor the MockEPPLib to be more flexible
+ def side_effect(request, cleaned=True):
+ # For an InfoDomain command for "freeman.gov", simulate behavior:
+ if isinstance(request, commands.InfoDomain) and request.name.lower() == "freeman.gov":
+ info_domain_call_count[0] += 1
+ fake_info = copy.deepcopy(self.InfoDomainWithContacts)
+ # If this branch ever gets hit, you could vary response based on call count.
+ # But note: in our current deletion flow, InfoDomain may not be called.
+ if info_domain_call_count[0] == 1:
+ fake_info.hosts = ["fake.host.com"]
+ else:
+ fake_info.hosts = []
+ return MagicMock(res_data=[fake_info])
+ return self.mockedSendFunction(request, cleaned=cleaned)
+
+ with patch("registrar.models.domain.registry.send", side_effect=side_effect):
+ self.domain_with_contacts.deletedInEpp()
+ self.domain_with_contacts.save()
+
+ # Now assert the expected calls that we know occur.
+ # Note: we no longer assert a call to InfoDomain.
self.mockedSendFunction.assert_has_calls(
[
call(
@@ -2782,14 +2778,10 @@ class TestAnalystDelete(MockEppLib):
),
cleaned=True,
),
- ]
+ ],
)
self.mockedSendFunction.assert_has_calls(
[
- call(
- commands.InfoDomain(name="freeman.gov", auth_info=None),
- cleaned=True,
- ),
call(
commands.InfoHost(name="fake.host.com"),
cleaned=True,
@@ -2806,7 +2798,8 @@ class TestAnalystDelete(MockEppLib):
),
cleaned=True,
),
- ]
+ ],
+ any_order=True,
)
self.mockedSendFunction.assert_has_calls(
[
@@ -2857,12 +2850,55 @@ class TestAnalystDelete(MockEppLib):
),
],
)
-
- # Domain itself should not be deleted
- self.assertNotEqual(self.domain_with_contacts, None)
- # State should have changed
+ self.assertIsNotNone(self.domain_with_contacts)
self.assertEqual(self.domain_with_contacts.state, Domain.State.DELETED)
+ @less_console_noise_decorator
+ def test_analyst_deletes_domain_with_ds_data(self):
+ """
+ Scenario: Domain with DS data is deleted
+ When `domain.deletedInEpp()` is called
+ Then `commands.DeleteDomain` is sent to the registry
+ And `state` is set to `DELETED`
+ """
+ # Create a domain with DS data
+ domain, _ = Domain.objects.get_or_create(name="dsdomain.gov", state=Domain.State.READY)
+ # set domain to be on hold
+ domain.place_client_hold()
+ domain.dnssecdata = extensions.DNSSECExtension(
+ dsData=[extensions.DSData(keyTag=1, alg=1, digestType=1, digest="1234567890")],
+ )
+ domain.save()
+
+ # Mock the InfoDomain command data to return a domain with no hosts
+ # This is needed to simulate the domain being able to be deleted
+ self.mockDataInfoDomain.hosts = []
+
+ # Delete the domain
+ domain.deletedInEpp()
+ domain.save()
+
+ # Check that dsdata is None
+ self.assertEqual(domain.dnssecdata, None)
+
+ # Check that the UpdateDomain command was sent to the registry with the correct extension
+ self.mockedSendFunction.assert_has_calls(
+ [
+ call(
+ commands.UpdateDomain(
+ name="dsdomain.gov", add=[], rem=[], nsset=None, keyset=None, registrant=None, auth_info=None
+ ),
+ cleaned=True,
+ ),
+ ]
+ )
+
+ # Check that the domain was deleted
+ self.assertEqual(domain.state, Domain.State.DELETED)
+
+ # reset to avoid test pollution
+ self.mockDataInfoDomain.hosts = ["fake.host.com"]
+
@less_console_noise_decorator
def test_deletion_ready_fsm_failure(self):
"""
diff --git a/src/registrar/tests/test_views_portfolio.py b/src/registrar/tests/test_views_portfolio.py
index 5ba556028..26fae8973 100644
--- a/src/registrar/tests/test_views_portfolio.py
+++ b/src/registrar/tests/test_views_portfolio.py
@@ -1675,7 +1675,8 @@ class TestPortfolioMemberDeleteView(WebTest):
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
- def test_portfolio_member_delete_view_members_table_active_requests(self, send_removal_emails):
+ @patch("registrar.views.portfolios.send_portfolio_member_permission_remove_email")
+ def test_portfolio_member_delete_view_members_table_active_requests(self, send_member_removal, send_removal_emails):
"""Error state w/ deleting a member with active request on Members Table"""
# I'm a user
UserPortfolioPermission.objects.get_or_create(
@@ -1715,12 +1716,15 @@ class TestPortfolioMemberDeleteView(WebTest):
# assert that send_portfolio_admin_removal_emails is not called
send_removal_emails.assert_not_called()
+ # assert that send_portfolio_member_permission_remove_email is not called
+ send_member_removal.assert_not_called()
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
- def test_portfolio_member_delete_view_members_table_only_admin(self, send_removal_emails):
+ @patch("registrar.views.portfolios.send_portfolio_member_permission_remove_email")
+ def test_portfolio_member_delete_view_members_table_only_admin(self, send_member_removal, send_removal_emails):
"""Error state w/ deleting a member that's the only admin on Members Table"""
# I'm a user with admin permission
@@ -1750,12 +1754,15 @@ class TestPortfolioMemberDeleteView(WebTest):
# assert that send_portfolio_admin_removal_emails is not called
send_removal_emails.assert_not_called()
+ # assert that send_portfolio_member_permission_remove_email is not called
+ send_member_removal.assert_not_called()
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
- def test_portfolio_member_table_delete_member_success(self, mock_send_removal_emails):
+ @patch("registrar.views.portfolios.send_portfolio_member_permission_remove_email")
+ def test_portfolio_member_table_delete_member_success(self, send_member_removal, mock_send_removal_emails):
"""Success state with deleting on Members Table page bc no active request AND not only admin"""
# I'm a user
@@ -1780,6 +1787,9 @@ class TestPortfolioMemberDeleteView(WebTest):
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
+ # Member removal email sent successfully
+ send_member_removal.return_value = True
+
# And set that the member has no active requests AND it's not the only admin
with patch.object(User, "get_active_requests_count_in_portfolio", return_value=0), patch.object(
User, "is_only_admin_of_portfolio", return_value=False
@@ -1802,12 +1812,23 @@ class TestPortfolioMemberDeleteView(WebTest):
# assert that send_portfolio_admin_removal_emails is not called
# because member being removed is not an admin
mock_send_removal_emails.assert_not_called()
+ # assert that send_portfolio_member_permission_remove_email is called
+ send_member_removal.assert_called_once()
+
+ # Get the arguments passed to send_portfolio_member_permission_remove_email
+ _, called_kwargs = send_member_removal.call_args
+
+ # Assert the email content
+ self.assertEqual(called_kwargs["requestor"], self.user)
+ self.assertEqual(called_kwargs["permissions"].user, upp.user)
+ self.assertEqual(called_kwargs["permissions"].portfolio, upp.portfolio)
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
- def test_portfolio_member_table_delete_admin_success(self, mock_send_removal_emails):
+ @patch("registrar.views.portfolios.send_portfolio_member_permission_remove_email")
+ def test_portfolio_member_table_delete_admin_success(self, send_member_removal, mock_send_removal_emails):
"""Success state with deleting on Members Table page bc no active request AND
not only admin. Because admin, removal emails are sent."""
@@ -1834,6 +1855,7 @@ class TestPortfolioMemberDeleteView(WebTest):
)
mock_send_removal_emails.return_value = True
+ send_member_removal.return_value = True
# And set that the member has no active requests AND it's not the only admin
with patch.object(User, "get_active_requests_count_in_portfolio", return_value=0), patch.object(
@@ -1856,6 +1878,8 @@ class TestPortfolioMemberDeleteView(WebTest):
# assert that send_portfolio_admin_removal_emails is called
mock_send_removal_emails.assert_called_once()
+ # assert that send_portfolio_member_permission_remove_email is called
+ send_member_removal.assert_called_once()
# Get the arguments passed to send_portfolio_admin_addition_emails
_, called_kwargs = mock_send_removal_emails.call_args
@@ -1865,13 +1889,25 @@ class TestPortfolioMemberDeleteView(WebTest):
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["portfolio"], self.portfolio)
+ # Get the arguments passed to send_portfolio_member_permission_remove_email
+ _, called_kwargs = send_member_removal.call_args
+
+ # Assert the email content
+ self.assertEqual(called_kwargs["requestor"], self.user)
+ self.assertEqual(called_kwargs["permissions"].user, upp.user)
+ self.assertEqual(called_kwargs["permissions"].portfolio, upp.portfolio)
+
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
- def test_portfolio_member_table_delete_admin_success_removal_email_fail(self, mock_send_removal_emails):
+ @patch("registrar.views.portfolios.send_portfolio_member_permission_remove_email")
+ def test_portfolio_member_table_delete_admin_success_removal_email_fail(
+ self, send_member_removal, mock_send_removal_emails
+ ):
"""Success state with deleting on Members Table page bc no active request AND
- not only admin. Because admin, removal emails are sent, but fail to send."""
+ not only admin. Because admin, removal emails are sent, but fail to send.
+ Email to removed member also fails to send."""
# I'm a user
UserPortfolioPermission.objects.get_or_create(
@@ -1896,6 +1932,7 @@ class TestPortfolioMemberDeleteView(WebTest):
)
mock_send_removal_emails.return_value = False
+ send_member_removal.return_value = False
# And set that the member has no active requests AND it's not the only admin
with patch.object(User, "get_active_requests_count_in_portfolio", return_value=0), patch.object(
@@ -1918,6 +1955,8 @@ class TestPortfolioMemberDeleteView(WebTest):
# assert that send_portfolio_admin_removal_emails is called
mock_send_removal_emails.assert_called_once()
+ # assert that send_portfolio_member_permission_remove_email is called
+ send_member_removal.assert_called_once()
# Get the arguments passed to send_portfolio_admin_addition_emails
_, called_kwargs = mock_send_removal_emails.call_args
@@ -1927,6 +1966,14 @@ class TestPortfolioMemberDeleteView(WebTest):
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["portfolio"], self.portfolio)
+ # Get the arguments passed to send_portfolio_member_permission_remove_email
+ _, called_kwargs = send_member_removal.call_args
+
+ # Assert the email content
+ self.assertEqual(called_kwargs["requestor"], self.user)
+ self.assertEqual(called_kwargs["permissions"].user, upp.user)
+ self.assertEqual(called_kwargs["permissions"].portfolio, upp.portfolio)
+
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@@ -2051,7 +2098,10 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
- def test_portfolio_member_delete_view_manage_members_page_invitedmember(self, mock_send_removal_emails):
+ @patch("registrar.views.portfolios.send_portfolio_invitation_remove_email")
+ def test_portfolio_member_delete_view_manage_members_page_invitedmember(
+ self, send_invited_member_removal, mock_send_removal_emails
+ ):
"""Success state w/ deleting invited member on Manage Members page should redirect back to Members Table"""
# I'm a user
@@ -2072,6 +2122,10 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
+
+ # Invited member removal email sent successfully
+ send_invited_member_removal.return_value = True
+
with patch("django.contrib.messages.success") as mock_success:
self.client.force_login(self.user)
response = self.client.post(
@@ -2095,12 +2149,25 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
# assert send_portfolio_admin_removal_emails not called since invitation
# is for a basic member
mock_send_removal_emails.assert_not_called()
+ # assert that send_portfolio_invitation_remove_email is called
+ send_invited_member_removal.assert_called_once()
+
+ # Get the arguments passed to send_portfolio_invitation_removal_email
+ _, called_kwargs = send_invited_member_removal.call_args
+
+ # Assert the email content
+ self.assertEqual(called_kwargs["requestor"], self.user)
+ self.assertEqual(called_kwargs["invitation"].email, invitation.email)
+ self.assertEqual(called_kwargs["invitation"].portfolio, invitation.portfolio)
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
- def test_portfolio_member_delete_view_manage_members_page_invitedadmin(self, mock_send_removal_emails):
+ @patch("registrar.views.portfolios.send_portfolio_invitation_remove_email")
+ def test_portfolio_member_delete_view_manage_members_page_invitedadmin(
+ self, send_invited_member_email, mock_send_removal_emails
+ ):
"""Success state w/ deleting invited admin on Manage Members page should redirect back to Members Table"""
# I'm a user
@@ -2115,6 +2182,7 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
)
mock_send_removal_emails.return_value = True
+ send_invited_member_email.return_value = True
# Invite an admin under same portfolio
invited_member_email = "invited_member@example.com"
@@ -2146,6 +2214,8 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
# assert send_portfolio_admin_removal_emails is called since invitation
# is for an admin
mock_send_removal_emails.assert_called_once()
+ # assert that send_portfolio_invitation_remove_email is called
+ send_invited_member_email.assert_called_once()
# Get the arguments passed to send_portfolio_admin_addition_emails
_, called_kwargs = mock_send_removal_emails.call_args
@@ -2155,11 +2225,22 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["portfolio"], self.portfolio)
+ # Get the arguments passed to send_portfolio_invitation_remove_email
+ _, called_kwargs = send_invited_member_email.call_args
+
+ # Assert the email content
+ self.assertEqual(called_kwargs["requestor"], self.user)
+ self.assertEqual(called_kwargs["invitation"].email, invitation.email)
+ self.assertEqual(called_kwargs["invitation"].portfolio, invitation.portfolio)
+
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
- def test_portfolio_member_delete_view_manage_members_page_invitedadmin_email_fails(self, mock_send_removal_emails):
+ @patch("registrar.views.portfolios.send_portfolio_invitation_remove_email")
+ def test_portfolio_member_delete_view_manage_members_page_invitedadmin_email_fails(
+ self, send_invited_member_email, mock_send_removal_emails
+ ):
"""Success state w/ deleting invited admin on Manage Members page should redirect back to Members Table"""
# I'm a user
@@ -2174,6 +2255,7 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
)
mock_send_removal_emails.return_value = False
+ send_invited_member_email.return_value = False
# Invite an admin under same portfolio
invited_member_email = "invited_member@example.com"
@@ -2205,6 +2287,8 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
# assert send_portfolio_admin_removal_emails is called since invitation
# is for an admin
mock_send_removal_emails.assert_called_once()
+ # assert that send_portfolio_invitation_remove_email is called
+ send_invited_member_email.assert_called_once()
# Get the arguments passed to send_portfolio_admin_addition_emails
_, called_kwargs = mock_send_removal_emails.call_args
@@ -2214,6 +2298,14 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["portfolio"], self.portfolio)
+ # Get the arguments passed to send_portfolio_invitation_remove_email
+ _, called_kwargs = send_invited_member_email.call_args
+
+ # Assert the email content
+ self.assertEqual(called_kwargs["requestor"], self.user)
+ self.assertEqual(called_kwargs["invitation"].email, invitation.email)
+ self.assertEqual(called_kwargs["invitation"].portfolio, invitation.portfolio)
+
class TestPortfolioMemberDomainsView(TestWithUser, WebTest):
@classmethod
diff --git a/src/registrar/utility/email_invitations.py b/src/registrar/utility/email_invitations.py
index 7ddab65f1..08ebb4d86 100644
--- a/src/registrar/utility/email_invitations.py
+++ b/src/registrar/utility/email_invitations.py
@@ -2,6 +2,7 @@ from datetime import date
from django.conf import settings
from registrar.models import Domain, DomainInvitation, UserDomainRole
from registrar.models.portfolio import Portfolio
+from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
from registrar.utility.errors import (
@@ -269,6 +270,88 @@ def send_portfolio_member_permission_update_email(requestor, permissions: UserPo
return True
+def send_portfolio_member_permission_remove_email(requestor, permissions: UserPortfolioPermission):
+ """
+ Sends an email notification to a portfolio member when their permissions are deleted.
+
+ This function retrieves the requestor's email and sends a templated email to the affected user,
+ notifying them of the removal of their portfolio permissions.
+
+ Args:
+ requestor (User): The user initiating the permission update.
+ permissions (UserPortfolioPermission): The updated permissions object containing the affected user
+ and the portfolio details.
+
+ Returns:
+ bool: True if the email was sent successfully, False if an EmailSendingError occurred.
+
+ Raises:
+ MissingEmailError: If the requestor has no email associated with their account.
+ """
+ requestor_email = _get_requestor_email(requestor, portfolio=permissions.portfolio)
+ try:
+ send_templated_email(
+ "emails/portfolio_removal.txt",
+ "emails/portfolio_removal_subject.txt",
+ to_address=permissions.user.email,
+ context={
+ "requested_user": permissions.user,
+ "portfolio": permissions.portfolio,
+ "requestor_email": requestor_email,
+ },
+ )
+ except EmailSendingError:
+ logger.warning(
+ "Could not send email organization member removal notification to %s " "for portfolio: %s",
+ permissions.user.email,
+ permissions.portfolio.organization_name,
+ exc_info=True,
+ )
+ return False
+ return True
+
+
+def send_portfolio_invitation_remove_email(requestor, invitation: PortfolioInvitation):
+ """
+ Sends an email notification to a portfolio invited member when their permissions are deleted.
+
+ This function retrieves the requestor's email and sends a templated email to the affected email,
+ notifying them of the removal of their invited portfolio permissions.
+
+ Args:
+ requestor (User): The user initiating the permission update.
+ invitation (PortfolioInvitation): The invitation object containing the affected user
+ and the portfolio details.
+
+ Returns:
+ bool: True if the email was sent successfully, False if an EmailSendingError occurred.
+
+ Raises:
+ MissingEmailError: If the requestor has no email associated with their account.
+ """
+ requestor_email = _get_requestor_email(requestor, portfolio=invitation.portfolio)
+ try:
+ send_templated_email(
+ "emails/portfolio_removal.txt",
+ "emails/portfolio_removal_subject.txt",
+ to_address=invitation.email,
+ context={
+ "requested_user": None,
+ "portfolio": invitation.portfolio,
+ "requestor_email": requestor_email,
+ },
+ )
+ except EmailSendingError:
+ logger.warning(
+ "Could not send email organization member removal notification to %s " "for portfolio: %s",
+ invitation.email,
+ invitation.portfolio.organization_name,
+ exc_info=True,
+ )
+ return False
+ return True
+
+
def send_portfolio_admin_addition_emails(email: str, requestor, portfolio: Portfolio):
"""
Notifies all portfolio admins of the provided portfolio of a newly invited portfolio admin
diff --git a/src/registrar/views/portfolios.py b/src/registrar/views/portfolios.py
index 6bace8b7e..26430258f 100644
--- a/src/registrar/views/portfolios.py
+++ b/src/registrar/views/portfolios.py
@@ -32,6 +32,8 @@ from registrar.utility.email_invitations import (
send_portfolio_admin_addition_emails,
send_portfolio_admin_removal_emails,
send_portfolio_invitation_email,
+ send_portfolio_invitation_remove_email,
+ send_portfolio_member_permission_remove_email,
send_portfolio_member_permission_update_email,
)
from registrar.utility.errors import MissingEmailError
@@ -123,60 +125,84 @@ class PortfolioMemberDeleteView(View):
"""
portfolio_member_permission = get_object_or_404(UserPortfolioPermission, pk=pk)
member = portfolio_member_permission.user
+ portfolio = portfolio_member_permission.portfolio
+ # Validate if the member can be removed
+ error_message = self._validate_member_removal(request, member, portfolio)
+ if error_message:
+ return self._handle_error_response(request, error_message, pk)
+
+ # Attempt to send notification emails
+ self._send_removal_notifications(request, portfolio_member_permission)
+
+ # Passed all error conditions, proceed with deletion
+ portfolio_member_permission.delete()
+
+ # Return success response
+ return self._handle_success_response(request, member.email)
+
+ def _validate_member_removal(self, request, member, portfolio):
+ """
+ Check whether the member can be removed from the portfolio.
+ Returns an error message if removal is not allowed; otherwise, returns None.
+ """
active_requests_count = member.get_active_requests_count_in_portfolio(request)
-
support_url = "https://get.gov/contact/"
- error_message = ""
-
if active_requests_count > 0:
- # If they have any in progress requests
- error_message = mark_safe( # nosec
+ return mark_safe( # nosec
"This member can't be removed from the organization because they have an active domain request. "
f"Please contact us to remove this member."
)
- elif member.is_only_admin_of_portfolio(portfolio_member_permission.portfolio):
- # If they are the last manager of a domain
- error_message = (
+ if member.is_only_admin_of_portfolio(portfolio):
+ return (
"There must be at least one admin in your organization. Give another member admin "
"permissions, make sure they log into the registrar, and then remove this member."
)
+ return None
- # From the Members Table page Else the Member Page
- if error_message:
- if request.headers.get("X-Requested-With") == "XMLHttpRequest":
- return JsonResponse(
- {"error": error_message},
- status=400,
- )
- else:
- messages.error(request, error_message)
- return redirect(reverse("member", kwargs={"pk": pk}))
+ def _handle_error_response(self, request, error_message, pk):
+ """
+ Return an error response (JSON or redirect with messages).
+ """
+ if request.headers.get("X-Requested-With") == "XMLHttpRequest":
+ return JsonResponse({"error": error_message}, status=400)
+ messages.error(request, error_message)
+ return redirect(reverse("member", kwargs={"pk": pk}))
- # if member being removed is an admin
- if UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_member_permission.roles:
- try:
- # attempt to send notification emails of the removal to other portfolio admins
+ def _send_removal_notifications(self, request, portfolio_member_permission):
+ """
+ Attempt to send notification emails about the member's removal.
+ """
+ try:
+ # Notify other portfolio admins if removing an admin
+ if UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_member_permission.roles:
if not send_portfolio_admin_removal_emails(
email=portfolio_member_permission.user.email,
requestor=request.user,
portfolio=portfolio_member_permission.portfolio,
):
- messages.warning(self.request, "Could not send email notification to existing organization admins.")
- except Exception as e:
- self._handle_exceptions(e)
+ messages.warning(request, "Could not send email notification to existing organization admins.")
- # passed all error conditions
- portfolio_member_permission.delete()
+ # Notify the member being removed
+ if not send_portfolio_member_permission_remove_email(
+ requestor=request.user, permissions=portfolio_member_permission
+ ):
+ messages.warning(
+ request, f"Could not send email notification to {portfolio_member_permission.user.email}"
+ )
+ except Exception as e:
+ self._handle_exceptions(e)
- # From the Members Table page Else the Member Page
- success_message = f"You've removed {member.email} from the organization."
+ def _handle_success_response(self, request, member_email):
+ """
+ Return a success response (JSON or redirect with messages).
+ """
+ success_message = f"You've removed {member_email} from the organization."
if request.headers.get("X-Requested-With") == "XMLHttpRequest":
return JsonResponse({"success": success_message}, status=200)
- else:
- messages.success(request, success_message)
- return redirect(reverse("members"))
+ messages.success(request, success_message)
+ return redirect(reverse("members"))
def _handle_exceptions(self, exception):
"""Handle exceptions raised during the process."""
@@ -458,16 +484,18 @@ class PortfolioInvitedMemberDeleteView(View):
"""
portfolio_invitation = get_object_or_404(PortfolioInvitation, pk=pk)
- # if invitation being removed is an admin
- if UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_invitation.roles:
- try:
+ try:
+ # if invitation being removed is an admin
+ if UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_invitation.roles:
# attempt to send notification emails of the removal to portfolio admins
if not send_portfolio_admin_removal_emails(
email=portfolio_invitation.email, requestor=request.user, portfolio=portfolio_invitation.portfolio
):
messages.warning(self.request, "Could not send email notification to existing organization admins.")
- except Exception as e:
- self._handle_exceptions(e)
+ if not send_portfolio_invitation_remove_email(requestor=request.user, invitation=portfolio_invitation):
+ messages.warning(request, f"Could not send email notification to {portfolio_invitation.email}")
+ except Exception as e:
+ self._handle_exceptions(e)
portfolio_invitation.delete()
diff --git a/src/registrar/views/user_profile.py b/src/registrar/views/user_profile.py
index 3434eedb3..fd9d36dd1 100644
--- a/src/registrar/views/user_profile.py
+++ b/src/registrar/views/user_profile.py
@@ -1,5 +1,4 @@
-"""Views for a User Profile.
-"""
+"""Views for a User Profile."""
import logging