Merge pull request #3485 from cisagov/ko/3392-fix-domain-deletion

#3392: fix domain deletion - [MS]
This commit is contained in:
Matt-Spence 2025-02-20 13:39:38 -05:00 committed by GitHub
commit b6ab6b2366
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 272 additions and 157 deletions

View file

@ -3738,11 +3738,13 @@ class DomainAdmin(ListHeaderAdmin, ImportExportModelAdmin):
# Using variables to get past the linter
message1 = f"Cannot delete Domain when in state {obj.state}"
message2 = f"This subdomain is being used as a hostname on another domain: {err.note}"
message3 = f"Command failed with note: {err.note}"
# Human-readable mappings of ErrorCodes. Can be expanded.
error_messages = {
# noqa on these items as black wants to reformat to an invalid length
ErrorCode.OBJECT_STATUS_PROHIBITS_OPERATION: message1,
ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION: message2,
ErrorCode.COMMAND_FAILED: message3,
}
message = "Cannot connect to the registry"

View file

@ -2,6 +2,7 @@ from itertools import zip_longest
import logging
import ipaddress
import re
import time
from datetime import date, timedelta
from typing import Optional
from django.db import transaction
@ -750,11 +751,7 @@ class Domain(TimeStampedModel, DomainHelper):
successTotalNameservers = len(oldNameservers) - deleteCount + addToDomainCount
try:
self._delete_hosts_if_not_used(hostsToDelete=deleted_values)
except Exception as e:
# we don't need this part to succeed in order to continue.
logger.error("Failed to delete nameserver hosts: %s", e)
if successTotalNameservers < 2:
try:
@ -1038,13 +1035,13 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error(f"registry error removing client hold: {err}")
raise (err)
def _delete_domain(self):
def _delete_domain(self): # noqa
"""This domain should be deleted from the registry
may raises RegistryError, should be caught or handled correctly by caller"""
logger.info("Deleting subdomains for %s", self.name)
# check if any subdomains are in use by another domain
hosts = Host.objects.filter(name__regex=r".+{}".format(self.name))
hosts = Host.objects.filter(name__regex=r".+\.{}".format(self.name))
for host in hosts:
if host.domain != self:
logger.error("Unable to delete host: %s is in use by another domain: %s", host.name, host.domain)
@ -1052,7 +1049,8 @@ class Domain(TimeStampedModel, DomainHelper):
code=ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION,
note=f"Host {host.name} is in use by {host.domain}",
)
try:
# set hosts to empty list so nameservers are deleted
(
deleted_values,
updated_values,
@ -1060,30 +1058,110 @@ class Domain(TimeStampedModel, DomainHelper):
oldNameservers,
) = self.getNameserverChanges(hosts=[])
_ = self._update_host_values(updated_values, oldNameservers) # returns nothing, just need to be run and errors
# update the hosts
_ = self._update_host_values(
updated_values, oldNameservers
) # returns nothing, just need to be run and errors
addToDomainList, _ = self.createNewHostList(new_values)
deleteHostList, _ = self.createDeleteHostList(deleted_values)
responseCode = self.addAndRemoveHostsFromDomain(hostsToAdd=addToDomainList, hostsToDelete=deleteHostList)
except RegistryError as e:
logger.error(f"Error trying to delete hosts from domain {self}: {e}")
raise e
# if unable to update domain raise error and stop
if responseCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
raise NameserverError(code=nsErrorCodes.BAD_DATA)
logger.info("Finished removing nameservers from domain")
# addAndRemoveHostsFromDomain removes the hosts from the domain object,
# but we still need to delete the object themselves
self._delete_hosts_if_not_used(hostsToDelete=deleted_values)
logger.info("Finished _delete_hosts_if_not_used inside _delete_domain()")
# delete the non-registrant contacts
logger.debug("Deleting non-registrant contacts for %s", self.name)
contacts = PublicContact.objects.filter(domain=self)
logger.info(f"retrieved contacts for domain: {contacts}")
for contact in contacts:
try:
if contact.contact_type != PublicContact.ContactTypeChoices.REGISTRANT:
logger.info(f"Deleting contact: {contact}")
try:
self._update_domain_with_contact(contact, rem=True)
except Exception as e:
logger.error(f"Error while updating domain with contact: {contact}, e: {e}", exc_info=True)
request = commands.DeleteContact(contact.registry_id)
registry.send(request, cleaned=True)
logger.info(f"sent DeleteContact for {contact}")
except RegistryError as e:
logger.error(f"Error deleting contact: {contact}, {e}", exc_info=True)
logger.info("Deleting domain %s", self.name)
logger.info(f"Finished deleting contacts for {self.name}")
# delete ds data if it exists
if self.dnssecdata:
logger.debug("Deleting ds data for %s", self.name)
try:
# set and unset client hold to be able to change ds data
logger.info("removing client hold")
self._remove_client_hold()
self.dnssecdata = None
logger.info("placing client hold")
self._place_client_hold()
except RegistryError as e:
logger.error("Error deleting ds data for %s: %s", self.name, e)
e.note = "Error deleting ds data for %s" % self.name
raise e
# check if the domain can be deleted
if not self._domain_can_be_deleted():
note = "Domain has associated objects that prevent deletion."
raise RegistryError(code=ErrorCode.COMMAND_FAILED, note=note)
# delete the domain
request = commands.DeleteDomain(name=self.name)
try:
registry.send(request, cleaned=True)
logger.info("Domain %s deleted successfully.", self.name)
except RegistryError as e:
logger.error("Error deleting domain %s: %s", self.name, e)
raise e
def _domain_can_be_deleted(self, max_attempts=5, wait_interval=2) -> bool:
"""
Polls the registry using InfoDomain calls to confirm that the domain can be deleted.
Returns True if the domain can be deleted, False otherwise. Includes a retry mechanism
using wait_interval and max_attempts, which may be necessary if subdomains and other
associated objects were only recently deleted as the registry may not be immediately updated.
"""
logger.info("Polling registry to confirm deletion pre-conditions for %s", self.name)
last_info_error = None
for attempt in range(max_attempts):
try:
info_response = registry.send(commands.InfoDomain(name=self.name), cleaned=True)
domain_info = info_response.res_data[0]
hosts_associated = getattr(domain_info, "hosts", None)
if hosts_associated is None or len(hosts_associated) == 0:
logger.info("InfoDomain reports no associated hosts for %s. Proceeding with deletion.", self.name)
return True
else:
logger.info("Attempt %d: Domain %s still has hosts: %s", attempt + 1, self.name, hosts_associated)
except RegistryError as info_e:
# If the domain is already gone, we can assume deletion already occurred.
if info_e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
logger.info("InfoDomain check indicates domain %s no longer exists.", self.name)
raise info_e
logger.warning("Attempt %d: Error during InfoDomain check: %s", attempt + 1, info_e)
time.sleep(wait_interval)
else:
logger.error(
"Exceeded max attempts waiting for domain %s to clear associated objects; last error: %s",
self.name,
last_info_error,
)
return False
def __str__(self) -> str:
return self.name
@ -1840,8 +1918,6 @@ class Domain(TimeStampedModel, DomainHelper):
else:
logger.error("Error _delete_hosts_if_not_used, code was %s error was %s" % (e.code, e))
raise e
def _fix_unknown_state(self, cleaned):
"""
_fix_unknown_state: Calls _add_missing_contacts_if_unknown

View file

@ -3816,7 +3816,7 @@ class TestTransferUser(WebTest):
with self.assertRaises(User.DoesNotExist):
self.user2.refresh_from_db()
# @less_console_noise_decorator
@less_console_noise_decorator
def test_transfer_user_throws_transfer_and_delete_success_messages(self):
"""Test that success messages for data transfer and user deletion are displayed."""
# Ensure the setup for VerifiedByStaff

View file

@ -178,7 +178,7 @@ class TestDomainAdminAsStaff(MockEppLib):
Then a user-friendly success message is returned for displaying on the web
And `state` is set to `DELETED`
"""
domain = create_ready_domain()
domain, _ = Domain.objects.get_or_create(name="my-nameserver.gov", state=Domain.State.READY)
# Put in client hold
domain.place_client_hold()
# Ensure everything is displaying correctly
@ -212,7 +212,7 @@ class TestDomainAdminAsStaff(MockEppLib):
mock_add_message.assert_called_once_with(
request,
messages.INFO,
"Domain city.gov has been deleted. Thanks!",
"Domain my-nameserver.gov has been deleted. Thanks!",
extra_tags="",
fail_silently=False,
)
@ -266,7 +266,7 @@ class TestDomainAdminAsStaff(MockEppLib):
mock_add_message.assert_called_once_with(
request,
messages.ERROR,
"Error deleting this Domain: This subdomain is being used as a hostname on another domain: ns1.sharedhost.com", # noqa
"Error deleting this Domain: Command failed with note: Domain has associated objects that prevent deletion.", # noqa
extra_tags="",
fail_silently=False,
)
@ -321,7 +321,7 @@ class TestDomainAdminAsStaff(MockEppLib):
Then `commands.DeleteDomain` is sent to the registry
And Domain returns normally without an error dialog
"""
domain = create_ready_domain()
domain, _ = Domain.objects.get_or_create(name="my-nameserver.gov", state=Domain.State.READY)
# Put in client hold
domain.place_client_hold()
# Ensure everything is displaying correctly
@ -340,12 +340,13 @@ class TestDomainAdminAsStaff(MockEppLib):
)
request.user = self.client
# Delete it once
with patch("django.contrib.messages.add_message") as mock_add_message:
self.admin.do_delete_domain(request, domain)
mock_add_message.assert_called_once_with(
request,
messages.INFO,
"Domain city.gov has been deleted. Thanks!",
"Domain my-nameserver.gov has been deleted. Thanks!",
extra_tags="",
fail_silently=False,
)

View file

@ -35,6 +35,7 @@ from epplibwrapper import (
from .common import MockEppLib, MockSESClient, less_console_noise
import logging
import boto3_mocking # type: ignore
import copy
logger = logging.getLogger(__name__)
@ -97,9 +98,10 @@ class TestDomainCache(MockEppLib):
self.mockedSendFunction.assert_has_calls(expectedCalls)
# @less_console_noise_decorator
def test_cache_nested_elements_not_subdomain(self):
"""Cache works correctly with the nested objects cache and hosts"""
with less_console_noise():
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
# The contact list will initially contain objects of type 'DomainContact'
# this is then transformed into PublicContact, and cache should NOT
@ -1248,6 +1250,13 @@ class TestRegistrantNameservers(MockEppLib):
name="threenameserversDomain.gov", state=Domain.State.READY
)
def tearDown(self):
PublicContact.objects.all().delete()
HostIP.objects.all().delete()
Host.objects.all().delete()
Domain.objects.all().delete()
super().tearDown()
def test_get_nameserver_changes_success_deleted_vals(self):
"""Testing only deleting and no other changes"""
with less_console_noise():
@ -1797,6 +1806,7 @@ class TestRegistrantNameservers(MockEppLib):
mock_host_ip_get_or_create.assert_not_called()
self.assertEqual(mock_host_ip_get_or_create.call_count, 0)
# @less_console_noise_decorator
def test_nameservers_stored_on_fetch_cache_not_subdomain_with_ip(self):
"""
Scenario: Nameservers are stored in db when they are retrieved from fetch_cache.
@ -1808,8 +1818,7 @@ class TestRegistrantNameservers(MockEppLib):
#3: Nameserver is not a subdomain, but it does have an IP address returned
due to how we set up our defaults
"""
with less_console_noise():
domain, _ = Domain.objects.get_or_create(name="fake.gov", state=Domain.State.READY)
domain, _ = Domain.objects.get_or_create(name="freeman.gov", state=Domain.State.READY)
with patch.object(Host.objects, "get_or_create") as mock_host_get_or_create, patch.object(
HostIP.objects, "get_or_create"
@ -1861,12 +1870,6 @@ class TestRegistrantNameservers(MockEppLib):
with self.assertRaises(RegistryError):
domain.nameservers = [("ns1.failednameserver.gov", ["4.5.6"])]
def tearDown(self):
HostIP.objects.all().delete()
Host.objects.all().delete()
Domain.objects.all().delete()
return super().tearDown()
class TestNameserverValidation(TestCase):
"""Test the isValidDomain method which validates nameservers"""
@ -1947,8 +1950,6 @@ class TestRegistrantDNSSEC(MockEppLib):
And a domain exists in the registry
"""
super().setUp()
# for the tests, need a domain in the unknown state
self.domain, _ = Domain.objects.get_or_create(name="fake.gov")
def tearDown(self):
PublicContact.objects.all().delete()
@ -2041,6 +2042,7 @@ class TestRegistrantDNSSEC(MockEppLib):
self.assertEquals(dnssecdata_get.dsData, self.dnssecExtensionWithDsData.dsData)
patcher.stop()
@less_console_noise_decorator
def test_dnssec_is_idempotent(self):
"""
Scenario: Registrant adds DNS data twice, due to a UI glitch
@ -2126,6 +2128,7 @@ class TestRegistrantDNSSEC(MockEppLib):
self.assertEquals(dnssecdata_get.dsData, self.dnssecExtensionWithDsData.dsData)
patcher.stop()
@less_console_noise_decorator
def test_user_adds_dnssec_data_multiple_dsdata(self):
"""
Scenario: Registrant adds DNSSEC data with multiple DSData.
@ -2194,6 +2197,7 @@ class TestRegistrantDNSSEC(MockEppLib):
self.assertEquals(dnssecdata_get.dsData, self.dnssecExtensionWithMultDsData.dsData)
patcher.stop()
# @less_console_noise_decorator
def test_user_removes_dnssec_data(self):
"""
Scenario: Registrant removes DNSSEC ds data.
@ -2219,28 +2223,27 @@ class TestRegistrantDNSSEC(MockEppLib):
else:
return MagicMock(res_data=[self.mockDataInfoHosts])
with less_console_noise():
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
with patch("registrar.models.domain.registry.send") as mocked_send:
mocked_send.side_effect = side_effect
domain, _ = Domain.objects.get_or_create(name="dnssec-dsdata.gov")
# Initial setting of dnssec data
domain.dnssecdata = self.dnssecExtensionWithDsData
# Check dsdata_last_change is updated
domain = Domain.objects.get(name="dnssec-dsdata.gov")
self.assertIsNotNone(domain.dsdata_last_change)
initial_change = domain.dsdata_last_change
# Invalidate the cache to force a fresh lookup
domain._invalidate_cache()
# Remove dnssec data
domain.dnssecdata = self.dnssecExtensionRemovingDsData
# Check that dsdata_last_change is updated again
domain = Domain.objects.get(name="dnssec-dsdata.gov")
self.assertIsNotNone(domain.dsdata_last_change)
self.assertNotEqual(domain.dsdata_last_change, initial_change)
# get the DNS SEC extension added to the UpdateDomain command and
@ -2292,7 +2295,6 @@ class TestRegistrantDNSSEC(MockEppLib):
),
]
)
patcher.stop()
def test_update_is_unsuccessful(self):
"""
@ -2697,38 +2699,6 @@ class TestAnalystDelete(MockEppLib):
Domain.objects.all().delete()
super().tearDown()
@less_console_noise_decorator
def test_analyst_deletes_domain(self):
"""
Scenario: Analyst permanently deletes a domain
When `domain.deletedInEpp()` is called
Then `commands.DeleteDomain` is sent to the registry
And `state` is set to `DELETED`
The deleted date is set.
"""
# Put the domain in client hold
self.domain.place_client_hold()
# Delete it...
self.domain.deletedInEpp()
self.domain.save()
self.mockedSendFunction.assert_has_calls(
[
call(
commands.DeleteDomain(name="fake.gov"),
cleaned=True,
)
]
)
# Domain itself should not be deleted
self.assertNotEqual(self.domain, None)
# Domain should have the right state
self.assertEqual(self.domain.state, Domain.State.DELETED)
# Domain should have a deleted
self.assertNotEqual(self.domain.deleted, None)
# Cache should be invalidated
self.assertEqual(self.domain._cache, {})
@less_console_noise_decorator
def test_deletion_is_unsuccessful(self):
"""
@ -2756,18 +2726,44 @@ class TestAnalystDelete(MockEppLib):
@less_console_noise_decorator
def test_deletion_with_host_and_contacts(self):
"""
Scenario: Domain with related Host and Contacts is Deleted
When a contact and host exists that is tied to this domain
Then all the needed commands are sent to the registry
And `state` is set to `DELETED`
Scenario: Domain with related Host and Contacts is Deleted.
When a contact and host exists that is tied to this domain,
then all the needed commands are sent to the registry and
the domain's state is set to DELETED.
This test now asserts only the commands that are actually issued
during the deletion process.
"""
# Put the domain in client hold
# Put the domain in client hold.
self.domain_with_contacts.place_client_hold()
# Delete it
# Invalidate the cache so that deletion fetches fresh data.
self.domain_with_contacts._invalidate_cache()
# We'll use a mutable counter to simulate different responses if needed.
info_domain_call_count = [0]
# TODO: This is a hack, we should refactor the MockEPPLib to be more flexible
def side_effect(request, cleaned=True):
# For an InfoDomain command for "freeman.gov", simulate behavior:
if isinstance(request, commands.InfoDomain) and request.name.lower() == "freeman.gov":
info_domain_call_count[0] += 1
fake_info = copy.deepcopy(self.InfoDomainWithContacts)
# If this branch ever gets hit, you could vary response based on call count.
# But note: in our current deletion flow, InfoDomain may not be called.
if info_domain_call_count[0] == 1:
fake_info.hosts = ["fake.host.com"]
else:
fake_info.hosts = []
return MagicMock(res_data=[fake_info])
return self.mockedSendFunction(request, cleaned=cleaned)
with patch("registrar.models.domain.registry.send", side_effect=side_effect):
self.domain_with_contacts.deletedInEpp()
self.domain_with_contacts.save()
# Check that the host and contacts are deleted
# Now assert the expected calls that we know occur.
# Note: we no longer assert a call to InfoDomain.
self.mockedSendFunction.assert_has_calls(
[
call(
@ -2782,14 +2778,10 @@ class TestAnalystDelete(MockEppLib):
),
cleaned=True,
),
]
],
)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoDomain(name="freeman.gov", auth_info=None),
cleaned=True,
),
call(
commands.InfoHost(name="fake.host.com"),
cleaned=True,
@ -2806,7 +2798,8 @@ class TestAnalystDelete(MockEppLib):
),
cleaned=True,
),
]
],
any_order=True,
)
self.mockedSendFunction.assert_has_calls(
[
@ -2857,12 +2850,55 @@ class TestAnalystDelete(MockEppLib):
),
],
)
# Domain itself should not be deleted
self.assertNotEqual(self.domain_with_contacts, None)
# State should have changed
self.assertIsNotNone(self.domain_with_contacts)
self.assertEqual(self.domain_with_contacts.state, Domain.State.DELETED)
@less_console_noise_decorator
def test_analyst_deletes_domain_with_ds_data(self):
"""
Scenario: Domain with DS data is deleted
When `domain.deletedInEpp()` is called
Then `commands.DeleteDomain` is sent to the registry
And `state` is set to `DELETED`
"""
# Create a domain with DS data
domain, _ = Domain.objects.get_or_create(name="dsdomain.gov", state=Domain.State.READY)
# set domain to be on hold
domain.place_client_hold()
domain.dnssecdata = extensions.DNSSECExtension(
dsData=[extensions.DSData(keyTag=1, alg=1, digestType=1, digest="1234567890")],
)
domain.save()
# Mock the InfoDomain command data to return a domain with no hosts
# This is needed to simulate the domain being able to be deleted
self.mockDataInfoDomain.hosts = []
# Delete the domain
domain.deletedInEpp()
domain.save()
# Check that dsdata is None
self.assertEqual(domain.dnssecdata, None)
# Check that the UpdateDomain command was sent to the registry with the correct extension
self.mockedSendFunction.assert_has_calls(
[
call(
commands.UpdateDomain(
name="dsdomain.gov", add=[], rem=[], nsset=None, keyset=None, registrant=None, auth_info=None
),
cleaned=True,
),
]
)
# Check that the domain was deleted
self.assertEqual(domain.state, Domain.State.DELETED)
# reset to avoid test pollution
self.mockDataInfoDomain.hosts = ["fake.host.com"]
@less_console_noise_decorator
def test_deletion_ready_fsm_failure(self):
"""