Merge branch 'main' into rjm/2906-adr-ajax

This commit is contained in:
Rachid Mrad 2025-02-21 15:16:58 -05:00 committed by GitHub
commit fdd4a3e2c0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 762 additions and 211 deletions

View file

@ -1327,6 +1327,7 @@ class UserPortfolioPermissionAdmin(ListHeaderAdmin):
search_help_text = "Search by first name, last name, email, or portfolio."
change_form_template = "django/admin/user_portfolio_permission_change_form.html"
delete_confirmation_template = "django/admin/user_portfolio_permission_delete_confirmation.html"
def get_roles(self, obj):
readable_roles = obj.get_readable_roles()
@ -1670,6 +1671,7 @@ class PortfolioInvitationAdmin(BaseInvitationAdmin):
autocomplete_fields = ["portfolio"]
change_form_template = "django/admin/portfolio_invitation_change_form.html"
delete_confirmation_template = "django/admin/portfolio_invitation_delete_confirmation.html"
# Select portfolio invitations to change -> Portfolio invitations
def changelist_view(self, request, extra_context=None):
@ -3738,11 +3740,13 @@ class DomainAdmin(ListHeaderAdmin, ImportExportModelAdmin):
# Using variables to get past the linter
message1 = f"Cannot delete Domain when in state {obj.state}"
message2 = f"This subdomain is being used as a hostname on another domain: {err.note}"
message3 = f"Command failed with note: {err.note}"
# Human-readable mappings of ErrorCodes. Can be expanded.
error_messages = {
# noqa on these items as black wants to reformat to an invalid length
ErrorCode.OBJECT_STATUS_PROHIBITS_OPERATION: message1,
ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION: message2,
ErrorCode.COMMAND_FAILED: message3,
}
message = "Cannot connect to the registry"

View file

@ -1,4 +1,4 @@
""""
""" "
Converts all ready and DNS needed domains with a non-default public contact
to disclose their public contact. Created for Issue#1535 to resolve
disclose issue of domains with missing security emails.

View file

@ -1,8 +1,8 @@
"""Data migration:
1 - generates a report of data integrity across all
transition domain related tables
2 - allows users to run all migration scripts for
transition domain data
1 - generates a report of data integrity across all
transition domain related tables
2 - allows users to run all migration scripts for
transition domain data
"""
import logging

View file

@ -1,4 +1,4 @@
""""
""" "
Data migration: Renaming deprecated Federal Agencies to
their new updated names ie (U.S. Peace Corps to Peace Corps)
within Domain Information and Domain Requests

View file

