mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-08-01 07:26:34 +02:00
Slight perf improvement for fixtures
This commit is contained in:
parent
8b56d97712
commit
f8f0d7bced
4 changed files with 100 additions and 6 deletions
|
@ -1,7 +1,7 @@
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
from faker import Faker
|
from faker import Faker
|
||||||
|
from django.db import transaction
|
||||||
from registrar.models import (
|
from registrar.models import (
|
||||||
User,
|
User,
|
||||||
DomainApplication,
|
DomainApplication,
|
||||||
|
@ -184,6 +184,14 @@ class DomainApplicationFixture:
|
||||||
logger.warning(e)
|
logger.warning(e)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Lumped under .atomic to ensure we don't make redundant DB calls.
|
||||||
|
# This bundles them all together, and then saves it in a single call.
|
||||||
|
with transaction.atomic():
|
||||||
|
cls._create_applications(users)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _create_applications(cls, users):
|
||||||
|
"""Creates DomainApplications given a list of users"""
|
||||||
for user in users:
|
for user in users:
|
||||||
logger.debug("Loading domain applications for %s" % user)
|
logger.debug("Loading domain applications for %s" % user)
|
||||||
for app in cls.DA:
|
for app in cls.DA:
|
||||||
|
@ -211,8 +219,16 @@ class DomainFixture(DomainApplicationFixture):
|
||||||
logger.warning(e)
|
logger.warning(e)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Lumped under .atomic to ensure we don't make redundant DB calls.
|
||||||
|
# This bundles them all together, and then saves it in a single call.
|
||||||
|
with transaction.atomic():
|
||||||
|
# approve each user associated with `in review` status domains
|
||||||
|
DomainFixture._approve_applications(users)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _approve_applications(users):
|
||||||
|
"""Approves all provided applications if they are in the state in_review"""
|
||||||
for user in users:
|
for user in users:
|
||||||
# approve one of each users in review status domains
|
|
||||||
application = DomainApplication.objects.filter(
|
application = DomainApplication.objects.filter(
|
||||||
creator=user, status=DomainApplication.ApplicationStatus.IN_REVIEW
|
creator=user, status=DomainApplication.ApplicationStatus.IN_REVIEW
|
||||||
).last()
|
).last()
|
||||||
|
@ -220,5 +236,13 @@ class DomainFixture(DomainApplicationFixture):
|
||||||
|
|
||||||
# We don't want fixtures sending out real emails to
|
# We don't want fixtures sending out real emails to
|
||||||
# fake email addresses, so we just skip that and log it instead
|
# fake email addresses, so we just skip that and log it instead
|
||||||
|
|
||||||
|
# All approvals require an investigator, so if there is none,
|
||||||
|
# assign one.
|
||||||
|
if application.investigator is None:
|
||||||
|
# All "users" in fixtures have admin perms per prior config.
|
||||||
|
# No need to check for that.
|
||||||
|
application.investigator = random.choice(users)
|
||||||
|
|
||||||
application.approve(send_email=False)
|
application.approve(send_email=False)
|
||||||
application.save()
|
application.save()
|
|
@ -1,5 +1,6 @@
|
||||||
import logging
|
import logging
|
||||||
from faker import Faker
|
from faker import Faker
|
||||||
|
from django.db import transaction
|
||||||
|
|
||||||
from registrar.models import (
|
from registrar.models import (
|
||||||
User,
|
User,
|
||||||
|
@ -186,5 +187,12 @@ class UserFixture:
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def load(cls):
|
def load(cls):
|
||||||
cls.load_users(cls, cls.ADMINS, "full_access_group")
|
# Lumped under .atomic to ensure we don't make redundant DB calls.
|
||||||
cls.load_users(cls, cls.STAFF, "cisa_analysts_group")
|
# This bundles them all together, and then saves it in a single call.
|
||||||
|
# This is slightly different then bulk_create or bulk_update, in that
|
||||||
|
# you still get the same behaviour of .save(), but those incremental
|
||||||
|
# steps now do not need to close/reopen a db connection,
|
||||||
|
# instead they share one.
|
||||||
|
with transaction.atomic():
|
||||||
|
cls.load_users(cls, cls.ADMINS, "full_access_group")
|
||||||
|
cls.load_users(cls, cls.STAFF, "cisa_analysts_group")
|
||||||
|
|
|
@ -8,6 +8,7 @@ from django.db import models
|
||||||
from django_fsm import FSMField, transition # type: ignore
|
from django_fsm import FSMField, transition # type: ignore
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from registrar.models.domain import Domain
|
from registrar.models.domain import Domain
|
||||||
|
from registrar.utility.errors import ApplicationStatusError, FSMErrorCodes
|
||||||
|
|
||||||
from .utility.time_stamped_model import TimeStampedModel
|
from .utility.time_stamped_model import TimeStampedModel
|
||||||
from ..utility.email import send_templated_email, EmailSendingError
|
from ..utility.email import send_templated_email, EmailSendingError
|
||||||
|
@ -736,8 +737,23 @@ class DomainApplication(TimeStampedModel):
|
||||||
|
|
||||||
# create the domain
|
# create the domain
|
||||||
Domain = apps.get_model("registrar.Domain")
|
Domain = apps.get_model("registrar.Domain")
|
||||||
|
|
||||||
|
# == Check that the application is valid == #
|
||||||
if Domain.objects.filter(name=self.requested_domain.name).exists():
|
if Domain.objects.filter(name=self.requested_domain.name).exists():
|
||||||
raise ValueError("Cannot approve. Requested domain is already in use.")
|
raise ApplicationStatusError(code=FSMErrorCodes.APPROVE_DOMAIN_IN_USE)
|
||||||
|
|
||||||
|
# Check if an investigator is assigned. No approval is possible without one.
|
||||||
|
# TODO - add form level error
|
||||||
|
# TODO - maybe add modal if approver is not investigator as superuser
|
||||||
|
if self.investigator is None:
|
||||||
|
raise ApplicationStatusError(code=FSMErrorCodes.APPROVE_NO_INVESTIGATOR)
|
||||||
|
|
||||||
|
# Investigators must be staff users.
|
||||||
|
# This is handled elsewhere, but we should check here as a precaution.
|
||||||
|
if not self.investigator.is_staff:
|
||||||
|
raise ApplicationStatusError(code=FSMErrorCodes.APPROVE_INVESTIGATOR_NOT_STAFF)
|
||||||
|
|
||||||
|
# == Create the domain and related components == #
|
||||||
created_domain = Domain.objects.create(name=self.requested_domain.name)
|
created_domain = Domain.objects.create(name=self.requested_domain.name)
|
||||||
self.approved_domain = created_domain
|
self.approved_domain = created_domain
|
||||||
|
|
||||||
|
@ -751,6 +767,7 @@ class DomainApplication(TimeStampedModel):
|
||||||
user=self.creator, domain=created_domain, role=UserDomainRole.Roles.MANAGER
|
user=self.creator, domain=created_domain, role=UserDomainRole.Roles.MANAGER
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# == Send out an email == #
|
||||||
self._send_status_update_email(
|
self._send_status_update_email(
|
||||||
"application approved",
|
"application approved",
|
||||||
"emails/status_change_approved.txt",
|
"emails/status_change_approved.txt",
|
||||||
|
|
|
@ -70,6 +70,51 @@ class GenericError(Exception):
|
||||||
def get_error_message(self, code=None):
|
def get_error_message(self, code=None):
|
||||||
return self._error_mapping.get(code)
|
return self._error_mapping.get(code)
|
||||||
|
|
||||||
|
# (Q for reviewers) What should this be called?
|
||||||
|
# Not a fan of this name.
|
||||||
|
class FSMErrorCodes(IntEnum):
|
||||||
|
"""Used when doing FSM transitions.
|
||||||
|
Overview of generic error codes:
|
||||||
|
- 1 APPROVE_DOMAIN_IN_USE The domain is already in use
|
||||||
|
- 2 APPROVE_NO_INVESTIGATOR No investigator is assigned when approving
|
||||||
|
- 3 APPROVE_INVESTIGATOR_NOT_STAFF Investigator is a non-staff user
|
||||||
|
"""
|
||||||
|
APPROVE_DOMAIN_IN_USE = 1
|
||||||
|
APPROVE_NO_INVESTIGATOR = 2
|
||||||
|
APPROVE_INVESTIGATOR_NOT_STAFF = 3
|
||||||
|
|
||||||
|
|
||||||
|
class ApplicationStatusError(Exception):
|
||||||
|
"""
|
||||||
|
Used to raise exceptions when doing FSM Transitions.
|
||||||
|
Uses `FSMErrorCodes` as an enum.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_error_mapping = {
|
||||||
|
FSMErrorCodes.APPROVE_DOMAIN_IN_USE: (
|
||||||
|
"Cannot approve. Requested domain is already in use."
|
||||||
|
),
|
||||||
|
FSMErrorCodes.APPROVE_NO_INVESTIGATOR: (
|
||||||
|
"Cannot approve. No investigator was assigned."
|
||||||
|
),
|
||||||
|
FSMErrorCodes.APPROVE_INVESTIGATOR_NOT_STAFF: (
|
||||||
|
"Cannot approve. Investigator is not a staff user."
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, *args, code=None, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.code = code
|
||||||
|
if self.code in self._error_mapping:
|
||||||
|
self.message = self._error_mapping.get(self.code)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"{self.message}"
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_error_message(self, code=None):
|
||||||
|
return self._error_mapping.get(code)
|
||||||
|
|
||||||
|
|
||||||
class NameserverErrorCodes(IntEnum):
|
class NameserverErrorCodes(IntEnum):
|
||||||
"""Used in the NameserverError class for
|
"""Used in the NameserverError class for
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue