consolidate delete domain function

This commit is contained in:
matthewswspence 2024-12-11 10:11:05 -06:00
parent e87c4f78f1
commit 1a9b671758
No known key found for this signature in database
GPG key ID: FB458202A7852BA4
4 changed files with 143 additions and 161 deletions

View file

@ -62,7 +62,7 @@ class RegistryError(Exception):
- 2501 - 2502 Something malicious or abusive may have occurred - 2501 - 2502 Something malicious or abusive may have occurred
""" """
def __init__(self, *args, code=None, note=None,**kwargs): def __init__(self, *args, code=None, note="",**kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.code = code self.code = code
# note is a string that can be used to provide additional context # note is a string that can be used to provide additional context

View file

@ -269,10 +269,7 @@ class Domain(TimeStampedModel, DomainHelper):
{ PublicContact.ContactTypeChoices.REGISTRANT: "jd1234", { PublicContact.ContactTypeChoices.REGISTRANT: "jd1234",
PublicContact.ContactTypeChoices.ADMINISTRATIVE: "sh8013",...} PublicContact.ContactTypeChoices.ADMINISTRATIVE: "sh8013",...}
""" """
if self._cache.get("contacts"): raise NotImplementedError()
return self._cache.get("contacts")
else:
return self._get_property("contacts")
@Cache @Cache
def creation_date(self) -> date: def creation_date(self) -> date:
@ -1037,60 +1034,50 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error(f"registry error removing client hold: {err}") logger.error(f"registry error removing client hold: {err}")
raise (err) raise (err)
def _delete_nonregistrant_contacts(self):
"""Non-registrant contacts associated with this domain will be deleted.
RegistryErrors will be logged and raised. Error
handling should be provided by the caller.
"""
logger.info("Deleting contacts for %s", self.name)
contacts = self.registry_contacts
logger.debug("Contacts to delete for %s inside _delete_contacts -> %s", self.name, contacts)
if contacts:
for contact, id in contacts.items():
# registrants have to be deleted after the domain
if contact != PublicContact.ContactTypeChoices.REGISTRANT:
self._delete_contact(contact, id)
def _delete_subdomains(self):
"""Subdomains of this domain should be deleted from the registry.
Subdomains which are used by other domains (eg as a hostname) will
not be deleted.
raises:
RegistryError: if any subdomain cannot be deleted
"""
logger.info("Deleting nameservers for %s", self.name)
# check if any nameservers are in use by another domain
hosts = Host.objects.filter(name__regex=r".+{}".format(self.name))
for host in hosts:
if host.domain != self:
logger.error("Host %s in use by another domain: %s", host.name, host.domain)
raise RegistryError(
code=ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION,
note=host.domain,
)
nameservers = [n[0] for n in self.nameservers]
hostsToDelete, _ = self.createDeleteHostList(nameservers)
logger.debug("HostsToDelete from %s inside _delete_subdomains -> %s", self.name, hostsToDelete)
self.addAndRemoveHostsFromDomain(None, hostsToDelete=nameservers)
# for objSet in hostsToDelete:
# self._delete_hosts_if_not_used(objSet.hosts)
def _delete_domain(self): def _delete_domain(self):
"""This domain should be deleted from the registry """This domain should be deleted from the registry
may raises RegistryError, should be caught or handled correctly by caller""" may raises RegistryError, should be caught or handled correctly by caller"""
logger.info("Deleting subdomains for %s", self.name)
# check if any subdomains are in use by another domain
hosts = Host.objects.filter(name__regex=r".+{}".format(self.name))
logger.debug("Checking if any subdomains are in use by another domain")
for host in hosts:
if host.domain != self:
logger.error("Unable to delete host: %s is in use by another domain: %s", host.name, host.domain)
raise RegistryError(
code=ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION,
note=f"Host {host.name} is in use by {host.domain}",
)
logger.debug("No subdomains are in use by another domain")
nameservers = [host.name for host in hosts]
hosts = self.createDeleteHostList(hostsToDelete=nameservers)
response_code = self.addAndRemoveHostsFromDomain(hostsToAdd=None, hostsToDelete=hosts)
if response_code != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
raise RegistryError(code=response_code)
logger.debug("Deleting subordinate hosts for %s", self.name)
self._delete_hosts_if_not_used(nameservers)
logger.debug("Deleting non-registrant contacts for %s", self.name)
contacts = PublicContact.objects.filter(domain=self)
for contact in contacts:
if contact.contact_type != PublicContact.ContactTypeChoices.REGISTRANT:
logger.debug("removing contact %s from domain %s", contact.registry_id, self.name)
self._update_domain_with_contact(contact, rem=True)
logger.debug("deleting contact %s from registry", contact.registry_id)
request = commands.DeleteContact(contact.registry_id)
registry.send(request, cleaned=True)
logger.info("Deleting domain %s", self.name)
request = commands.DeleteDomain(name=self.name) request = commands.DeleteDomain(name=self.name)
registry.send(request, cleaned=True) registry.send(request, cleaned=True)
def _delete_domain_registrant(self): logger.debug("Deleting registrant contact for %s", self.name)
"""This domain's registrant should be deleted from the registry registrant_id = self.registrant_contact.registry_id
may raises RegistryError, should be caught or handled correctly by caller""" deleteRegistrant = commands.DeleteContact(id=registrant_id)
if self.registrant_contact: registry.send(deleteRegistrant, cleaned=True)
registrantID = self.registrant_contact.registry_id
request = commands.DeleteContact(id=registrantID)
registry.send(request, cleaned=True)
def __str__(self) -> str: def __str__(self) -> str:
return self.name return self.name
@ -1487,7 +1474,7 @@ class Domain(TimeStampedModel, DomainHelper):
self._remove_client_hold() self._remove_client_hold()
# TODO -on the client hold ticket any additional error handling here # TODO -on the client hold ticket any additional error handling here
@transition(field="state", source=[State.ON_HOLD, State.DNS_NEEDED, State.UNKNOWN], target=State.DELETED) @transition(field="state", source=[State.ON_HOLD, State.DNS_NEEDED], target=State.DELETED)
def deletedInEpp(self): def deletedInEpp(self):
"""Domain is deleted in epp but is saved in our database. """Domain is deleted in epp but is saved in our database.
Subdomains will be deleted first if not in use by another domain. Subdomains will be deleted first if not in use by another domain.
@ -1499,14 +1486,11 @@ class Domain(TimeStampedModel, DomainHelper):
# as doing everything here would reduce reliablity. # as doing everything here would reduce reliablity.
try: try:
logger.info("deletedInEpp()-> inside _delete_domain") logger.info("deletedInEpp()-> inside _delete_domain")
self._delete_subdomains()
self._delete_nonregistrant_contacts()
self._delete_domain() self._delete_domain()
self._delete_domain_registrant()
self.deleted = timezone.now() self.deleted = timezone.now()
self.expiration_date = None self.expiration_date = None
except RegistryError as err: except RegistryError as err:
logger.error(f"Could not delete domain. Registry returned error: {err}. Additional context: {err.note}") logger.error(f"Could not delete domain. Registry returned error: {err}. {err.note}")
raise err raise err
except TransitionNotAllowed as err: except TransitionNotAllowed as err:
logger.error("Could not delete domain. FSM failure: {err}") logger.error("Could not delete domain. FSM failure: {err}")
@ -1705,24 +1689,6 @@ class Domain(TimeStampedModel, DomainHelper):
raise e raise e
def _delete_contact(self, contact_name: str, registry_id: str):
"""Try to delete a contact from the registry.
raises:
RegistryError: if the registry is unable to delete the contact
"""
logger.info("_delete_contact() -> Attempting to delete contact for %s from domain %s", contact_name, self.name)
try:
req = commands.DeleteContact(id=registry_id)
return registry.send(req, cleaned=True).res_data[0]
except RegistryError as error:
logger.error(
"Registry threw error when trying to delete contact %s, error: %s", # noqa
contact_name,
error,
)
raise error
def is_ipv6(self, ip: str): def is_ipv6(self, ip: str):
ip_addr = ipaddress.ip_address(ip) ip_addr = ipaddress.ip_address(ip)
return ip_addr.version == 6 return ip_addr.version == 6

View file

@ -228,7 +228,7 @@ class TestDomainAdminAsStaff(MockEppLib):
""" """
Scenario: Domain deletion is unsuccessful Scenario: Domain deletion is unsuccessful
When the domain is deleted and has shared subdomains When the domain is deleted and has shared subdomains
Then a user-friendly error message is returned for displaying on the web Then a user-friendly success message is returned for displaying on the web
And `state` is not set to `DELETED` And `state` is not set to `DELETED`
""" """
domain, _ = Domain.objects.get_or_create(name="sharingiscaring.gov", state=Domain.State.ON_HOLD) domain, _ = Domain.objects.get_or_create(name="sharingiscaring.gov", state=Domain.State.ON_HOLD)

View file

@ -9,6 +9,7 @@ from django.db.utils import IntegrityError
from unittest.mock import MagicMock, patch, call from unittest.mock import MagicMock, patch, call
import datetime import datetime
from django.utils.timezone import make_aware from django.utils.timezone import make_aware
from api.tests.common import less_console_noise_decorator
from registrar.models import Domain, Host, HostIP from registrar.models import Domain, Host, HostIP
from unittest import skip from unittest import skip
@ -2592,6 +2593,7 @@ class TestAnalystDelete(MockEppLib):
Domain.objects.all().delete() Domain.objects.all().delete()
super().tearDown() super().tearDown()
@less_console_noise_decorator
def test_analyst_deletes_domain(self): def test_analyst_deletes_domain(self):
""" """
Scenario: Analyst permanently deletes a domain Scenario: Analyst permanently deletes a domain
@ -2601,29 +2603,29 @@ class TestAnalystDelete(MockEppLib):
The deleted date is set. The deleted date is set.
""" """
with less_console_noise(): # Put the domain in client hold
# Put the domain in client hold self.domain.place_client_hold()
self.domain.place_client_hold() # Delete it...
# Delete it... self.domain.deletedInEpp()
self.domain.deletedInEpp() self.domain.save()
self.domain.save() self.mockedSendFunction.assert_has_calls(
self.mockedSendFunction.assert_has_calls( [
[ call(
call( commands.DeleteDomain(name="fake.gov"),
commands.DeleteDomain(name="fake.gov"), cleaned=True,
cleaned=True, )
) ]
] )
) # Domain itself should not be deleted
# Domain itself should not be deleted self.assertNotEqual(self.domain, None)
self.assertNotEqual(self.domain, None) # Domain should have the right state
# Domain should have the right state self.assertEqual(self.domain.state, Domain.State.DELETED)
self.assertEqual(self.domain.state, Domain.State.DELETED) # Domain should have a deleted
# Domain should have a deleted self.assertNotEqual(self.domain.deleted, None)
self.assertNotEqual(self.domain.deleted, None) # Cache should be invalidated
# Cache should be invalidated self.assertEqual(self.domain._cache, {})
self.assertEqual(self.domain._cache, {})
# @less_console_noise_decorator
def test_deletion_is_unsuccessful(self): def test_deletion_is_unsuccessful(self):
""" """
Scenario: Domain deletion is unsuccessful Scenario: Domain deletion is unsuccessful
@ -2631,23 +2633,23 @@ class TestAnalystDelete(MockEppLib):
Then a client error is returned of code 2305 Then a client error is returned of code 2305
And `state` is not set to `DELETED` And `state` is not set to `DELETED`
""" """
with less_console_noise(): # Desired domain
# Desired domain domain, _ = Domain.objects.get_or_create(name="sharingiscaring.gov", state=Domain.State.ON_HOLD)
domain, _ = Domain.objects.get_or_create(name="sharingiscaring.gov", state=Domain.State.ON_HOLD) # Put the domain in client hold
# Put the domain in client hold domain.place_client_hold()
domain.place_client_hold() # Delete it
# Delete it with self.assertRaises(RegistryError) as err:
with self.assertRaises(RegistryError) as err: domain.deletedInEpp()
domain.deletedInEpp() domain.save()
domain.save()
self.assertTrue(err.is_client_error() and err.code == ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION) self.assertTrue(err.is_client_error() and err.code == ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION)
self.assertEqual(err.msg, "Host in use by another domain: fake-on-hold.gov") self.assertEqual(err.msg, "Host in use by another domain: fake-on-hold.gov")
# Domain itself should not be deleted # Domain itself should not be deleted
self.assertNotEqual(domain, None) self.assertNotEqual(domain, None)
# State should not have changed # State should not have changed
self.assertEqual(domain.state, Domain.State.ON_HOLD) self.assertEqual(domain.state, Domain.State.ON_HOLD)
# @less_console_noise_decorator
def test_deletion_with_host_and_contacts(self): def test_deletion_with_host_and_contacts(self):
""" """
Scenario: Domain with related Host and Contacts is Deleted Scenario: Domain with related Host and Contacts is Deleted
@ -2658,44 +2660,59 @@ class TestAnalystDelete(MockEppLib):
Then `commands.DeleteContact` is sent to the registry for the registrant contact Then `commands.DeleteContact` is sent to the registry for the registrant contact
And `state` is set to `DELETED` And `state` is set to `DELETED`
""" """
with less_console_noise(): # Desired domain
# Desired domain domain, _ = Domain.objects.get_or_create(name="freeman.gov", state=Domain.State.ON_HOLD)
domain, _ = Domain.objects.get_or_create(name="freeman.gov", state=Domain.State.ON_HOLD) # Put the domain in client hold
# Put the domain in client hold domain.place_client_hold()
domain.place_client_hold() # Delete it
# Delete it domain.deletedInEpp()
domain.deletedInEpp() domain.save()
domain.save()
# Check that the host and contacts are deleted, order doesn't matter # Check that the host and contacts are deleted, order doesn't matter
self.mockedSendFunction.assert_has_calls( self.mockedSendFunction.assert_has_calls(
[ [
call(commands.DeleteHost(name="fake.host.com"), cleaned=True), call(
call(commands.DeleteContact(id="securityContact"), cleaned=True), commands.UpdateDomain(
call(commands.DeleteContact(id="technicalContact"), cleaned=True), name="freeman.gov",
call( add=[
commands.DeleteContact(id="adminContact"), common.Status(
cleaned=True, state=Domain.Status.CLIENT_HOLD,
description="",
lang="en",
)
],
rem=[],
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
), ),
], cleaned=True,
any_order=True, ),
) call(commands.DeleteHost(name="fake.host.com"), cleaned=True),
actual_calls = self.mockedSendFunction.call_args_list call(commands.DeleteContact(id="securityContact"), cleaned=True),
print("actual_calls", actual_calls) call(commands.DeleteContact(id="technicalContact"), cleaned=True),
call(commands.DeleteContact(id="adminContact"), cleaned=True),
],
any_order=True,
)
actual_calls = self.mockedSendFunction.call_args_list
print("actual_calls", actual_calls)
# These calls need to be in order # These calls need to be in order
self.mockedSendFunction.assert_has_calls( self.mockedSendFunction.assert_has_calls(
[ [
call(commands.DeleteDomain(name="freeman.gov"), cleaned=True), call(commands.DeleteDomain(name="freeman.gov"), cleaned=True),
call(commands.InfoContact(id="regContact"), cleaned=True), call(commands.InfoContact(id="regContact"), cleaned=True),
call(commands.DeleteContact(id="regContact"), cleaned=True), call(commands.DeleteContact(id="regContact"), cleaned=True),
], ],
) )
# Domain itself should not be deleted # Domain itself should not be deleted
self.assertNotEqual(domain, None) self.assertNotEqual(domain, None)
# State should have changed # State should have changed
self.assertEqual(domain.state, Domain.State.DELETED) self.assertEqual(domain.state, Domain.State.DELETED)
# @less_console_noise_decorator
def test_deletion_ready_fsm_failure(self): def test_deletion_ready_fsm_failure(self):
""" """
Scenario: Domain deletion is unsuccessful due to FSM rules Scenario: Domain deletion is unsuccessful due to FSM rules
@ -2707,15 +2724,14 @@ class TestAnalystDelete(MockEppLib):
The deleted date is still null. The deleted date is still null.
""" """
with less_console_noise(): self.assertEqual(self.domain.state, Domain.State.READY)
self.assertEqual(self.domain.state, Domain.State.READY) with self.assertRaises(TransitionNotAllowed) as err:
with self.assertRaises(TransitionNotAllowed) as err: self.domain.deletedInEpp()
self.domain.deletedInEpp() self.domain.save()
self.domain.save() self.assertTrue(err.is_client_error() and err.code == ErrorCode.OBJECT_STATUS_PROHIBITS_OPERATION)
self.assertTrue(err.is_client_error() and err.code == ErrorCode.OBJECT_STATUS_PROHIBITS_OPERATION) # Domain should not be deleted
# Domain should not be deleted self.assertNotEqual(self.domain, None)
self.assertNotEqual(self.domain, None) # Domain should have the right state
# Domain should have the right state self.assertEqual(self.domain.state, Domain.State.READY)
self.assertEqual(self.domain.state, Domain.State.READY) # deleted should be null
# deleted should be null self.assertEqual(self.domain.deleted, None)
self.assertEqual(self.domain.deleted, None)