@ -2,6 +2,7 @@ from itertools import zip_longest
import logging
import ipaddress
import re
import time
from datetime import date, timedelta
from typing import Optional
from django.db import transaction
@ -750,11 +751,7 @@ class Domain(TimeStampedModel, DomainHelper):
successTotalNameservers = len(oldNameservers) - deleteCount + addToDomainCount
try:
self._delete_hosts_if_not_used(hostsToDelete=deleted_values)
except Exception as e:
# we don't need this part to succeed in order to continue.
logger.error("Failed to delete nameserver hosts: %s", e)
self._delete_hosts_if_not_used(hostsToDelete=deleted_values)
if successTotalNameservers < 2:
try:
@ -1038,13 +1035,13 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error(f"registry error removing client hold: {err}")
raise (err)
def _delete_domain(self):
def _delete_domain(self): # noqa
"""This domain should be deleted from the registry
may raises RegistryError, should be caught or handled correctly by caller"""
logger.info("Deleting subdomains for %s", self.name)
# check if any subdomains are in use by another domain
hosts = Host.objects.filter(name__regex=r".+{}".format(self.name))
hosts = Host.objects.filter(name__regex=r".+\.{}".format(self.name))
for host in hosts:
if host.domain != self:
logger.error("Unable to delete host: %s is in use by another domain: %s", host.name, host.domain)
@ -1052,38 +1049,119 @@ class Domain(TimeStampedModel, DomainHelper):
code=ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION,
note=f"Host {host.name} is in use by {host.domain}",
)
try:
# set hosts to empty list so nameservers are deleted
(
deleted_values,
updated_values,
new_values,
oldNameservers,
) = self.getNameserverChanges(hosts=[])
(
deleted_values,
updated_values,
new_values,
oldNameservers,
) = self.getNameserverChanges(hosts=[])
_ = self._update_host_values(updated_values, oldNameservers) # returns nothing, just need to be run and errors
addToDomainList, _ = self.createNewHostList(new_values)
deleteHostList, _ = self.createDeleteHostList(deleted_values)
responseCode = self.addAndRemoveHostsFromDomain(hostsToAdd=addToDomainList, hostsToDelete=deleteHostList)
# update the hosts
_ = self._update_host_values(
updated_values, oldNameservers
) # returns nothing, just need to be run and errors
addToDomainList, _ = self.createNewHostList(new_values)
deleteHostList, _ = self.createDeleteHostList(deleted_values)
responseCode = self.addAndRemoveHostsFromDomain(hostsToAdd=addToDomainList, hostsToDelete=deleteHostList)
except RegistryError as e:
logger.error(f"Error trying to delete hosts from domain {self}: {e}")
raise e
# if unable to update domain raise error and stop
if responseCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
raise NameserverError(code=nsErrorCodes.BAD_DATA)
logger.info("Finished removing nameservers from domain")
# addAndRemoveHostsFromDomain removes the hosts from the domain object,
# but we still need to delete the object themselves
self._delete_hosts_if_not_used(hostsToDelete=deleted_values)
logger.info("Finished _delete_hosts_if_not_used inside _delete_domain()")
# delete the non-registrant contacts
logger.debug("Deleting non-registrant contacts for %s", self.name)
contacts = PublicContact.objects.filter(domain=self)
for contact in contacts:
if contact.contact_type != PublicContact.ContactTypeChoices.REGISTRANT:
self._update_domain_with_contact(contact, rem=True)
request = commands.DeleteContact(contact.registry_id)
registry.send(request, cleaned=True)
logger.info(f"retrieved contacts for domain: {contacts}")
logger.info("Deleting domain %s", self.name)
for contact in contacts:
try:
if contact.contact_type != PublicContact.ContactTypeChoices.REGISTRANT:
logger.info(f"Deleting contact: {contact}")
try:
self._update_domain_with_contact(contact, rem=True)
except Exception as e:
logger.error(f"Error while updating domain with contact: {contact}, e: {e}", exc_info=True)
request = commands.DeleteContact(contact.registry_id)
registry.send(request, cleaned=True)
logger.info(f"sent DeleteContact for {contact}")
except RegistryError as e:
logger.error(f"Error deleting contact: {contact}, {e}", exc_info=True)
logger.info(f"Finished deleting contacts for {self.name}")
# delete ds data if it exists
if self.dnssecdata:
logger.debug("Deleting ds data for %s", self.name)
try:
# set and unset client hold to be able to change ds data
logger.info("removing client hold")
self._remove_client_hold()
self.dnssecdata = None
logger.info("placing client hold")
self._place_client_hold()
except RegistryError as e:
logger.error("Error deleting ds data for %s: %s", self.name, e)
e.note = "Error deleting ds data for %s" % self.name
raise e
# check if the domain can be deleted
if not self._domain_can_be_deleted():
note = "Domain has associated objects that prevent deletion."
raise RegistryError(code=ErrorCode.COMMAND_FAILED, note=note)
# delete the domain
request = commands.DeleteDomain(name=self.name)
registry.send(request, cleaned=True)
try:
registry.send(request, cleaned=True)
logger.info("Domain %s deleted successfully.", self.name)
except RegistryError as e:
logger.error("Error deleting domain %s: %s", self.name, e)
raise e
def _domain_can_be_deleted(self, max_attempts=5, wait_interval=2) -> bool:
"""
Polls the registry using InfoDomain calls to confirm that the domain can be deleted.
Returns True if the domain can be deleted, False otherwise. Includes a retry mechanism
using wait_interval and max_attempts, which may be necessary if subdomains and other
associated objects were only recently deleted as the registry may not be immediately updated.
"""
logger.info("Polling registry to confirm deletion pre-conditions for %s", self.name)
last_info_error = None
for attempt in range(max_attempts):
try:
info_response = registry.send(commands.InfoDomain(name=self.name), cleaned=True)
domain_info = info_response.res_data[0]
hosts_associated = getattr(domain_info, "hosts", None)
if hosts_associated is None or len(hosts_associated) == 0:
logger.info("InfoDomain reports no associated hosts for %s. Proceeding with deletion.", self.name)
return True
else:
logger.info("Attempt %d: Domain %s still has hosts: %s", attempt + 1, self.name, hosts_associated)
except RegistryError as info_e:
# If the domain is already gone, we can assume deletion already occurred.
if info_e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
logger.info("InfoDomain check indicates domain %s no longer exists.", self.name)
raise info_e
logger.warning("Attempt %d: Error during InfoDomain check: %s", attempt + 1, info_e)
time.sleep(wait_interval)
else:
logger.error(
"Exceeded max attempts waiting for domain %s to clear associated objects; last error: %s",
self.name,
last_info_error,
)
return False
def __str__(self) -> str:
return self.name
@ -1840,8 +1918,6 @@ class Domain(TimeStampedModel, DomainHelper):
else:
logger.error("Error _delete_hosts_if_not_used, code was %s error was %s" % (e.code, e))
raise e
def _fix_unknown_state(self, cleaned):
"""
_fix_unknown_state: Calls _add_missing_contacts_if_unknown

View file

@ -0,0 +1,17 @@
{% extends "admin/delete_confirmation.html" %}
{% block content_subtitle %}
<div class="usa-alert usa-alert--info usa-alert--slim">
<div class="usa-alert__body margin-left-1 maxw-none">
<p class="usa-alert__text maxw-none">
If you cancel the portfolio invitation here, it won't trigger any emails. It also won't remove the user's
portfolio access if they already logged in. Go to the
<a href="{% url 'admin:registrar_userportfoliopermission_changelist' %}">
User Portfolio Permissions
</a>
table if you want to remove the user from a portfolio.
</p>
</div>
</div>
{{ block.super }}
{% endblock %}

View file

@ -0,0 +1,12 @@
{% extends "admin/delete_confirmation.html" %}
{% block content_subtitle %}
<div class="usa-alert usa-alert--info usa-alert--slim">
<div class="usa-alert__body margin-left-1 maxw-none">
<p class="usa-alert__text maxw-none">
If you remove someone from a portfolio here, it will not send any emails when you click "Save".
</p>
</div>
</div>
{{ block.super }}
{% endblock %}

View file

@ -0,0 +1,21 @@
{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
Hi,{% if requested_user and requested_user.first_name %} {{ requested_user.first_name }}.{% endif %}
{{ requestor_email }} has removed you from {{ portfolio.organization_name }}.
You can no longer view this organization or its related domains within the .gov registrar.
SOMETHING WRONG?
If you have questions or concerns, reach out to the person who removed you from the
organization, or reply to this email.
----------------------------------------------------------------
The .gov team
Contact us: <https://get.gov/contact/>
Learn about .gov <https://get.gov>
The .gov registry is a part of the Cybersecurity and Infrastructure Security Agency (CISA) <https://cisa.gov/>
{% endautoescape %}

View file

@ -0,0 +1 @@
You've been removed from a .gov organization

View file

@ -55,6 +55,7 @@ from .common import (
MockDbForSharedTests,
AuditedAdminMockData,
completed_domain_request,
create_test_user,
generic_domain_object,
less_console_noise,
mock_user,
@ -1135,6 +1136,7 @@ class TestUserPortfolioPermissionAdmin(TestCase):
"""Create a client object"""
self.client = Client(HTTP_HOST="localhost:8080")
self.superuser = create_superuser()
self.testuser = create_test_user()
self.portfolio = Portfolio.objects.create(organization_name="Test Portfolio", creator=self.superuser)
def tearDown(self):
@ -1167,6 +1169,21 @@ class TestUserPortfolioPermissionAdmin(TestCase):
"If you add someone to a portfolio here, it will not trigger an invitation email.",
)
@less_console_noise_decorator
def test_delete_confirmation_page_contains_static_message(self):
"""Ensure the custom message appears in the delete confirmation page."""
self.client.force_login(self.superuser)
# Create a test portfolio permission
self.permission = UserPortfolioPermission.objects.create(
user=self.testuser, portfolio=self.portfolio, roles=["organization_member"]
)
delete_url = reverse("admin:registrar_userportfoliopermission_delete", args=[self.permission.pk])
response = self.client.get(delete_url)
# Check if the response contains the expected static message
expected_message = "If you remove someone from a portfolio here, it will not send any emails"
self.assertIn(expected_message, response.content.decode("utf-8"))
class TestPortfolioInvitationAdmin(TestCase):
"""Tests for the PortfolioInvitationAdmin class as super user
@ -1605,6 +1622,21 @@ class TestPortfolioInvitationAdmin(TestCase):
request, "Could not send email notification to existing organization admins."
)
@less_console_noise_decorator
def test_delete_confirmation_page_contains_static_message(self):
"""Ensure the custom message appears in the delete confirmation page."""
self.client.force_login(self.superuser)
# Create a test portfolio invitation
self.invitation = PortfolioInvitation.objects.create(
email="testuser@example.com", portfolio=self.portfolio, roles=["organization_member"]
)
delete_url = reverse("admin:registrar_portfolioinvitation_delete", args=[self.invitation.pk])
response = self.client.get(delete_url)
# Check if the response contains the expected static message
expected_message = "If you cancel the portfolio invitation here"
self.assertIn(expected_message, response.content.decode("utf-8"))
class TestHostAdmin(TestCase):
"""Tests for the HostAdmin class as super user
@ -3816,7 +3848,7 @@ class TestTransferUser(WebTest):
with self.assertRaises(User.DoesNotExist):
self.user2.refresh_from_db()
# @less_console_noise_decorator
@less_console_noise_decorator
def test_transfer_user_throws_transfer_and_delete_success_messages(self):
"""Test that success messages for data transfer and user deletion are displayed."""
# Ensure the setup for VerifiedByStaff

View file

@ -178,7 +178,7 @@ class TestDomainAdminAsStaff(MockEppLib):
Then a user-friendly success message is returned for displaying on the web
And `state` is set to `DELETED`
"""
domain = create_ready_domain()
domain, _ = Domain.objects.get_or_create(name="my-nameserver.gov", state=Domain.State.READY)
# Put in client hold
domain.place_client_hold()
# Ensure everything is displaying correctly
@ -212,7 +212,7 @@ class TestDomainAdminAsStaff(MockEppLib):
mock_add_message.assert_called_once_with(
request,
messages.INFO,
"Domain city.gov has been deleted. Thanks!",
"Domain my-nameserver.gov has been deleted. Thanks!",
extra_tags="",
fail_silently=False,
)
@ -266,7 +266,7 @@ class TestDomainAdminAsStaff(MockEppLib):
mock_add_message.assert_called_once_with(
request,
messages.ERROR,
"Error deleting this Domain: This subdomain is being used as a hostname on another domain: ns1.sharedhost.com", # noqa
"Error deleting this Domain: Command failed with note: Domain has associated objects that prevent deletion.", # noqa
extra_tags="",
fail_silently=False,
)
@ -321,7 +321,7 @@ class TestDomainAdminAsStaff(MockEppLib):
Then `commands.DeleteDomain` is sent to the registry
And Domain returns normally without an error dialog
"""
domain = create_ready_domain()
domain, _ = Domain.objects.get_or_create(name="my-nameserver.gov", state=Domain.State.READY)
# Put in client hold
domain.place_client_hold()
# Ensure everything is displaying correctly
@ -340,12 +340,13 @@ class TestDomainAdminAsStaff(MockEppLib):
)
request.user = self.client
# Delete it once
with patch("django.contrib.messages.add_message") as mock_add_message:
self.admin.do_delete_domain(request, domain)
mock_add_message.assert_called_once_with(
request,
messages.INFO,
"Domain city.gov has been deleted. Thanks!",
"Domain my-nameserver.gov has been deleted. Thanks!",
extra_tags="",
fail_silently=False,
)

View file

@ -3,6 +3,7 @@ from unittest.mock import patch, MagicMock
from datetime import date
from registrar.models.domain import Domain
from registrar.models.portfolio import Portfolio
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user import User
from registrar.models.user_domain_role import UserDomainRole
from registrar.models.user_portfolio_permission import UserPortfolioPermission
@ -16,6 +17,8 @@ from registrar.utility.email_invitations import (
send_portfolio_admin_addition_emails,
send_portfolio_admin_removal_emails,
send_portfolio_invitation_email,
send_portfolio_invitation_remove_email,
send_portfolio_member_permission_remove_email,
send_portfolio_member_permission_update_email,
)
@ -963,3 +966,149 @@ class TestSendPortfolioMemberPermissionUpdateEmail(unittest.TestCase):
# Assertions
mock_logger.warning.assert_not_called() # Function should fail before logging email failure
class TestSendPortfolioMemberPermissionRemoveEmail(unittest.TestCase):
"""Unit tests for send_portfolio_member_permission_remove_email function."""
@patch("registrar.utility.email_invitations.send_templated_email")
@patch("registrar.utility.email_invitations._get_requestor_email")
def test_send_email_success(self, mock_get_requestor_email, mock_send_email):
"""Test that the email is sent successfully when there are no errors."""
# Mock data
requestor = MagicMock()
permissions = MagicMock(spec=UserPortfolioPermission)
permissions.user.email = "user@example.com"
permissions.portfolio.organization_name = "Test Portfolio"
mock_get_requestor_email.return_value = "requestor@example.com"
# Call function
result = send_portfolio_member_permission_remove_email(requestor, permissions)
# Assertions
mock_get_requestor_email.assert_called_once_with(requestor, portfolio=permissions.portfolio)
mock_send_email.assert_called_once_with(
"emails/portfolio_removal.txt",
"emails/portfolio_removal_subject.txt",
to_address="user@example.com",
context={
"requested_user": permissions.user,
"portfolio": permissions.portfolio,
"requestor_email": "requestor@example.com",
},
)
self.assertTrue(result)
@patch("registrar.utility.email_invitations.send_templated_email", side_effect=EmailSendingError("Email failed"))
@patch("registrar.utility.email_invitations._get_requestor_email")
@patch("registrar.utility.email_invitations.logger")
def test_send_email_failure(self, mock_logger, mock_get_requestor_email, mock_send_email):
"""Test that the function returns False and logs an error when email sending fails."""
# Mock data
requestor = MagicMock()
permissions = MagicMock(spec=UserPortfolioPermission)
permissions.user.email = "user@example.com"
permissions.portfolio.organization_name = "Test Portfolio"
mock_get_requestor_email.return_value = "requestor@example.com"
# Call function
result = send_portfolio_member_permission_remove_email(requestor, permissions)
# Assertions
mock_logger.warning.assert_called_once_with(
"Could not send email organization member removal notification to %s for portfolio: %s",
permissions.user.email,
permissions.portfolio.organization_name,
exc_info=True,
)
self.assertFalse(result)
@patch("registrar.utility.email_invitations._get_requestor_email", side_effect=Exception("Unexpected error"))
@patch("registrar.utility.email_invitations.logger")
def test_requestor_email_retrieval_failure(self, mock_logger, mock_get_requestor_email):
"""Test that an exception in retrieving requestor email is logged."""
# Mock data
requestor = MagicMock()
permissions = MagicMock(spec=UserPortfolioPermission)
# Call function
with self.assertRaises(Exception):
send_portfolio_member_permission_remove_email(requestor, permissions)
# Assertions
mock_logger.warning.assert_not_called() # Function should fail before logging email failure
class TestSendPortfolioInvitationRemoveEmail(unittest.TestCase):
"""Unit tests for send_portfolio_invitation_remove_email function."""
@patch("registrar.utility.email_invitations.send_templated_email")
@patch("registrar.utility.email_invitations._get_requestor_email")
def test_send_email_success(self, mock_get_requestor_email, mock_send_email):
"""Test that the email is sent successfully when there are no errors."""
# Mock data
requestor = MagicMock()
invitation = MagicMock(spec=PortfolioInvitation)
invitation.email = "user@example.com"
invitation.portfolio.organization_name = "Test Portfolio"
mock_get_requestor_email.return_value = "requestor@example.com"
# Call function
result = send_portfolio_invitation_remove_email(requestor, invitation)
# Assertions
mock_get_requestor_email.assert_called_once_with(requestor, portfolio=invitation.portfolio)
mock_send_email.assert_called_once_with(
"emails/portfolio_removal.txt",
"emails/portfolio_removal_subject.txt",
to_address="user@example.com",
context={
"requested_user": None,
"portfolio": invitation.portfolio,
"requestor_email": "requestor@example.com",
},
)
self.assertTrue(result)
@patch("registrar.utility.email_invitations.send_templated_email", side_effect=EmailSendingError("Email failed"))
@patch("registrar.utility.email_invitations._get_requestor_email")
@patch("registrar.utility.email_invitations.logger")
def test_send_email_failure(self, mock_logger, mock_get_requestor_email, mock_send_email):
"""Test that the function returns False and logs an error when email sending fails."""
# Mock data
requestor = MagicMock()
invitation = MagicMock(spec=PortfolioInvitation)
invitation.email = "user@example.com"
invitation.portfolio.organization_name = "Test Portfolio"
mock_get_requestor_email.return_value = "requestor@example.com"
# Call function
result = send_portfolio_invitation_remove_email(requestor, invitation)
# Assertions
mock_logger.warning.assert_called_once_with(
"Could not send email organization member removal notification to %s for portfolio: %s",
invitation.email,
invitation.portfolio.organization_name,
exc_info=True,
)
self.assertFalse(result)
@patch("registrar.utility.email_invitations._get_requestor_email", side_effect=Exception("Unexpected error"))
@patch("registrar.utility.email_invitations.logger")
def test_requestor_email_retrieval_failure(self, mock_logger, mock_get_requestor_email):
"""Test that an exception in retrieving requestor email is logged."""
# Mock data
requestor = MagicMock()
invitation = MagicMock(spec=PortfolioInvitation)
# Call function
with self.assertRaises(Exception):
send_portfolio_invitation_remove_email(requestor, invitation)
# Assertions
mock_logger.warning.assert_not_called() # Function should fail before logging email failure

View file

@ -35,6 +35,7 @@ from epplibwrapper import (
from .common import MockEppLib, MockSESClient, less_console_noise
import logging
import boto3_mocking # type: ignore
import copy
logger = logging.getLogger(__name__)
@ -97,58 +98,59 @@ class TestDomainCache(MockEppLib):
self.mockedSendFunction.assert_has_calls(expectedCalls)
# @less_console_noise_decorator
def test_cache_nested_elements_not_subdomain(self):
"""Cache works correctly with the nested objects cache and hosts"""
with less_console_noise():
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
# The contact list will initially contain objects of type 'DomainContact'
# this is then transformed into PublicContact, and cache should NOT
# hold onto the DomainContact object
expectedUnfurledContactsList = [
common.DomainContact(contact="123", type="security"),
]
expectedContactsDict = {
PublicContact.ContactTypeChoices.ADMINISTRATIVE: "adminContact",
PublicContact.ContactTypeChoices.SECURITY: "securityContact",
PublicContact.ContactTypeChoices.TECHNICAL: "technicalContact",
}
expectedHostsDict = {
"name": self.mockDataInfoDomain.hosts[0],
"addrs": [], # should return empty bc fake.host.com is not a subdomain of igorville.gov
"cr_date": self.mockDataInfoHosts.cr_date,
}
# this can be changed when the getter for contacts is implemented
domain._get_property("contacts")
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
# The contact list will initially contain objects of type 'DomainContact'
# this is then transformed into PublicContact, and cache should NOT
# hold onto the DomainContact object
expectedUnfurledContactsList = [
common.DomainContact(contact="123", type="security"),
]
expectedContactsDict = {
PublicContact.ContactTypeChoices.ADMINISTRATIVE: "adminContact",
PublicContact.ContactTypeChoices.SECURITY: "securityContact",
PublicContact.ContactTypeChoices.TECHNICAL: "technicalContact",
}
expectedHostsDict = {
"name": self.mockDataInfoDomain.hosts[0],
"addrs": [], # should return empty bc fake.host.com is not a subdomain of igorville.gov
"cr_date": self.mockDataInfoHosts.cr_date,
}
# check domain info is still correct and not overridden
self.assertEqual(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info)
self.assertEqual(domain._cache["cr_date"], self.mockDataInfoDomain.cr_date)
# this can be changed when the getter for contacts is implemented
domain._get_property("contacts")
# check contacts
self.assertEqual(domain._cache["_contacts"], self.mockDataInfoDomain.contacts)
# The contact list should not contain what is sent by the registry by default,
# as _fetch_cache will transform the type to PublicContact
self.assertNotEqual(domain._cache["contacts"], expectedUnfurledContactsList)
# check domain info is still correct and not overridden
self.assertEqual(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info)
self.assertEqual(domain._cache["cr_date"], self.mockDataInfoDomain.cr_date)
self.assertEqual(domain._cache["contacts"], expectedContactsDict)
# check contacts
self.assertEqual(domain._cache["_contacts"], self.mockDataInfoDomain.contacts)
# The contact list should not contain what is sent by the registry by default,
# as _fetch_cache will transform the type to PublicContact
self.assertNotEqual(domain._cache["contacts"], expectedUnfurledContactsList)
# get and check hosts is set correctly
domain._get_property("hosts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
self.assertEqual(domain._cache["contacts"], expectedContactsDict)
# invalidate cache
domain._cache = {}
self.assertEqual(domain._cache["contacts"], expectedContactsDict)
# get host
domain._get_property("hosts")
# Should return empty bc fake.host.com is not a subdomain of igorville.gov
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
# get and check hosts is set correctly
domain._get_property("hosts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
self.assertEqual(domain._cache["contacts"], expectedContactsDict)
# invalidate cache
domain._cache = {}
# get contacts
domain._get_property("contacts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
self.assertEqual(domain._cache["contacts"], expectedContactsDict)
# get host
domain._get_property("hosts")
# Should return empty bc fake.host.com is not a subdomain of igorville.gov
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
# get contacts
domain._get_property("contacts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
self.assertEqual(domain._cache["contacts"], expectedContactsDict)
def test_cache_nested_elements_is_subdomain(self):
"""Cache works correctly with the nested objects cache and hosts"""
@ -1248,6 +1250,13 @@ class TestRegistrantNameservers(MockEppLib):
name="threenameserversDomain.gov", state=Domain.State.READY
)
def tearDown(self):
PublicContact.objects.all().delete()
HostIP.objects.all().delete()
Host.objects.all().delete()
Domain.objects.all().delete()
super().tearDown()
def test_get_nameserver_changes_success_deleted_vals(self):
"""Testing only deleting and no other changes"""
with less_console_noise():
@ -1797,6 +1806,7 @@ class TestRegistrantNameservers(MockEppLib):
mock_host_ip_get_or_create.assert_not_called()
self.assertEqual(mock_host_ip_get_or_create.call_count, 0)
# @less_console_noise_decorator
def test_nameservers_stored_on_fetch_cache_not_subdomain_with_ip(self):
"""
Scenario: Nameservers are stored in db when they are retrieved from fetch_cache.
@ -1808,21 +1818,20 @@ class TestRegistrantNameservers(MockEppLib):
#3: Nameserver is not a subdomain, but it does have an IP address returned
due to how we set up our defaults
"""
with less_console_noise():
domain, _ = Domain.objects.get_or_create(name="fake.gov", state=Domain.State.READY)
domain, _ = Domain.objects.get_or_create(name="freeman.gov", state=Domain.State.READY)
with patch.object(Host.objects, "get_or_create") as mock_host_get_or_create, patch.object(
HostIP.objects, "get_or_create"
) as mock_host_ip_get_or_create:
mock_host_get_or_create.return_value = (Host(domain=domain), True)
mock_host_ip_get_or_create.return_value = (HostIP(), True)
with patch.object(Host.objects, "get_or_create") as mock_host_get_or_create, patch.object(
HostIP.objects, "get_or_create"
) as mock_host_ip_get_or_create:
mock_host_get_or_create.return_value = (Host(domain=domain), True)
mock_host_ip_get_or_create.return_value = (HostIP(), True)
# force fetch_cache to be called, which will return above documented mocked hosts
domain.nameservers
# force fetch_cache to be called, which will return above documented mocked hosts
domain.nameservers
mock_host_get_or_create.assert_called_once_with(domain=domain, name="fake.host.com")
mock_host_ip_get_or_create.assert_not_called()
self.assertEqual(mock_host_ip_get_or_create.call_count, 0)
mock_host_get_or_create.assert_called_once_with(domain=domain, name="fake.host.com")
mock_host_ip_get_or_create.assert_not_called()
self.assertEqual(mock_host_ip_get_or_create.call_count, 0)
def test_nameservers_stored_on_fetch_cache_not_subdomain_without_ip(self):
"""
@ -1861,12 +1870,6 @@ class TestRegistrantNameservers(MockEppLib):
with self.assertRaises(RegistryError):
domain.nameservers = [("ns1.failednameserver.gov", ["4.5.6"])]
def tearDown(self):
HostIP.objects.all().delete()
Host.objects.all().delete()
Domain.objects.all().delete()
return super().tearDown()
class TestNameserverValidation(TestCase):
"""Test the isValidDomain method which validates nameservers"""
@ -1947,8 +1950,6 @@ class TestRegistrantDNSSEC(MockEppLib):
And a domain exists in the registry
"""
super().setUp()
# for the tests, need a domain in the unknown state
self.domain, _ = Domain.objects.get_or_create(name="fake.gov")
def tearDown(self):
PublicContact.objects.all().delete()
@ -2041,6 +2042,7 @@ class TestRegistrantDNSSEC(MockEppLib):
self.assertEquals(dnssecdata_get.dsData, self.dnssecExtensionWithDsData.dsData)
patcher.stop()
@less_console_noise_decorator
def test_dnssec_is_idempotent(self):
"""
Scenario: Registrant adds DNS data twice, due to a UI glitch
@ -2126,6 +2128,7 @@ class TestRegistrantDNSSEC(MockEppLib):
self.assertEquals(dnssecdata_get.dsData, self.dnssecExtensionWithDsData.dsData)
patcher.stop()
@less_console_noise_decorator
def test_user_adds_dnssec_data_multiple_dsdata(self):
"""
Scenario: Registrant adds DNSSEC data with multiple DSData.
@ -2194,6 +2197,7 @@ class TestRegistrantDNSSEC(MockEppLib):
self.assertEquals(dnssecdata_get.dsData, self.dnssecExtensionWithMultDsData.dsData)
patcher.stop()
# @less_console_noise_decorator
def test_user_removes_dnssec_data(self):
"""
Scenario: Registrant removes DNSSEC ds data.
@ -2219,28 +2223,27 @@ class TestRegistrantDNSSEC(MockEppLib):
else:
return MagicMock(res_data=[self.mockDataInfoHosts])
with less_console_noise():
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
with patch("registrar.models.domain.registry.send") as mocked_send:
mocked_send.side_effect = side_effect
domain, _ = Domain.objects.get_or_create(name="dnssec-dsdata.gov")
# Initial setting of dnssec data
domain.dnssecdata = self.dnssecExtensionWithDsData
# Check dsdata_last_change is updated
domain = Domain.objects.get(name="dnssec-dsdata.gov")
self.assertIsNotNone(domain.dsdata_last_change)
initial_change = domain.dsdata_last_change
# Invalidate the cache to force a fresh lookup
domain._invalidate_cache()
# Remove dnssec data
domain.dnssecdata = self.dnssecExtensionRemovingDsData
# Check that dsdata_last_change is updated again
domain = Domain.objects.get(name="dnssec-dsdata.gov")
self.assertIsNotNone(domain.dsdata_last_change)
self.assertNotEqual(domain.dsdata_last_change, initial_change)
# get the DNS SEC extension added to the UpdateDomain command and
@ -2292,7 +2295,6 @@ class TestRegistrantDNSSEC(MockEppLib):
),
]
)
patcher.stop()
def test_update_is_unsuccessful(self):
"""
@ -2697,38 +2699,6 @@ class TestAnalystDelete(MockEppLib):
Domain.objects.all().delete()
super().tearDown()
@less_console_noise_decorator
def test_analyst_deletes_domain(self):
"""
Scenario: Analyst permanently deletes a domain
When `domain.deletedInEpp()` is called
Then `commands.DeleteDomain` is sent to the registry
And `state` is set to `DELETED`
The deleted date is set.
"""
# Put the domain in client hold
self.domain.place_client_hold()
# Delete it...
self.domain.deletedInEpp()
self.domain.save()
self.mockedSendFunction.assert_has_calls(
[
call(
commands.DeleteDomain(name="fake.gov"),
cleaned=True,
)
]
)
# Domain itself should not be deleted
self.assertNotEqual(self.domain, None)
# Domain should have the right state
self.assertEqual(self.domain.state, Domain.State.DELETED)
# Domain should have a deleted
self.assertNotEqual(self.domain.deleted, None)
# Cache should be invalidated
self.assertEqual(self.domain._cache, {})
@less_console_noise_decorator
def test_deletion_is_unsuccessful(self):
"""
@ -2756,18 +2726,44 @@ class TestAnalystDelete(MockEppLib):
@less_console_noise_decorator
def test_deletion_with_host_and_contacts(self):
"""
Scenario: Domain with related Host and Contacts is Deleted
When a contact and host exists that is tied to this domain
Then all the needed commands are sent to the registry
And `state` is set to `DELETED`
"""
# Put the domain in client hold
self.domain_with_contacts.place_client_hold()
# Delete it
self.domain_with_contacts.deletedInEpp()
self.domain_with_contacts.save()
Scenario: Domain with related Host and Contacts is Deleted.
When a contact and host exists that is tied to this domain,
then all the needed commands are sent to the registry and
the domain's state is set to DELETED.
# Check that the host and contacts are deleted
This test now asserts only the commands that are actually issued
during the deletion process.
"""
# Put the domain in client hold.
self.domain_with_contacts.place_client_hold()
# Invalidate the cache so that deletion fetches fresh data.
self.domain_with_contacts._invalidate_cache()
# We'll use a mutable counter to simulate different responses if needed.
info_domain_call_count = [0]
# TODO: This is a hack, we should refactor the MockEPPLib to be more flexible
def side_effect(request, cleaned=True):
# For an InfoDomain command for "freeman.gov", simulate behavior:
if isinstance(request, commands.InfoDomain) and request.name.lower() == "freeman.gov":
info_domain_call_count[0] += 1
fake_info = copy.deepcopy(self.InfoDomainWithContacts)
# If this branch ever gets hit, you could vary response based on call count.
# But note: in our current deletion flow, InfoDomain may not be called.
if info_domain_call_count[0] == 1:
fake_info.hosts = ["fake.host.com"]
else:
fake_info.hosts = []
return MagicMock(res_data=[fake_info])
return self.mockedSendFunction(request, cleaned=cleaned)
with patch("registrar.models.domain.registry.send", side_effect=side_effect):
self.domain_with_contacts.deletedInEpp()
self.domain_with_contacts.save()
# Now assert the expected calls that we know occur.
# Note: we no longer assert a call to InfoDomain.
self.mockedSendFunction.assert_has_calls(
[
call(
@ -2782,14 +2778,10 @@ class TestAnalystDelete(MockEppLib):
),
cleaned=True,
),
]
],
)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoDomain(name="freeman.gov", auth_info=None),
cleaned=True,
),
call(
commands.InfoHost(name="fake.host.com"),
cleaned=True,
@ -2806,7 +2798,8 @@ class TestAnalystDelete(MockEppLib):
),
cleaned=True,
),
]
],
any_order=True,
)
self.mockedSendFunction.assert_has_calls(
[
@ -2857,12 +2850,55 @@ class TestAnalystDelete(MockEppLib):
),
],
)
# Domain itself should not be deleted
self.assertNotEqual(self.domain_with_contacts, None)
# State should have changed
self.assertIsNotNone(self.domain_with_contacts)
self.assertEqual(self.domain_with_contacts.state, Domain.State.DELETED)
@less_console_noise_decorator
def test_analyst_deletes_domain_with_ds_data(self):
"""
Scenario: Domain with DS data is deleted
When `domain.deletedInEpp()` is called
Then `commands.DeleteDomain` is sent to the registry
And `state` is set to `DELETED`
"""
# Create a domain with DS data
domain, _ = Domain.objects.get_or_create(name="dsdomain.gov", state=Domain.State.READY)
# set domain to be on hold
domain.place_client_hold()
domain.dnssecdata = extensions.DNSSECExtension(
dsData=[extensions.DSData(keyTag=1, alg=1, digestType=1, digest="1234567890")],
)
domain.save()
# Mock the InfoDomain command data to return a domain with no hosts
# This is needed to simulate the domain being able to be deleted
self.mockDataInfoDomain.hosts = []
# Delete the domain
domain.deletedInEpp()
domain.save()
# Check that dsdata is None
self.assertEqual(domain.dnssecdata, None)
# Check that the UpdateDomain command was sent to the registry with the correct extension
self.mockedSendFunction.assert_has_calls(
[
call(
commands.UpdateDomain(
name="dsdomain.gov", add=[], rem=[], nsset=None, keyset=None, registrant=None, auth_info=None
),
cleaned=True,
),
]
)
# Check that the domain was deleted
self.assertEqual(domain.state, Domain.State.DELETED)
# reset to avoid test pollution
self.mockDataInfoDomain.hosts = ["fake.host.com"]
@less_console_noise_decorator
def test_deletion_ready_fsm_failure(self):
"""

