Merge pull request #2116 from cisagov/za/1987-unknown-from-dns-needed

Ticket #1987: Domain went from dns_needed -> unknown - bug fix
This commit is contained in:
zandercymatics 2024-05-08 13:51:59 -06:00 committed by GitHub
commit c9a735bf6a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 63 additions and 25 deletions

View file

@ -0,0 +1,17 @@
# Generated by Django 4.2.10 on 2024-05-08 17:35
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("registrar", "0092_rename_updated_federal_agency_domaininformation_federal_agency_and_more"),
]
operations = [
migrations.AlterUniqueTogether(
name="publiccontact",
unique_together={("contact_type", "registry_id", "domain")},
),
]

View file

@ -769,10 +769,7 @@ class Domain(TimeStampedModel, DomainHelper):
@administrative_contact.setter # type: ignore @administrative_contact.setter # type: ignore
def administrative_contact(self, contact: PublicContact): def administrative_contact(self, contact: PublicContact):
logger.info("making admin contact") logger.info("making admin contact")
if contact.contact_type != contact.ContactTypeChoices.ADMINISTRATIVE: self._set_singleton_contact(contact=contact, expectedType=contact.ContactTypeChoices.ADMINISTRATIVE)
raise ValueError("Cannot set a registrant contact with a different contact type")
self._make_contact_in_registry(contact=contact)
self._update_domain_with_contact(contact, rem=False)
def _update_epp_contact(self, contact: PublicContact): def _update_epp_contact(self, contact: PublicContact):
"""Sends UpdateContact to update the actual contact object, """Sends UpdateContact to update the actual contact object,
@ -849,11 +846,9 @@ class Domain(TimeStampedModel, DomainHelper):
# get publicContact objects that have the matching # get publicContact objects that have the matching
# domain and type but a different id # domain and type but a different id
# like in highlander we there can only be one # like in highlander where there can only be one
hasOtherContact = ( duplicate_contacts = PublicContact.objects.exclude(registry_id=contact.registry_id).filter(
PublicContact.objects.exclude(registry_id=contact.registry_id) domain=self, contact_type=contact.contact_type
.filter(domain=self, contact_type=contact.contact_type)
.exists()
) )
# if no record exists with this contact type # if no record exists with this contact type
@ -871,14 +866,10 @@ class Domain(TimeStampedModel, DomainHelper):
logger.info("_set_singleton_contact()-> contact has been added to the registry") logger.info("_set_singleton_contact()-> contact has been added to the registry")
# if has conflicting contacts in our db remove them # if has conflicting contacts in our db remove them
if hasOtherContact: if duplicate_contacts.exists():
logger.info("_set_singleton_contact()-> updating domain, removing old contact") logger.info("_set_singleton_contact()-> updating domain, removing old contact")
existing_contact = ( existing_contact = duplicate_contacts.get()
PublicContact.objects.exclude(registry_id=contact.registry_id)
.filter(domain=self, contact_type=contact.contact_type)
.get()
)
if isRegistrant: if isRegistrant:
# send update domain only for registant contacts # send update domain only for registant contacts
@ -1216,24 +1207,28 @@ class Domain(TimeStampedModel, DomainHelper):
def get_default_security_contact(self): def get_default_security_contact(self):
"""Gets the default security contact.""" """Gets the default security contact."""
logger.info("get_default_security_contact() -> Adding default security contact")
contact = PublicContact.get_default_security() contact = PublicContact.get_default_security()
contact.domain = self contact.domain = self
return contact return contact
def get_default_administrative_contact(self): def get_default_administrative_contact(self):
"""Gets the default administrative contact.""" """Gets the default administrative contact."""
logger.info("get_default_security_contact() -> Adding administrative security contact")
contact = PublicContact.get_default_administrative() contact = PublicContact.get_default_administrative()
contact.domain = self contact.domain = self
return contact return contact
def get_default_technical_contact(self): def get_default_technical_contact(self):
"""Gets the default technical contact.""" """Gets the default technical contact."""
logger.info("get_default_security_contact() -> Adding technical security contact")
contact = PublicContact.get_default_technical() contact = PublicContact.get_default_technical()
contact.domain = self contact.domain = self
return contact return contact
def get_default_registrant_contact(self): def get_default_registrant_contact(self):
"""Gets the default registrant contact.""" """Gets the default registrant contact."""
logger.info("get_default_security_contact() -> Adding default registrant contact")
contact = PublicContact.get_default_registrant() contact = PublicContact.get_default_registrant()
contact.domain = self contact.domain = self
return contact return contact
@ -1247,6 +1242,7 @@ class Domain(TimeStampedModel, DomainHelper):
Returns: Returns:
PublicContact | None PublicContact | None
""" """
logger.info(f"get_contact_in_keys() -> Grabbing a {contact_type} contact from cache")
# Registrant doesn't exist as an array, and is of # Registrant doesn't exist as an array, and is of
# a special data type, so we need to handle that. # a special data type, so we need to handle that.
if contact_type == PublicContact.ContactTypeChoices.REGISTRANT: if contact_type == PublicContact.ContactTypeChoices.REGISTRANT:
@ -1309,6 +1305,7 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error(e.code) logger.error(e.code)
raise e raise e
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST and self.state == Domain.State.UNKNOWN: if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST and self.state == Domain.State.UNKNOWN:
logger.info("_get_or_create_domain() -> Switching to dns_needed from unknown")
# avoid infinite loop # avoid infinite loop
already_tried_to_create = True already_tried_to_create = True
self.dns_needed_from_unknown() self.dns_needed_from_unknown()
@ -1319,6 +1316,7 @@ class Domain(TimeStampedModel, DomainHelper):
raise e raise e
def addRegistrant(self): def addRegistrant(self):
"""Adds a default registrant contact"""
registrant = PublicContact.get_default_registrant() registrant = PublicContact.get_default_registrant()
registrant.domain = self registrant.domain = self
registrant.save() # calls the registrant_contact.setter registrant.save() # calls the registrant_contact.setter
@ -1346,6 +1344,8 @@ class Domain(TimeStampedModel, DomainHelper):
self.addAllDefaults() self.addAllDefaults()
def addAllDefaults(self): def addAllDefaults(self):
"""Adds default security, technical, and administrative contacts"""
logger.info("addAllDefaults() -> Adding default security, technical, and administrative contacts")
security_contact = self.get_default_security_contact() security_contact = self.get_default_security_contact()
security_contact.save() security_contact.save()
@ -1360,7 +1360,7 @@ class Domain(TimeStampedModel, DomainHelper):
"""place a clienthold on a domain (no longer should resolve) """place a clienthold on a domain (no longer should resolve)
ignoreEPP (boolean) - set to true to by-pass EPP (used for transition domains) ignoreEPP (boolean) - set to true to by-pass EPP (used for transition domains)
""" """
# TODO - ensure all requirements for client hold are made here
# (check prohibited statuses) # (check prohibited statuses)
logger.info("clientHold()-> inside clientHold") logger.info("clientHold()-> inside clientHold")
@ -1368,7 +1368,6 @@ class Domain(TimeStampedModel, DomainHelper):
# include this ignoreEPP flag # include this ignoreEPP flag
if not ignoreEPP: if not ignoreEPP:
self._place_client_hold() self._place_client_hold()
# TODO -on the client hold ticket any additional error handling here
@transition(field="state", source=[State.READY, State.ON_HOLD], target=State.READY) @transition(field="state", source=[State.READY, State.ON_HOLD], target=State.READY)
def revert_client_hold(self, ignoreEPP=False): def revert_client_hold(self, ignoreEPP=False):
@ -1570,6 +1569,7 @@ class Domain(TimeStampedModel, DomainHelper):
def _get_or_create_contact(self, contact: PublicContact): def _get_or_create_contact(self, contact: PublicContact):
"""Try to fetch info about a contact. Create it if it does not exist.""" """Try to fetch info about a contact. Create it if it does not exist."""
logger.info("_get_or_create_contact() -> Fetching contact info")
try: try:
return self._request_contact_info(contact) return self._request_contact_info(contact)
except RegistryError as e: except RegistryError as e:
@ -1962,10 +1962,23 @@ class Domain(TimeStampedModel, DomainHelper):
domain=self, domain=self,
) )
# Raise an error if we find duplicates. # If we find duplicates, log it and delete the oldest ones.
# This should not occur
if db_contact.count() > 1: if db_contact.count() > 1:
raise Exception(f"Multiple contacts found for {public_contact.contact_type}") logger.warning("_get_or_create_public_contact() -> Duplicate contacts found. Deleting duplicate.")
newest_duplicate = db_contact.order_by("-created_at").first()
duplicates_to_delete = db_contact.exclude(id=newest_duplicate.id) # type: ignore
# Delete all duplicates
duplicates_to_delete.delete()
# Do a second filter to grab the latest content
db_contact = PublicContact.objects.filter(
registry_id=public_contact.registry_id,
contact_type=public_contact.contact_type,
domain=self,
)
# Save to DB if it doesn't exist already. # Save to DB if it doesn't exist already.
if db_contact.count() == 0: if db_contact.count() == 0:
@ -1977,16 +1990,14 @@ class Domain(TimeStampedModel, DomainHelper):
existing_contact = db_contact.get() existing_contact = db_contact.get()
# Does the item we're grabbing match # Does the item we're grabbing match what we have in our DB?
# what we have in our DB?
if existing_contact.email != public_contact.email or existing_contact.registry_id != public_contact.registry_id: if existing_contact.email != public_contact.email or existing_contact.registry_id != public_contact.registry_id:
existing_contact.delete() existing_contact.delete()
public_contact.save() public_contact.save()
logger.warning("Requested PublicContact is out of sync " "with DB.") logger.warning("Requested PublicContact is out of sync " "with DB.")
return public_contact return public_contact
# If it already exists, we can
# assume that the DB instance was updated # If it already exists, we can assume that the DB instance was updated during set, so we should just use that.
# during set, so we should just use that.
return existing_contact return existing_contact
def _registrant_to_public_contact(self, registry_id: str): def _registrant_to_public_contact(self, registry_id: str):

View file

@ -19,6 +19,16 @@ def get_id():
class PublicContact(TimeStampedModel): class PublicContact(TimeStampedModel):
"""Contact information intended to be published in WHOIS.""" """Contact information intended to be published in WHOIS."""
class Meta:
"""Contains meta info about this class"""
# Creates a composite primary key with these fields.
# We can share the same registry id, but only if the contact type is
# different or if the domain is different.
# For instance - we don't desire to have two admin contacts with the same id
# on the same domain.
unique_together = [("contact_type", "registry_id", "domain")]
class ContactTypeChoices(models.TextChoices): class ContactTypeChoices(models.TextChoices):
"""These are the types of contacts accepted by the registry.""" """These are the types of contacts accepted by the registry."""