Add test for race condition

This commit is contained in:
zandercymatics 2025-01-16 15:15:36 -07:00
parent 511e9610e2
commit 1ece895ac6
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
2 changed files with 64 additions and 13 deletions

View file

@ -1939,6 +1939,8 @@ class Domain(TimeStampedModel, DomainHelper):
Additionally, capture and cache old hosts and contacts from cache if they Additionally, capture and cache old hosts and contacts from cache if they
don't exist in cleaned don't exist in cleaned
""" """
# object reference issue between self._cache vs cleaned?
old_cache_hosts = self._cache.get("hosts") old_cache_hosts = self._cache.get("hosts")
old_cache_contacts = self._cache.get("contacts") old_cache_contacts = self._cache.get("contacts")
@ -2111,19 +2113,20 @@ class Domain(TimeStampedModel, DomainHelper):
# Save to DB if it doesn't exist already. # Save to DB if it doesn't exist already.
if db_contact.count() == 0: if db_contact.count() == 0:
# Doesn't run custom save logic, just saves to DB # Doesn't run custom save logic, just saves to DB
try: public_contact.save(skip_epp_save=True)
public_contact.save(skip_epp_save=True) # try:
logger.info(f"Created a new PublicContact: {public_contact}") # public_contact.save(skip_epp_save=True)
except IntegrityError as err: # logger.info(f"Created a new PublicContact: {public_contact}")
logger.error( # except IntegrityError as err:
"_get_or_create_public_contact() => tried to create a duplicate public contact: " # logger.error(
f"{err}", exc_info=True # "_get_or_create_public_contact() => tried to create a duplicate public contact: "
) # f"{err}", exc_info=True
return PublicContact.objects.get( # )
registry_id=public_contact.registry_id, # return PublicContact.objects.get(
contact_type=public_contact.contact_type, # registry_id=public_contact.registry_id,
domain=self, # contact_type=public_contact.contact_type,
) # domain=self,
# )
# Append the item we just created # Append the item we just created
return public_contact return public_contact

View file

@ -348,6 +348,54 @@ class TestDomainCache(MockEppLib):
class TestDomainCreation(MockEppLib): class TestDomainCreation(MockEppLib):
"""Rule: An approved domain request must result in a domain""" """Rule: An approved domain request must result in a domain"""
def test_get_security_email_during_unknown_state_race_condition(self):
"""
Scenario: A domain is accessed for the first time
Given a domain in UNKNOWN state with a security contact in registry
When get_security_email is called during state transition
Then the security contact is fetched from registry
And only one security contact exists in database
And the security email matches the registry contact
And no duplicate contact creation is attempted
"""
domain, _ = Domain.objects.get_or_create(name="defaultsecurity.gov")
# Store original method
original_filter = PublicContact.objects.filter
def mock_filter(*args, **kwargs):
# First call returns empty queryset to simulate contact not existing
result = original_filter(*args, **kwargs)
if kwargs.get('contact_type') == PublicContact.ContactTypeChoices.SECURITY:
# Create the duplicate contact after the check but before the save
duplicate = PublicContact(
domain=domain,
contact_type=PublicContact.ContactTypeChoices.SECURITY,
registry_id="defaultSec",
email="dotgov@cisa.dhs.gov",
name="Registry Customer Service"
)
duplicate.save(skip_epp_save=True)
return result
with patch.object(PublicContact.objects, 'filter', side_effect=mock_filter):
try:
security_email = domain.get_security_email()
except IntegrityError:
self.fail(
"IntegrityError was raised during contact creation due to a race condition. "
"This indicates that concurrent contact creation is not working in some cases. "
"The error occurs when two processes try to create the same contact simultaneously. "
"Expected behavior: gracefully handle duplicate creation and return existing contact."
)
# Verify only one contact exists
security_contacts = PublicContact.objects.filter(
domain=domain,
contact_type=PublicContact.ContactTypeChoices.SECURITY
)
self.assertEqual(security_contacts.count(), 1)
self.assertEqual(security_email, "dotgov@cisa.dhs.gov")
@boto3_mocking.patching @boto3_mocking.patching
def test_approved_domain_request_creates_domain_locally(self): def test_approved_domain_request_creates_domain_locally(self):