View file

@ -1675,7 +1675,8 @@ class TestPortfolioMemberDeleteView(WebTest):
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
def test_portfolio_member_delete_view_members_table_active_requests(self, send_removal_emails):
@patch("registrar.views.portfolios.send_portfolio_member_permission_remove_email")
def test_portfolio_member_delete_view_members_table_active_requests(self, send_member_removal, send_removal_emails):
"""Error state w/ deleting a member with active request on Members Table"""
# I'm a user
UserPortfolioPermission.objects.get_or_create(
@ -1715,12 +1716,15 @@ class TestPortfolioMemberDeleteView(WebTest):
# assert that send_portfolio_admin_removal_emails is not called
send_removal_emails.assert_not_called()
# assert that send_portfolio_member_permission_remove_email is not called
send_member_removal.assert_not_called()
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
def test_portfolio_member_delete_view_members_table_only_admin(self, send_removal_emails):
@patch("registrar.views.portfolios.send_portfolio_member_permission_remove_email")
def test_portfolio_member_delete_view_members_table_only_admin(self, send_member_removal, send_removal_emails):
"""Error state w/ deleting a member that's the only admin on Members Table"""
# I'm a user with admin permission
@ -1750,12 +1754,15 @@ class TestPortfolioMemberDeleteView(WebTest):
# assert that send_portfolio_admin_removal_emails is not called
send_removal_emails.assert_not_called()
# assert that send_portfolio_member_permission_remove_email is not called
send_member_removal.assert_not_called()
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
def test_portfolio_member_table_delete_member_success(self, mock_send_removal_emails):
@patch("registrar.views.portfolios.send_portfolio_member_permission_remove_email")
def test_portfolio_member_table_delete_member_success(self, send_member_removal, mock_send_removal_emails):
"""Success state with deleting on Members Table page bc no active request AND not only admin"""
# I'm a user
@ -1780,6 +1787,9 @@ class TestPortfolioMemberDeleteView(WebTest):
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
# Member removal email sent successfully
send_member_removal.return_value = True
# And set that the member has no active requests AND it's not the only admin
with patch.object(User, "get_active_requests_count_in_portfolio", return_value=0), patch.object(
User, "is_only_admin_of_portfolio", return_value=False
@ -1802,12 +1812,23 @@ class TestPortfolioMemberDeleteView(WebTest):
# assert that send_portfolio_admin_removal_emails is not called
# because member being removed is not an admin
mock_send_removal_emails.assert_not_called()
# assert that send_portfolio_member_permission_remove_email is called
send_member_removal.assert_called_once()
# Get the arguments passed to send_portfolio_member_permission_remove_email
_, called_kwargs = send_member_removal.call_args
# Assert the email content
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["permissions"].user, upp.user)
self.assertEqual(called_kwargs["permissions"].portfolio, upp.portfolio)
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
def test_portfolio_member_table_delete_admin_success(self, mock_send_removal_emails):
@patch("registrar.views.portfolios.send_portfolio_member_permission_remove_email")
def test_portfolio_member_table_delete_admin_success(self, send_member_removal, mock_send_removal_emails):
"""Success state with deleting on Members Table page bc no active request AND
not only admin. Because admin, removal emails are sent."""
@ -1834,6 +1855,7 @@ class TestPortfolioMemberDeleteView(WebTest):
)
mock_send_removal_emails.return_value = True
send_member_removal.return_value = True
# And set that the member has no active requests AND it's not the only admin
with patch.object(User, "get_active_requests_count_in_portfolio", return_value=0), patch.object(
@ -1856,6 +1878,8 @@ class TestPortfolioMemberDeleteView(WebTest):
# assert that send_portfolio_admin_removal_emails is called
mock_send_removal_emails.assert_called_once()
# assert that send_portfolio_member_permission_remove_email is called
send_member_removal.assert_called_once()
# Get the arguments passed to send_portfolio_admin_addition_emails
_, called_kwargs = mock_send_removal_emails.call_args
@ -1865,13 +1889,25 @@ class TestPortfolioMemberDeleteView(WebTest):
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["portfolio"], self.portfolio)
# Get the arguments passed to send_portfolio_member_permission_remove_email
_, called_kwargs = send_member_removal.call_args
# Assert the email content
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["permissions"].user, upp.user)
self.assertEqual(called_kwargs["permissions"].portfolio, upp.portfolio)
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
def test_portfolio_member_table_delete_admin_success_removal_email_fail(self, mock_send_removal_emails):
@patch("registrar.views.portfolios.send_portfolio_member_permission_remove_email")
def test_portfolio_member_table_delete_admin_success_removal_email_fail(
self, send_member_removal, mock_send_removal_emails
):
"""Success state with deleting on Members Table page bc no active request AND
not only admin. Because admin, removal emails are sent, but fail to send."""
not only admin. Because admin, removal emails are sent, but fail to send.
Email to removed member also fails to send."""
# I'm a user
UserPortfolioPermission.objects.get_or_create(
@ -1896,6 +1932,7 @@ class TestPortfolioMemberDeleteView(WebTest):
)
mock_send_removal_emails.return_value = False
send_member_removal.return_value = False
# And set that the member has no active requests AND it's not the only admin
with patch.object(User, "get_active_requests_count_in_portfolio", return_value=0), patch.object(
@ -1918,6 +1955,8 @@ class TestPortfolioMemberDeleteView(WebTest):
# assert that send_portfolio_admin_removal_emails is called
mock_send_removal_emails.assert_called_once()
# assert that send_portfolio_member_permission_remove_email is called
send_member_removal.assert_called_once()
# Get the arguments passed to send_portfolio_admin_addition_emails
_, called_kwargs = mock_send_removal_emails.call_args
@ -1927,6 +1966,14 @@ class TestPortfolioMemberDeleteView(WebTest):
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["portfolio"], self.portfolio)
# Get the arguments passed to send_portfolio_member_permission_remove_email
_, called_kwargs = send_member_removal.call_args
# Assert the email content
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["permissions"].user, upp.user)
self.assertEqual(called_kwargs["permissions"].portfolio, upp.portfolio)
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@ -2051,7 +2098,10 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
def test_portfolio_member_delete_view_manage_members_page_invitedmember(self, mock_send_removal_emails):
@patch("registrar.views.portfolios.send_portfolio_invitation_remove_email")
def test_portfolio_member_delete_view_manage_members_page_invitedmember(
self, send_invited_member_removal, mock_send_removal_emails
):
"""Success state w/ deleting invited member on Manage Members page should redirect back to Members Table"""
# I'm a user
@ -2072,6 +2122,10 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
# Invited member removal email sent successfully
send_invited_member_removal.return_value = True
with patch("django.contrib.messages.success") as mock_success:
self.client.force_login(self.user)
response = self.client.post(
@ -2095,12 +2149,25 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
# assert send_portfolio_admin_removal_emails not called since invitation
# is for a basic member
mock_send_removal_emails.assert_not_called()
# assert that send_portfolio_invitation_remove_email is called
send_invited_member_removal.assert_called_once()
# Get the arguments passed to send_portfolio_invitation_removal_email
_, called_kwargs = send_invited_member_removal.call_args
# Assert the email content
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["invitation"].email, invitation.email)
self.assertEqual(called_kwargs["invitation"].portfolio, invitation.portfolio)
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
def test_portfolio_member_delete_view_manage_members_page_invitedadmin(self, mock_send_removal_emails):
@patch("registrar.views.portfolios.send_portfolio_invitation_remove_email")
def test_portfolio_member_delete_view_manage_members_page_invitedadmin(
self, send_invited_member_email, mock_send_removal_emails
):
"""Success state w/ deleting invited admin on Manage Members page should redirect back to Members Table"""
# I'm a user
@ -2115,6 +2182,7 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
)
mock_send_removal_emails.return_value = True
send_invited_member_email.return_value = True
# Invite an admin under same portfolio
invited_member_email = "invited_member@example.com"
@ -2146,6 +2214,8 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
# assert send_portfolio_admin_removal_emails is called since invitation
# is for an admin
mock_send_removal_emails.assert_called_once()
# assert that send_portfolio_invitation_remove_email is called
send_invited_member_email.assert_called_once()
# Get the arguments passed to send_portfolio_admin_addition_emails
_, called_kwargs = mock_send_removal_emails.call_args
@ -2155,11 +2225,22 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["portfolio"], self.portfolio)
# Get the arguments passed to send_portfolio_invitation_remove_email
_, called_kwargs = send_invited_member_email.call_args
# Assert the email content
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["invitation"].email, invitation.email)
self.assertEqual(called_kwargs["invitation"].portfolio, invitation.portfolio)
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_admin_removal_emails")
def test_portfolio_member_delete_view_manage_members_page_invitedadmin_email_fails(self, mock_send_removal_emails):
@patch("registrar.views.portfolios.send_portfolio_invitation_remove_email")
def test_portfolio_member_delete_view_manage_members_page_invitedadmin_email_fails(
self, send_invited_member_email, mock_send_removal_emails
):
"""Success state w/ deleting invited admin on Manage Members page should redirect back to Members Table"""
# I'm a user
@ -2174,6 +2255,7 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
)
mock_send_removal_emails.return_value = False
send_invited_member_email.return_value = False
# Invite an admin under same portfolio
invited_member_email = "invited_member@example.com"
@ -2205,6 +2287,8 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
# assert send_portfolio_admin_removal_emails is called since invitation
# is for an admin
mock_send_removal_emails.assert_called_once()
# assert that send_portfolio_invitation_remove_email is called
send_invited_member_email.assert_called_once()
# Get the arguments passed to send_portfolio_admin_addition_emails
_, called_kwargs = mock_send_removal_emails.call_args
@ -2214,6 +2298,14 @@ class TestPortfolioInvitedMemberDeleteView(WebTest):
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["portfolio"], self.portfolio)
# Get the arguments passed to send_portfolio_invitation_remove_email
_, called_kwargs = send_invited_member_email.call_args
# Assert the email content
self.assertEqual(called_kwargs["requestor"], self.user)
self.assertEqual(called_kwargs["invitation"].email, invitation.email)
self.assertEqual(called_kwargs["invitation"].portfolio, invitation.portfolio)
class TestPortfolioMemberDomainsView(TestWithUser, WebTest):
@classmethod

View file

@ -2,6 +2,7 @@ from datetime import date
from django.conf import settings
from registrar.models import Domain, DomainInvitation, UserDomainRole
from registrar.models.portfolio import Portfolio
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
from registrar.utility.errors import (
@ -269,6 +270,88 @@ def send_portfolio_member_permission_update_email(requestor, permissions: UserPo
return True
def send_portfolio_member_permission_remove_email(requestor, permissions: UserPortfolioPermission):
"""
Sends an email notification to a portfolio member when their permissions are deleted.
This function retrieves the requestor's email and sends a templated email to the affected user,
notifying them of the removal of their portfolio permissions.
Args:
requestor (User): The user initiating the permission update.
permissions (UserPortfolioPermission): The updated permissions object containing the affected user
and the portfolio details.
Returns:
bool: True if the email was sent successfully, False if an EmailSendingError occurred.
Raises:
MissingEmailError: If the requestor has no email associated with their account.
"""
requestor_email = _get_requestor_email(requestor, portfolio=permissions.portfolio)
try:
send_templated_email(
"emails/portfolio_removal.txt",
"emails/portfolio_removal_subject.txt",
to_address=permissions.user.email,
context={
"requested_user": permissions.user,
"portfolio": permissions.portfolio,
"requestor_email": requestor_email,
},
)
except EmailSendingError:
logger.warning(
"Could not send email organization member removal notification to %s " "for portfolio: %s",
permissions.user.email,
permissions.portfolio.organization_name,
exc_info=True,
)
return False
return True
def send_portfolio_invitation_remove_email(requestor, invitation: PortfolioInvitation):
"""
Sends an email notification to a portfolio invited member when their permissions are deleted.
This function retrieves the requestor's email and sends a templated email to the affected email,
notifying them of the removal of their invited portfolio permissions.
Args:
requestor (User): The user initiating the permission update.
invitation (PortfolioInvitation): The invitation object containing the affected user
and the portfolio details.
Returns:
bool: True if the email was sent successfully, False if an EmailSendingError occurred.
Raises:
MissingEmailError: If the requestor has no email associated with their account.
"""
requestor_email = _get_requestor_email(requestor, portfolio=invitation.portfolio)
try:
send_templated_email(
"emails/portfolio_removal.txt",
"emails/portfolio_removal_subject.txt",
to_address=invitation.email,
context={
"requested_user": None,
"portfolio": invitation.portfolio,
"requestor_email": requestor_email,
},
)
except EmailSendingError:
logger.warning(
"Could not send email organization member removal notification to %s " "for portfolio: %s",
invitation.email,
invitation.portfolio.organization_name,
exc_info=True,
)
return False
return True
def send_portfolio_admin_addition_emails(email: str, requestor, portfolio: Portfolio):
"""
Notifies all portfolio admins of the provided portfolio of a newly invited portfolio admin

