mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-08-14 13:34:10 +02:00
Logic to remove orphaned contacts
This commit is contained in:
parent
11b48eb030
commit
d1daefd42c
1 changed files with 68 additions and 1 deletions
|
@ -1,5 +1,5 @@
|
||||||
import logging
|
import logging
|
||||||
|
from collections import defaultdict
|
||||||
from django.http import Http404, HttpResponse, HttpResponseRedirect
|
from django.http import Http404, HttpResponse, HttpResponseRedirect
|
||||||
from django.shortcuts import redirect, render
|
from django.shortcuts import redirect, render
|
||||||
from django.urls import resolve, reverse
|
from django.urls import resolve, reverse
|
||||||
|
@ -10,6 +10,7 @@ from django.contrib import messages
|
||||||
|
|
||||||
from registrar.forms import application_wizard as forms
|
from registrar.forms import application_wizard as forms
|
||||||
from registrar.models import DomainApplication
|
from registrar.models import DomainApplication
|
||||||
|
from registrar.models.contact import Contact
|
||||||
from registrar.models.user import User
|
from registrar.models.user import User
|
||||||
from registrar.utility import StrEnum
|
from registrar.utility import StrEnum
|
||||||
from registrar.views.utility import StepsHelper
|
from registrar.views.utility import StepsHelper
|
||||||
|
@ -641,3 +642,69 @@ class DomainApplicationDeleteView(DomainApplicationPermissionDeleteView):
|
||||||
def get_success_url(self):
|
def get_success_url(self):
|
||||||
"""After a delete is successful, redirect to home"""
|
"""After a delete is successful, redirect to home"""
|
||||||
return reverse("home")
|
return reverse("home")
|
||||||
|
|
||||||
|
def post(self, request, *args, **kwargs):
|
||||||
|
# Grab all orphaned contacts
|
||||||
|
application: DomainApplication = self.get_object()
|
||||||
|
contacts_to_delete, duplicates = self._get_orphaned_contacts(application)
|
||||||
|
|
||||||
|
# Delete the DomainApplication
|
||||||
|
response = super().post(request, *args, **kwargs)
|
||||||
|
|
||||||
|
x = Contact.objects.filter(id__in=contacts_to_delete, user=None)
|
||||||
|
print(f"These contacts will be deleted: {x}")
|
||||||
|
|
||||||
|
# Delete orphaned contacts - but only for if they are not associated with a user
|
||||||
|
Contact.objects.filter(id__in=contacts_to_delete, user=None).delete()
|
||||||
|
|
||||||
|
# After a delete occurs, do a second sweep on any returned duplicates.
|
||||||
|
# This determines if any of these three fields share a contact, which is used for
|
||||||
|
# the edge case where the same user may be an AO, and a submitter, for example.
|
||||||
|
if len(duplicates) > 0:
|
||||||
|
duplicates_to_delete, _ = self._get_orphaned_contacts(application)
|
||||||
|
a = Contact.objects.filter(id__in=duplicates_to_delete, user=None)
|
||||||
|
print(f"These other contacts will be deleted: {a}")
|
||||||
|
Contact.objects.filter(id__in=duplicates_to_delete, user=None).delete()
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
def _get_orphaned_contacts(self, application: DomainApplication, check_db=True):
|
||||||
|
"""Collects all orphaned contacts"""
|
||||||
|
contacts_to_delete = []
|
||||||
|
|
||||||
|
# Get each contact object on the DomainApplication object
|
||||||
|
ao = application.authorizing_official
|
||||||
|
submitter = application.submitter
|
||||||
|
other_contacts = list(application.other_contacts.all())
|
||||||
|
other_contact_ids = application.other_contacts.all().values_list("id", flat=True)
|
||||||
|
|
||||||
|
# Check if the desired item still exists in the DB
|
||||||
|
if check_db:
|
||||||
|
ao = self._get_contacts_by_id([ao.id]).first()
|
||||||
|
submitter = self._get_contacts_by_id([submitter.id]).first()
|
||||||
|
other_contacts = self._get_contacts_by_id(other_contact_ids)
|
||||||
|
|
||||||
|
# Pair each contact with its related name
|
||||||
|
checked_contacts = [(ao, "authorizing_official"), (submitter, "submitted_applications")]
|
||||||
|
checked_contacts.extend((contact, "contact_applications") for contact in other_contacts)
|
||||||
|
|
||||||
|
for contact, related_name in checked_contacts:
|
||||||
|
if contact is not None and not contact.has_more_than_one_join(related_name):
|
||||||
|
contacts_to_delete.append(contact.id)
|
||||||
|
|
||||||
|
return (contacts_to_delete, self._get_duplicates(checked_contacts))
|
||||||
|
|
||||||
|
def _get_contacts_by_id(self, contact_ids):
|
||||||
|
"""Given a list of ids, grab contacts if it exists"""
|
||||||
|
contacts = Contact.objects.filter(id__in=contact_ids)
|
||||||
|
return contacts
|
||||||
|
|
||||||
|
def _get_duplicates(self, objects):
|
||||||
|
"""Given a list of objects, return a list of which items were duplicates"""
|
||||||
|
# Gets the occurence count
|
||||||
|
object_dict = defaultdict(int)
|
||||||
|
for contact, _ in objects:
|
||||||
|
object_dict[contact] += 1
|
||||||
|
|
||||||
|
duplicates = [item for item, count in object_dict.items() if count > 1]
|
||||||
|
return duplicates
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue