Fix logic bug / tests

This commit is contained in:
zandercymatics 2023-09-21 10:02:25 -06:00
parent dd57cf2ffd
commit 39d4646369
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
3 changed files with 107 additions and 99 deletions

View file

@ -530,16 +530,6 @@ class Domain(TimeStampedModel, DomainHelper):
"Raising error after removing and adding a new contact"
)
raise (err)
elif alreadyExistsInRegistry:
# If this item already exists in the registry,
# but doesn't have other contacts, we want to
# delete the old value
filtered_contacts = PublicContact.objects.filter(
registry_id=contact.registry_id
)
if(filtered_contacts.count() > 1):
filtered_contacts.order_by('id').first().delete()
# update domain with contact or update the contact itself
if not isEmptySecurity:
@ -547,7 +537,9 @@ class Domain(TimeStampedModel, DomainHelper):
self._update_domain_with_contact(contact=contact, rem=False)
# if already exists just update
elif alreadyExistsInRegistry:
current_contact = filtered_contacts.get()
current_contact = PublicContact.objects.filter(
registry_id=contact.registry_id
).get()
logger.debug(f"current contact was accessed {current_contact}")
if current_contact.email != contact.email:
@ -737,14 +729,21 @@ class Domain(TimeStampedModel, DomainHelper):
sp=addr.sp,
**streets,
)
db_contact = PublicContact.objects.filter(registry_id=contact_id, contact_type=contact_type, domain=self)
# Saves to DB
if(create_object):
create = PublicContact.objects.filter(registry_id=contact_id, contact_type=contact_type, domain=self)
if(create.count() == 0 and contact_type != PublicContact.ContactTypeChoices.REGISTRANT):
desired_contact.save()
return desired_contact
if(create_object and db_contact.count() == 0):
desired_contact.save()
logger.debug(f"Created a new PublicContact: {desired_contact}")
return desired_contact
if(db_contact.count() == 1):
#if(desired_contact != db_contact):
#current = desired_contact
return db_contact.get()
# If it doesn't exist and we don't
# want to create it...
return desired_contact
def _request_contact_info(self, contact: PublicContact):
try:
req = commands.InfoContact(id=contact.registry_id)
@ -797,6 +796,17 @@ class Domain(TimeStampedModel, DomainHelper):
cache_contact_helper(PublicContact.ContactTypeChoices.SECURITY),
or cache_contact_helper("security")
"""
# registrant_contact(s) are an edge case. They exist on
# the "registrant" property as opposed to contacts.
desired_property = "contacts"
if contact_type_choice == PublicContact.ContactTypeChoices.REGISTRANT:
desired_property = "registrant"
# If it exists in our cache, grab that
if(self._cache and desired_property in self._cache):
return self.grab_contact_in_keys(self._cache[desired_property], contact_type_choice)
# If not, check in our DB
items = PublicContact.objects.filter(domain=self, contact_type=contact_type_choice)
if(items.count() > 1):
raise ValueError(f"Multiple contacts exist for {contact_type_choice}")
@ -809,22 +819,13 @@ class Domain(TimeStampedModel, DomainHelper):
# If we have an item in our DB,
# and if contacts hasn't been cleared (meaning data was set)...
if(current_contact is not None):
if("contacts" not in self._cache):
logger.info("Contact was not found in cache but was found in DB")
# TODO - Should we sync with EppLib in this event?
# map_epp_contact_to_public_contact will grab any changes
# made in the setter,
logger.info("Contact was not found in cache but was found in DB")
return current_contact
try:
# registrant_contact(s) are an edge case. They exist on
# the "registrant" property as opposed to contacts.
desired_property = "contacts"
if contact_type_choice == PublicContact.ContactTypeChoices.REGISTRANT:
desired_property = "registrant"
# If it for some reason doesn't exist in our local DB,
# but exists in our cache, grab that
if(self._cache and desired_property in self._cache):
return self.grab_contact_in_keys(self._cache[desired_property], contact_type_choice)
# Finally, if all else fails, grab from the registry
contacts = self._get_property(desired_property)

View file

@ -21,7 +21,7 @@
<button
type="submit"
class="usa-button"
>{% if domain.security_email is None or domain.security_email.email == 'dotgov@cisa.dhs.gov'%}Add security email{% else %}Save{% endif %}</button>
>{% if domain.security_email is None or domain.security_email.email == 'testdotgov@cisa.dhs.gov'%}Add security email{% else %}Save{% endif %}</button>
</form>
{% endblock %} {# domain_content #}

View file

@ -27,12 +27,14 @@ logger = logging.getLogger(__name__)
class TestDomainCache(MockEppLib):
def tearDown(self):
PublicContact.objects.all().delete()
def test_cache_sets_resets(self):
"""Cache should be set on getter and reset on setter calls"""
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
self.maxDiff = None
# trigger getter
_ = domain.creation_date
logger.debug(f"what is the cache here? {domain._cache}")
domain._get_property("contacts")
# getter should set the domain cache with a InfoDomain object
# (see InfoDomainResult)
@ -43,29 +45,14 @@ class TestDomainCache(MockEppLib):
# using a setter should clear the cache
domain.expiration_date = datetime.date.today()
self.assertEquals(domain._cache, {})
expectedCreateContact = self._convertPublicContactToEpp(domain.security_contact, False, createContact=True)
expectedCreateContact = self._convertPublicContactToEpp(domain.security_contact, True, createContact=True)
# send should have been called only once
self.mockedSendFunction.assert_has_calls(
[
call(commands.InfoDomain(name='igorville.gov', auth_info=None), cleaned=True),
call(commands.InfoContact(id='123', auth_info=None), cleaned=True),
call(expectedCreateContact),
call(commands.UpdateDomain(
name='igorville.gov',
add=[
common.DomainContact(
contact='123',
type=PublicContact.ContactTypeChoices.SECURITY
)
],
rem=[],
nsset=None,
keyset=None,
registrant=None,
auth_info=None
),
cleaned=True
),
call(expectedCreateContact, cleaned=True),
call(commands.UpdateDomain(name='igorville.gov', add=[common.DomainContact(contact='123', type=PublicContact.ContactTypeChoices.SECURITY)], rem=[], nsset=None, keyset=None, registrant=None, auth_info=None), cleaned=True),
call(commands.InfoHost(name='fake.host.com'), cleaned=True)
]
)
@ -83,7 +70,8 @@ class TestDomainCache(MockEppLib):
# value should still be set correctly
self.assertEqual(cr_date, self.mockDataInfoDomain.cr_date)
self.assertEqual(domain._cache["cr_date"], self.mockDataInfoDomain.cr_date)
d = domain._cache["contacts"]
logger.debug(f"????? questions {d}")
# send was only called once & not on the second getter call
expectedCalls = [
call(
@ -100,8 +88,6 @@ class TestDomainCache(MockEppLib):
def test_cache_nested_elements(self):
"""Cache works correctly with the nested objects cache and hosts"""
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
self.maxDiff = None
# The contact list will initally contain objects of type 'DomainContact'
# this is then transformed into PublicContact, and cache should NOT
# hold onto the DomainContact object
@ -128,20 +114,11 @@ class TestDomainCache(MockEppLib):
# The contact list should not contain what is sent by the registry by default,
# as _fetch_cache will transform the type to PublicContact
self.assertNotEqual(domain._cache["contacts"], expectedUnfurledContactsList)
# Assert that what we get from cache is inline with our mock
# Since our cache creates new items inside of our contact list,
# as we need to map DomainContact -> PublicContact, our mocked items
# will point towards a different location in memory (as they are different objects).
# This should be a problem only exclusive to our mocks, since we are not
# replicating the same item twice outside this context. That said, we want to check
# for data integrity, but do not care if they are of the same _state or not
for cached_contact, expected_contact in zip(
domain._cache["contacts"], expectedContactsList
):
self.assertEqual(
{k: v for k, v in vars(cached_contact).items() if k != "_state"},
{k: v for k, v in vars(expected_contact).items() if k != "_state"},
)
self.assertEqual(
domain._cache["contacts"],
expectedContactsList
)
# get and check hosts is set correctly
domain._get_property("hosts")
@ -149,10 +126,60 @@ class TestDomainCache(MockEppLib):
# Clear the cache
domain._invalidate_cache()
@skip("Not implemented yet")
def test_map_epp_contact_to_public_contact(self):
self.maxDiff = None
# Tests that the mapper is working how we expect
raise
domain, _ = Domain.objects.get_or_create(name="registry.gov")
mapped = domain.map_epp_contact_to_public_contact(
self.mockDataInfoContact,
self.mockDataInfoContact.id,
PublicContact.ContactTypeChoices.SECURITY
)
expected_contact = PublicContact(
id=1,
domain=domain,
contact_type=PublicContact.ContactTypeChoices.SECURITY,
registry_id="123",
email="123@mail.gov",
voice="+1.8882820870",
fax="+1-212-9876543",
pw="lastPw",
name="Registry Customer Service",
org="Cybersecurity and Infrastructure Security Agency",
city="Arlington",
pc="22201",
cc="US",
sp="VA",
street1="4200 Wilson Blvd."
)
# Match when these both were updated/created
expected_contact.updated_at = mapped.updated_at
expected_contact.created_at = mapped.created_at
# Mapped object is what we expect
self.assertEqual(mapped, expected_contact)
in_db = PublicContact.objects.filter(
registry_id=domain.security_contact.registry_id,
contact_type = PublicContact.ContactTypeChoices.SECURITY
).get()
# DB Object is the same as the mapped object
self.assertEqual(mapped, in_db)
mapped_second = domain.map_epp_contact_to_public_contact(
self.mockDataInfoContact,
self.mockDataInfoContact.id,
PublicContact.ContactTypeChoices.SECURITY
)
in_db_once = PublicContact.objects.filter(
registry_id=domain.security_contact.registry_id,
contact_type = PublicContact.ContactTypeChoices.SECURITY
)
self.assertEqual(mapped_second, in_db)
# If mapper is called a second time,
# it just grabs existing data rather than
# a new object
self.assertTrue(in_db_once.count() == 1)
class TestDomainCreation(TestCase):
@ -462,6 +489,8 @@ class TestRegistrantContacts(MockEppLib):
security_contact.email = "originalUserEmail@gmail.com"
security_contact.registry_id = "fail"
security_contact.save()
self.domain.security_contact = security_contact
expectedCreateCommand = self._convertPublicContactToEpp(
security_contact, disclose_email=True
)
@ -476,6 +505,7 @@ class TestRegistrantContacts(MockEppLib):
)
security_contact.email = "changedEmail@email.com"
security_contact.save()
self.domain.security_contact = security_contact
expectedSecondCreateCommand = self._convertPublicContactToEpp(
security_contact, disclose_email=True
)
@ -498,13 +528,6 @@ class TestRegistrantContacts(MockEppLib):
current_item = PublicContact.objects.filter(domain=self.domain).get()
self.assertEqual(current_item.email, "changedEmail@email.com")
# Check if cache stored it correctly...
self.assertTrue("contacts" in self.domain._cache)
cached_item = self.domain._cache["contacts"]
self.assertTrue(cached_item[0] == current_item)
@skip("not implemented yet")
def test_update_is_unsuccessful(self):
"""
@ -528,11 +551,6 @@ class TestRegistrantContacts(MockEppLib):
def test_contact_getter_security(self):
# Create prexisting object...
security = PublicContact.get_default_security()
security.email = "security@mail.gov"
security.domain = self.domain_contact
self.domain_contact.security_contact = security
expected_security_contact = PublicContact.objects.filter(
registry_id=self.domain_contact.security_contact.registry_id,
contact_type = PublicContact.ContactTypeChoices.SECURITY
@ -556,7 +574,7 @@ class TestRegistrantContacts(MockEppLib):
security.email = "security@mail.gov"
security.domain = self.domain_contact
self.domain_contact.security_contact = security
expected_security_contact = PublicContact.objects.filter(
registry_id=self.domain_contact.security_contact.registry_id,
contact_type = PublicContact.ContactTypeChoices.SECURITY
@ -572,12 +590,16 @@ class TestRegistrantContacts(MockEppLib):
),
]
)
# Call getter...
_ = self.domain_contact.security_contact
# Checks if we are recieving the cache we expect...
self.assertEqual(self.domain_contact._cache["contacts"][0], expected_security_contact)
# Setter functions properly...
self.domain_contact.security_contact.email = "converge@mail.com"
expected_security_contact.email = "converge@mail.com"
security.email = "123@mail.com"
security.save()
self.domain_contact.security_contact = security
expected_security_contact.email = "123@mail.com"
self.assertEqual(
self.domain_contact.security_contact.email, expected_security_contact.email
@ -590,11 +612,6 @@ class TestRegistrantContacts(MockEppLib):
raise
def test_contact_getter_technical(self):
contact = PublicContact.get_default_technical()
contact.email = "technical@mail.gov"
contact.domain = self.domain_contact
self.domain_contact.technical_contact = contact
logger.debug(f"here is the reason {self.domain_contact.technical_contact}")
expected_contact = PublicContact.objects.filter(
registry_id=self.domain_contact.technical_contact.registry_id,
contact_type = PublicContact.ContactTypeChoices.TECHNICAL
@ -614,11 +631,6 @@ class TestRegistrantContacts(MockEppLib):
self.assertEqual(self.domain_contact._cache["contacts"][1], expected_contact)
def test_contact_getter_administrative(self):
contact = PublicContact.get_default_administrative()
contact.email = "admin@mail.gov"
contact.domain = self.domain_contact
self.domain_contact.administrative_contact = contact
expected_contact = PublicContact.objects.filter(
registry_id=self.domain_contact.administrative_contact.registry_id,
contact_type = PublicContact.ContactTypeChoices.ADMINISTRATIVE
@ -638,11 +650,6 @@ class TestRegistrantContacts(MockEppLib):
self.assertEqual(self.domain_contact._cache["contacts"][2], expected_contact)
def test_contact_getter_registrant(self):
contact = PublicContact.get_default_registrant()
contact.email = "registrant@mail.gov"
contact.domain = self.domain_contact
self.domain_contact.registrant_contact = contact
expected_contact = PublicContact.objects.filter(
registry_id=self.domain_contact.registrant_contact.registry_id,
contact_type = PublicContact.ContactTypeChoices.REGISTRANT