View file

@ -32,6 +32,8 @@ from registrar.utility.email_invitations import (
send_portfolio_admin_addition_emails,
send_portfolio_admin_removal_emails,
send_portfolio_invitation_email,
send_portfolio_invitation_remove_email,
send_portfolio_member_permission_remove_email,
send_portfolio_member_permission_update_email,
)
from registrar.utility.errors import MissingEmailError
@ -123,60 +125,84 @@ class PortfolioMemberDeleteView(View):
"""
portfolio_member_permission = get_object_or_404(UserPortfolioPermission, pk=pk)
member = portfolio_member_permission.user
portfolio = portfolio_member_permission.portfolio
# Validate if the member can be removed
error_message = self._validate_member_removal(request, member, portfolio)
if error_message:
return self._handle_error_response(request, error_message, pk)
# Attempt to send notification emails
self._send_removal_notifications(request, portfolio_member_permission)
# Passed all error conditions, proceed with deletion
portfolio_member_permission.delete()
# Return success response
return self._handle_success_response(request, member.email)
def _validate_member_removal(self, request, member, portfolio):
"""
Check whether the member can be removed from the portfolio.
Returns an error message if removal is not allowed; otherwise, returns None.
"""
active_requests_count = member.get_active_requests_count_in_portfolio(request)
support_url = "https://get.gov/contact/"
error_message = ""
if active_requests_count > 0:
# If they have any in progress requests
error_message = mark_safe( # nosec
return mark_safe( # nosec
"This member can't be removed from the organization because they have an active domain request. "
f"Please <a class='usa-link' href='{support_url}' target='_blank'>contact us</a> to remove this member."
)
elif member.is_only_admin_of_portfolio(portfolio_member_permission.portfolio):
# If they are the last manager of a domain
error_message = (
if member.is_only_admin_of_portfolio(portfolio):
return (
"There must be at least one admin in your organization. Give another member admin "
"permissions, make sure they log into the registrar, and then remove this member."
)
return None
# From the Members Table page Else the Member Page
if error_message:
if request.headers.get("X-Requested-With") == "XMLHttpRequest":
return JsonResponse(
{"error": error_message},
status=400,
)
else:
messages.error(request, error_message)
return redirect(reverse("member", kwargs={"pk": pk}))
def _handle_error_response(self, request, error_message, pk):
"""
Return an error response (JSON or redirect with messages).
"""
if request.headers.get("X-Requested-With") == "XMLHttpRequest":
return JsonResponse({"error": error_message}, status=400)
messages.error(request, error_message)
return redirect(reverse("member", kwargs={"pk": pk}))
# if member being removed is an admin
if UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_member_permission.roles:
try:
# attempt to send notification emails of the removal to other portfolio admins
def _send_removal_notifications(self, request, portfolio_member_permission):
"""
Attempt to send notification emails about the member's removal.
"""
try:
# Notify other portfolio admins if removing an admin
if UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_member_permission.roles:
if not send_portfolio_admin_removal_emails(
email=portfolio_member_permission.user.email,
requestor=request.user,
portfolio=portfolio_member_permission.portfolio,
):
messages.warning(self.request, "Could not send email notification to existing organization admins.")
except Exception as e:
self._handle_exceptions(e)
messages.warning(request, "Could not send email notification to existing organization admins.")
# passed all error conditions
portfolio_member_permission.delete()
# Notify the member being removed
if not send_portfolio_member_permission_remove_email(
requestor=request.user, permissions=portfolio_member_permission
):
messages.warning(
request, f"Could not send email notification to {portfolio_member_permission.user.email}"
)
except Exception as e:
self._handle_exceptions(e)
# From the Members Table page Else the Member Page
success_message = f"You've removed {member.email} from the organization."
def _handle_success_response(self, request, member_email):
"""
Return a success response (JSON or redirect with messages).
"""
success_message = f"You've removed {member_email} from the organization."
if request.headers.get("X-Requested-With") == "XMLHttpRequest":
return JsonResponse({"success": success_message}, status=200)
else:
messages.success(request, success_message)
return redirect(reverse("members"))
messages.success(request, success_message)
return redirect(reverse("members"))
def _handle_exceptions(self, exception):
"""Handle exceptions raised during the process."""
@ -458,16 +484,18 @@ class PortfolioInvitedMemberDeleteView(View):
"""
portfolio_invitation = get_object_or_404(PortfolioInvitation, pk=pk)
# if invitation being removed is an admin
if UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_invitation.roles:
try:
try:
# if invitation being removed is an admin
if UserPortfolioRoleChoices.ORGANIZATION_ADMIN in portfolio_invitation.roles:
# attempt to send notification emails of the removal to portfolio admins
if not send_portfolio_admin_removal_emails(
email=portfolio_invitation.email, requestor=request.user, portfolio=portfolio_invitation.portfolio
):
messages.warning(self.request, "Could not send email notification to existing organization admins.")
except Exception as e:
self._handle_exceptions(e)
if not send_portfolio_invitation_remove_email(requestor=request.user, invitation=portfolio_invitation):
messages.warning(request, f"Could not send email notification to {portfolio_invitation.email}")
except Exception as e:
self._handle_exceptions(e)
portfolio_invitation.delete()

View file

@ -1,5 +1,4 @@
"""Views for a User Profile.
"""
"""Views for a User Profile."""
import logging