This commit is contained in:
zandercymatics 2024-05-02 13:00:02 -06:00
parent a9509ebdce
commit 41089aadfd
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7

View file

@ -830,15 +830,68 @@ class Domain(TimeStampedModel, DomainHelper):
# TODO-error handling better here? # TODO-error handling better here?
def try_set_singleton_contact(self, contact: PublicContact, expected_type: str): # noqa def try_set_singleton_contact(self, contact: PublicContact, expected_type: str): # noqa
"""Runs the _set_singleton_contact function, while catching an IntegrityError """
from the DB and handling it appropriately""" Attempts to set a singleton contact for a domain,
handling potential IntegrityErrors due to duplicate entries.
This function wraps the `_set_singleton_contact` method,
which is intended to assign a unique contact of a specified type to a domain.
If a duplicate is detected, all duplicate entries are deleted before retrying the operation.
Args:
contact (PublicContact): The contact instance to set as a singleton.
expected_type (str): The expected type of the contact (e.g., 'technical', 'administrative').
Raises:
IntegrityError: Re-raises the IntegrityError if an unexpected number of duplicates are found,
or on the rerun of _set_singleton_contact.
"""
try: try:
self._set_singleton_contact(contact=contact, expectedType=expected_type) self._set_singleton_contact(contact=contact, expectedType=expected_type)
except IntegrityError as err: except IntegrityError as err:
# If this error occurs, it indicates that our DB tried to create # If this error occurs, it indicates that our DB tried to create a duplicate PublicContact
# a duplicate PublicContact
logger.error(f"A contact with this registry ID already exists. Error: {err}") logger.error(f"A contact with this registry ID already exists. Error: {err}")
# Delete the duplicates and try again.
duplicates = PublicContact.objects.filter(
registry_id=contact.registry_id,
contact_type=contact.contact_type,
domain=self,
)
# If we find duplicates, log it and delete the newest one.
if duplicates.count() <= 1:
# Something weird happened - we got an integrity error with one or fewer records
# which should not be possible.
logger.error(
"try_set_singleton_contact() -> "
f"An IntegrityError was raised but {duplicates.count()} entries exist"
)
# Raise the error. This is a case in which it is appropriate to do so.
raise err
else:
logger.warning("try_set_singleton_contact() -> Duplicate contacts found. Deleting duplicate.")
self._delete_duplicates(duplicates)
# Try to set the contact again. If we get an error at this point, then
# this indicates a very bad data issue in the DB (duplicate registry_ids on other domains).
# This requires patching the DB and epp with a script due to the epp <---> DB link.
# Trying to fix that here will just result in corrupt EPP records.
self._set_singleton_contact(contact=contact, expectedType=expected_type)
def _delete_duplicates(self, duplicates):
"""Given a list of duplicates, delete all but the oldest one."""
# Q: Should we be deleting the newest or the oldest? Does it even matter?
oldest_duplicate = duplicates.order_by('created_at').first()
# Exclude the oldest entry
duplicates_to_delete = duplicates.exclude(id=oldest_duplicate.id)
# Delete all duplicates
duplicates_to_delete.delete()
def _set_singleton_contact(self, contact: PublicContact, expectedType: str): # noqa def _set_singleton_contact(self, contact: PublicContact, expectedType: str): # noqa
"""Sets the contacts by adding them to the registry as new contacts, """Sets the contacts by adding them to the registry as new contacts,
updates the contact if it is already in epp, updates the contact if it is already in epp,
@ -1969,9 +2022,7 @@ class Domain(TimeStampedModel, DomainHelper):
# If we find duplicates, log it and delete the newest one. # If we find duplicates, log it and delete the newest one.
if db_contact.count() > 1: if db_contact.count() > 1:
logger.warning("_get_or_create_public_contact() -> Duplicate contacts found. Deleting duplicate.") logger.warning("_get_or_create_public_contact() -> Duplicate contacts found. Deleting duplicate.")
# Q: Should we be deleting the newest or the oldest? Does it even matter? self._delete_duplicates(db_contact)
newest_duplicate = db_contact.order_by('-created_at').first()
newest_duplicate.delete()
# Save to DB if it doesn't exist already. # Save to DB if it doesn't exist already.
if db_contact.count() == 0: if db_contact.count() == 0: