Better table + simplify get_or_create function

This commit is contained in:
zandercymatics 2025-01-13 15:06:44 -07:00
parent a0a53ea855
commit 828c47de45
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
2 changed files with 30 additions and 41 deletions

View file

@ -3835,6 +3835,9 @@ class PublicContactAdmin(ListHeaderAdmin, ImportExportModelAdmin):
change_form_template = "django/admin/email_clipboard_change_form.html" change_form_template = "django/admin/email_clipboard_change_form.html"
autocomplete_fields = ["domain"] autocomplete_fields = ["domain"]
list_display = ("domain", "email", "name", "contact_type", "id")
search_fields = ["email", "name", "id"]
search_help_text = "Search by email, name or id."
def changeform_view(self, request, object_id=None, form_url="", extra_context=None): def changeform_view(self, request, object_id=None, form_url="", extra_context=None):
if extra_context is None: if extra_context is None:

View file

@ -4,9 +4,9 @@ import ipaddress
import re import re
from datetime import date, timedelta from datetime import date, timedelta
from typing import Optional from typing import Optional
from django.db import transaction
from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore
from django.db import models, IntegrityError
from django.db import models
from django.utils import timezone from django.utils import timezone
from typing import Any from typing import Any
from registrar.models.host import Host from registrar.models.host import Host
@ -2077,49 +2077,35 @@ class Domain(TimeStampedModel, DomainHelper):
def _get_or_create_public_contact(self, public_contact: PublicContact): def _get_or_create_public_contact(self, public_contact: PublicContact):
"""Tries to find a PublicContact object in our DB. """Tries to find a PublicContact object in our DB.
If it can't, it'll create it. Returns PublicContact""" If it can't, it'll create it. Returns PublicContact"""
db_contact = PublicContact.objects.filter( try:
registry_id=public_contact.registry_id, with transaction.atomic():
contact_type=public_contact.contact_type, contact, _ = PublicContact.objects.get_or_create(
domain=self, registry_id=public_contact.registry_id,
) contact_type=public_contact.contact_type,
domain=self,
# If we find duplicates, log it and delete the oldest ones. defaults={
if db_contact.count() > 1: "email": public_contact.email,
logger.warning("_get_or_create_public_contact() -> Duplicate contacts found. Deleting duplicate.") "voice": public_contact.voice,
"fax": public_contact.fax,
newest_duplicate = db_contact.order_by("-created_at").first() "name": public_contact.name,
"org": public_contact.org,
duplicates_to_delete = db_contact.exclude(id=newest_duplicate.id) # type: ignore "pw": public_contact.pw,
"city": public_contact.city,
# Delete all duplicates "pc": public_contact.pc,
duplicates_to_delete.delete() "cc": public_contact.cc,
"sp": public_contact.sp,
# Do a second filter to grab the latest content "street1": public_contact.street1,
db_contact = PublicContact.objects.filter( "street2": public_contact.street2,
"street3": public_contact.street3,
}
)
except IntegrityError:
contact = PublicContact.objects.get(
registry_id=public_contact.registry_id, registry_id=public_contact.registry_id,
contact_type=public_contact.contact_type, contact_type=public_contact.contact_type,
domain=self, domain=self,
) )
return contact
# Save to DB if it doesn't exist already.
if db_contact.count() == 0:
# Doesn't run custom save logic, just saves to DB
public_contact.save(skip_epp_save=True)
logger.info(f"Created a new PublicContact: {public_contact}")
# Append the item we just created
return public_contact
existing_contact = db_contact.get()
# Does the item we're grabbing match what we have in our DB?
if existing_contact.email != public_contact.email or existing_contact.registry_id != public_contact.registry_id:
existing_contact.delete()
public_contact.save()
logger.warning("Requested PublicContact is out of sync " "with DB.")
return public_contact
# If it already exists, we can assume that the DB instance was updated during set, so we should just use that.
return existing_contact
def _registrant_to_public_contact(self, registry_id: str): def _registrant_to_public_contact(self, registry_id: str):
"""EPPLib returns the registrant as a string, """EPPLib returns the registrant as a string,