add try/catch and update model

This commit is contained in:
zandercymatics 2024-05-02 09:28:33 -06:00
parent e34dfbccdb
commit 059585d3e1
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
3 changed files with 45 additions and 16 deletions

View file

@ -0,0 +1,24 @@
# Generated by Django 4.2.10 on 2024-05-02 15:27
from django.db import migrations, models
import registrar.models.public_contact
class Migration(migrations.Migration):
dependencies = [
("registrar", "0089_user_verification_type"),
]
operations = [
migrations.AlterField(
model_name="publiccontact",
name="registry_id",
field=models.CharField(
default=registrar.models.public_contact.get_id,
help_text="Auto generated ID to track this contact in the registry",
max_length=16,
unique=True,
),
),
]

View file

@ -7,7 +7,7 @@ from typing import Optional
from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore
from django.db import models
from django.db import IntegrityError, models
from django.utils import timezone
from typing import Any
from registrar.models.host import Host
@ -758,7 +758,7 @@ class Domain(TimeStampedModel, DomainHelper):
so follow on additions will update the current registrant"""
logger.info("making registrant contact")
self._set_singleton_contact(contact=contact, expectedType=contact.ContactTypeChoices.REGISTRANT)
self.try_set_singleton_contact(contact=contact, expected_type=contact.ContactTypeChoices.REGISTRANT)
@Cache
def administrative_contact(self) -> PublicContact | None:
@ -769,10 +769,7 @@ class Domain(TimeStampedModel, DomainHelper):
@administrative_contact.setter # type: ignore
def administrative_contact(self, contact: PublicContact):
logger.info("making admin contact")
if contact.contact_type != contact.ContactTypeChoices.ADMINISTRATIVE:
raise ValueError("Cannot set a registrant contact with a different contact type")
self._make_contact_in_registry(contact=contact)
self._update_domain_with_contact(contact, rem=False)
self.try_set_singleton_contact(contact=contact, expected_type=contact.ContactTypeChoices.ADMINISTRATIVE)
def _update_epp_contact(self, contact: PublicContact):
"""Sends UpdateContact to update the actual contact object,
@ -832,6 +829,16 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error("Error changing to new registrant error code is %s, error is %s" % (e.code, e))
# TODO-error handling better here?
def try_set_singleton_contact(self, contact: PublicContact, expected_type: str): # noqa
"""Runs the _set_singleton_contact function, while catching an IntegrityError
from the DB and handling it appropriately"""
try:
self._set_singleton_contact(contact=contact, expectedType=expected_type)
except IntegrityError as err:
# If this error occurs, it indicates that our DB tried to create
# a duplicate PublicContact
logger.error(f"A contact with this registry ID already exists. Error: {err}")
def _set_singleton_contact(self, contact: PublicContact, expectedType: str): # noqa
"""Sets the contacts by adding them to the registry as new contacts,
updates the contact if it is already in epp,
@ -850,12 +857,13 @@ class Domain(TimeStampedModel, DomainHelper):
# get publicContact objects that have the matching
# domain and type but a different id
# like in highlander we there can only be one
hasOtherContact = (
duplicate_contacts = (
PublicContact.objects.exclude(registry_id=contact.registry_id)
.filter(domain=self, contact_type=contact.contact_type)
.exists()
)
# if no record exists with this contact type
# make contact in registry, duplicate and errors handled there
errorCode = self._make_contact_in_registry(contact)
@ -871,14 +879,10 @@ class Domain(TimeStampedModel, DomainHelper):
logger.info("_set_singleton_contact()-> contact has been added to the registry")
# if has conflicting contacts in our db remove them
if hasOtherContact:
if duplicate_contacts.exists():
logger.info("_set_singleton_contact()-> updating domain, removing old contact")
existing_contact = (
PublicContact.objects.exclude(registry_id=contact.registry_id)
.filter(domain=self, contact_type=contact.contact_type)
.get()
)
existing_contact = duplicate_contacts.get()
if isRegistrant:
# send update domain only for registant contacts
@ -925,7 +929,7 @@ class Domain(TimeStampedModel, DomainHelper):
from domain information (not domain request)
and should have the security email from DomainRequest"""
logger.info("making security contact in registry")
self._set_singleton_contact(contact, expectedType=contact.ContactTypeChoices.SECURITY)
self.try_set_singleton_contact(contact, expected_type=contact.ContactTypeChoices.SECURITY)
@Cache
def technical_contact(self) -> PublicContact | None:
@ -936,7 +940,7 @@ class Domain(TimeStampedModel, DomainHelper):
@technical_contact.setter # type: ignore
def technical_contact(self, contact: PublicContact):
logger.info("making technical contact")
self._set_singleton_contact(contact, expectedType=contact.ContactTypeChoices.TECHNICAL)
self.try_set_singleton_contact(contact, expected_type=contact.ContactTypeChoices.TECHNICAL)
def is_active(self) -> bool:
"""Currently just returns if the state is created,

View file

@ -48,6 +48,7 @@ class PublicContact(TimeStampedModel):
help_text="For which type of WHOIS contact",
)
registry_id = models.CharField(
unique=True,
max_length=16,
default=get_id,
null=False,