mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-07-22 02:36:02 +02:00
Revise fixtures to include portfolios
This commit is contained in:
parent
8adb33b430
commit
4cd97077ec
10 changed files with 799 additions and 287 deletions
|
@ -173,7 +173,7 @@ You can change the logging verbosity, if needed. Do a web search for "django log
|
||||||
|
|
||||||
## Mock data
|
## Mock data
|
||||||
|
|
||||||
[load.py](../../src/registrar/management/commands/load.py) called from docker-compose (locally) and reset-db.yml (upper) loads the fixtures from [fixtures_user.py](../../src/registrar/fixtures_users.py) and [fixtures_domain_requests.py](../../src/registrar/fixtures_domain_requests.py), giving you some test data to play with while developing.
|
[load.py](../../src/registrar/management/commands/load.py) called from docker-compose (locally) and reset-db.yml (upper) loads the fixtures from [fixtures_user.py](../../src/registrar/fixtures_users.py) and [fixtures_data.py](../../src/registrar/fixtures_data.py), giving you some test data to play with while developing.
|
||||||
|
|
||||||
See the [database-access README](./database-access.md) for information on how to pull data to update these fixtures.
|
See the [database-access README](./database-access.md) for information on how to pull data to update these fixtures.
|
||||||
|
|
||||||
|
|
118
src/registrar/fixtures/fixtures_domains.py
Normal file
118
src/registrar/fixtures/fixtures_domains.py
Normal file
|
@ -0,0 +1,118 @@
|
||||||
|
from datetime import timedelta
|
||||||
|
from django.utils import timezone
|
||||||
|
import logging
|
||||||
|
import random
|
||||||
|
from faker import Faker
|
||||||
|
from django.db import transaction
|
||||||
|
|
||||||
|
from registrar.fixtures.fixtures_requests import DomainRequestFixture
|
||||||
|
from registrar.fixtures.fixtures_users import UserFixture
|
||||||
|
from registrar.models import User, DomainRequest
|
||||||
|
from registrar.models.domain import Domain
|
||||||
|
|
||||||
|
fake = Faker()
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class DomainFixture(DomainRequestFixture):
|
||||||
|
"""Create two domains and permissions on them for each user.
|
||||||
|
One domain will have a past expiration date.
|
||||||
|
|
||||||
|
Depends on fixtures_requests.
|
||||||
|
|
||||||
|
Make sure this class' `load` method is called from `handle`
|
||||||
|
in management/commands/load.py, then use `./manage.py load`
|
||||||
|
to run this code.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def load(cls):
|
||||||
|
# Lumped under .atomic to ensure we don't make redundant DB calls.
|
||||||
|
# This bundles them all together, and then saves it in a single call.
|
||||||
|
with transaction.atomic():
|
||||||
|
try:
|
||||||
|
# Get the usernames of users created in the UserFixture
|
||||||
|
created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF]
|
||||||
|
|
||||||
|
# Filter users to only include those created by the fixture
|
||||||
|
users = list(User.objects.filter(username__in=created_usernames))
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(e)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Approve each user associated with `in review` status domains
|
||||||
|
cls._approve_domain_requests(users)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _generate_fake_expiration_date(days_in_future=365):
|
||||||
|
"""Generates a fake expiration date between 1 and 365 days in the future."""
|
||||||
|
current_date = timezone.now().date() # nosec
|
||||||
|
return current_date + timedelta(days=random.randint(1, days_in_future)) # nosec
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _generate_fake_expiration_date_in_past():
|
||||||
|
"""Generates a fake expiration date up to 365 days in the past."""
|
||||||
|
current_date = timezone.now().date() # nosec
|
||||||
|
return current_date + timedelta(days=random.randint(-365, -1)) # nosec
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _approve_request(cls, domain_request, users, is_expired=False):
|
||||||
|
"""Helper function to approve a domain request and set expiration dates."""
|
||||||
|
if not domain_request:
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
if domain_request.investigator is None:
|
||||||
|
# Assign random investigator if not already assigned
|
||||||
|
domain_request.investigator = random.choice(users) # nosec
|
||||||
|
|
||||||
|
# Approve the domain request
|
||||||
|
domain_request.approve(send_email=False)
|
||||||
|
|
||||||
|
# Set expiration date for domain
|
||||||
|
domain = None
|
||||||
|
if domain_request.requested_domain:
|
||||||
|
domain, _ = Domain.objects.get_or_create(name=domain_request.requested_domain.name)
|
||||||
|
domain.expiration_date = (
|
||||||
|
cls._generate_fake_expiration_date_in_past() if is_expired else cls._generate_fake_expiration_date()
|
||||||
|
)
|
||||||
|
|
||||||
|
return domain
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _approve_domain_requests(cls, users):
|
||||||
|
"""Approves one current and one expired request per user."""
|
||||||
|
domain_requests_to_update = []
|
||||||
|
domains_to_update = []
|
||||||
|
|
||||||
|
for user in users:
|
||||||
|
# Get the latest and second-to-last domain requests
|
||||||
|
domain_requests = DomainRequest.objects.filter(
|
||||||
|
creator=user, status=DomainRequest.DomainRequestStatus.IN_REVIEW
|
||||||
|
).order_by("-id")[:2]
|
||||||
|
|
||||||
|
# Latest domain request
|
||||||
|
domain_request = domain_requests[0] if domain_requests else None
|
||||||
|
# Second-to-last domain request (expired)
|
||||||
|
domain_request_expired = domain_requests[1] if len(domain_requests) > 1 else None
|
||||||
|
|
||||||
|
# Approve the current and expired domain requests
|
||||||
|
approved_domain = cls._approve_request(domain_request, users)
|
||||||
|
expired_domain = cls._approve_request(domain_request_expired, users, is_expired=True)
|
||||||
|
|
||||||
|
# Collect objects to update
|
||||||
|
if domain_request:
|
||||||
|
domain_requests_to_update.append(domain_request)
|
||||||
|
if domain_request_expired:
|
||||||
|
domain_requests_to_update.append(domain_request_expired)
|
||||||
|
if approved_domain:
|
||||||
|
domains_to_update.append(approved_domain)
|
||||||
|
if expired_domain:
|
||||||
|
domains_to_update.append(expired_domain)
|
||||||
|
|
||||||
|
# Bulk update approved domain requests
|
||||||
|
if domain_requests_to_update:
|
||||||
|
DomainRequest.objects.bulk_update(domain_requests_to_update, ["status", "investigator"])
|
||||||
|
|
||||||
|
# Bulk update domains with expiration dates
|
||||||
|
if domains_to_update:
|
||||||
|
Domain.objects.bulk_update(domains_to_update, ["expiration_date"])
|
125
src/registrar/fixtures/fixtures_portfolios.py
Normal file
125
src/registrar/fixtures/fixtures_portfolios.py
Normal file
|
@ -0,0 +1,125 @@
|
||||||
|
import logging
|
||||||
|
import random
|
||||||
|
from faker import Faker
|
||||||
|
from django.db import transaction
|
||||||
|
|
||||||
|
from registrar.models import User, DomainRequest, FederalAgency
|
||||||
|
from registrar.models.portfolio import Portfolio
|
||||||
|
from registrar.models.senior_official import SeniorOfficial
|
||||||
|
|
||||||
|
|
||||||
|
fake = Faker()
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class PortfolioFixture:
|
||||||
|
"""
|
||||||
|
Creates 2 pre-defined portfolios with the infrastructure to add more.
|
||||||
|
|
||||||
|
Make sure this class' `load` method is called from `handle`
|
||||||
|
in management/commands/load.py, then use `./manage.py load`
|
||||||
|
to run this code.
|
||||||
|
"""
|
||||||
|
|
||||||
|
PORTFOLIOS = [
|
||||||
|
{
|
||||||
|
"organization_name": "Hotel California",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"organization_name": "Wish You Were Here",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def fake_so(cls):
|
||||||
|
return {
|
||||||
|
"first_name": fake.first_name(),
|
||||||
|
"last_name": fake.last_name(),
|
||||||
|
"title": fake.job(),
|
||||||
|
"email": fake.ascii_safe_email(),
|
||||||
|
"phone": "201-555-5555",
|
||||||
|
}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _set_non_foreign_key_fields(cls, portfolio: Portfolio, portfolio_dict: dict):
|
||||||
|
"""Helper method used by `load`."""
|
||||||
|
portfolio.organization_type = (
|
||||||
|
portfolio_dict["organization_type"]
|
||||||
|
if "organization_type" in portfolio_dict
|
||||||
|
else DomainRequest.OrganizationChoices.FEDERAL
|
||||||
|
)
|
||||||
|
portfolio.notes = portfolio_dict["notes"] if "notes" in portfolio_dict else None
|
||||||
|
portfolio.address_line1 = (
|
||||||
|
portfolio_dict["address_line1"] if "address_line1" in portfolio_dict else fake.street_address()
|
||||||
|
)
|
||||||
|
portfolio.address_line2 = portfolio_dict["address_line2"] if "address_line2" in portfolio_dict else None
|
||||||
|
portfolio.city = portfolio_dict["city"] if "city" in portfolio_dict else fake.city()
|
||||||
|
portfolio.state_territory = (
|
||||||
|
portfolio_dict["state_territory"] if "state_territory" in portfolio_dict else fake.state_abbr()
|
||||||
|
)
|
||||||
|
portfolio.zipcode = portfolio_dict["zipcode"] if "zipcode" in portfolio_dict else fake.postalcode()
|
||||||
|
portfolio.urbanization = portfolio_dict["urbanization"] if "urbanization" in portfolio_dict else None
|
||||||
|
portfolio.security_contact_email = (
|
||||||
|
portfolio_dict["security_contact_email"] if "security_contact_email" in portfolio_dict else fake.email()
|
||||||
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _set_foreign_key_fields(cls, portfolio: Portfolio, portfolio_dict: dict, user: User):
|
||||||
|
"""Helper method used by `load`."""
|
||||||
|
if not portfolio.senior_official:
|
||||||
|
if "senior_official" in portfolio_dict and portfolio_dict["senior_official"] is not None:
|
||||||
|
portfolio.senior_official, _ = SeniorOfficial.objects.get_or_create(**portfolio_dict["senior_official"])
|
||||||
|
else:
|
||||||
|
portfolio.senior_official = SeniorOfficial.objects.create(**cls.fake_so())
|
||||||
|
|
||||||
|
if not portfolio.federal_agency:
|
||||||
|
if "federal_agency" in portfolio_dict and portfolio_dict["federal_agency"] is not None:
|
||||||
|
portfolio.federal_agency, _ = FederalAgency.objects.get_or_create(name=portfolio_dict["federal_agency"])
|
||||||
|
else:
|
||||||
|
federal_agencies = FederalAgency.objects.all()
|
||||||
|
# Random choice of agency for selects, used as placeholders for testing.
|
||||||
|
portfolio.federal_agency = random.choice(federal_agencies) # nosec
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def load(cls):
|
||||||
|
"""Creates portfolios."""
|
||||||
|
logger.info("Going to load %s portfolios" % len(cls.PORTFOLIOS))
|
||||||
|
|
||||||
|
# Lumped under .atomic to ensure we don't make redundant DB calls.
|
||||||
|
# This bundles them all together, and then saves it in a single call.
|
||||||
|
with transaction.atomic():
|
||||||
|
try:
|
||||||
|
user = User.objects.all().last()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(e)
|
||||||
|
return
|
||||||
|
|
||||||
|
portfolios_to_create = []
|
||||||
|
for portfolio_data in cls.PORTFOLIOS:
|
||||||
|
organization_name = portfolio_data["organization_name"]
|
||||||
|
|
||||||
|
# Check if portfolio with the organization name already exists
|
||||||
|
if Portfolio.objects.filter(organization_name=organization_name).exists():
|
||||||
|
logger.info(
|
||||||
|
f"Portfolio with organization name '{organization_name}' already exists, skipping creation."
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
portfolio = Portfolio(
|
||||||
|
creator=user,
|
||||||
|
organization_name=portfolio_data["organization_name"],
|
||||||
|
)
|
||||||
|
cls._set_non_foreign_key_fields(portfolio, portfolio_data)
|
||||||
|
cls._set_foreign_key_fields(portfolio, portfolio_data, user)
|
||||||
|
portfolios_to_create.append(portfolio)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(e)
|
||||||
|
|
||||||
|
# Bulk create domain requests
|
||||||
|
if len(portfolios_to_create) > 0:
|
||||||
|
try:
|
||||||
|
Portfolio.objects.bulk_create(portfolios_to_create)
|
||||||
|
logger.info(f"Successfully created {len(portfolios_to_create)} portfolios")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error bulk creating portfolios: {e}")
|
313
src/registrar/fixtures/fixtures_requests.py
Normal file
313
src/registrar/fixtures/fixtures_requests.py
Normal file
|
@ -0,0 +1,313 @@
|
||||||
|
from datetime import timedelta
|
||||||
|
from django.utils import timezone
|
||||||
|
import logging
|
||||||
|
import random
|
||||||
|
from faker import Faker
|
||||||
|
from django.db import transaction
|
||||||
|
|
||||||
|
from registrar.fixtures.fixtures_users import UserFixture
|
||||||
|
from registrar.models import User, DomainRequest, DraftDomain, Contact, Website, FederalAgency
|
||||||
|
from registrar.models.portfolio import Portfolio
|
||||||
|
from registrar.models.suborganization import Suborganization
|
||||||
|
|
||||||
|
|
||||||
|
fake = Faker()
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class DomainRequestFixture:
|
||||||
|
"""
|
||||||
|
Creates domain requests for each user in the database,
|
||||||
|
assign portfolios and suborgs.
|
||||||
|
|
||||||
|
Creates 3 in_review requests, one for approving with an expired domain,
|
||||||
|
one for approving with a non-expired domain, and one for leaving in in_review.
|
||||||
|
|
||||||
|
Depends on fixtures_portfolios and fixtures_suborganizations.
|
||||||
|
|
||||||
|
Make sure this class' `load` method is called from `handle`
|
||||||
|
in management/commands/load.py, then use `./manage.py load`
|
||||||
|
to run this code.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# any fields not specified here will be filled in with fake data or defaults
|
||||||
|
# NOTE BENE: each fixture must have `organization_name` for uniqueness!
|
||||||
|
# Here is a more complete example as a template:
|
||||||
|
# {
|
||||||
|
# "status": "started",
|
||||||
|
# "organization_name": "Example - Just started",
|
||||||
|
# "generic_org_type": "federal",
|
||||||
|
# "federal_agency": None,
|
||||||
|
# "federal_type": None,
|
||||||
|
# "address_line1": None,
|
||||||
|
# "address_line2": None,
|
||||||
|
# "city": None,
|
||||||
|
# "state_territory": None,
|
||||||
|
# "zipcode": None,
|
||||||
|
# "urbanization": None,
|
||||||
|
# "purpose": None,
|
||||||
|
# "anything_else": None,
|
||||||
|
# "is_policy_acknowledged": None,
|
||||||
|
# "senior_official": None,
|
||||||
|
# "other_contacts": [],
|
||||||
|
# "current_websites": [],
|
||||||
|
# "alternative_domains": [],
|
||||||
|
# },
|
||||||
|
DOMAINREQUESTS = [
|
||||||
|
{
|
||||||
|
"status": DomainRequest.DomainRequestStatus.STARTED,
|
||||||
|
"organization_name": "Example - Finished but not submitted",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"status": DomainRequest.DomainRequestStatus.SUBMITTED,
|
||||||
|
"organization_name": "Example - Submitted but pending investigation",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
|
||||||
|
"organization_name": "Example - In investigation",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
|
||||||
|
"organization_name": "Example - Approved",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
|
||||||
|
"organization_name": "Example - Approved, domain expired",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"status": DomainRequest.DomainRequestStatus.WITHDRAWN,
|
||||||
|
"organization_name": "Example - Withdrawn",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"status": DomainRequest.DomainRequestStatus.ACTION_NEEDED,
|
||||||
|
"organization_name": "Example - Action needed",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"status": "rejected",
|
||||||
|
"organization_name": "Example - Rejected",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def fake_contact(cls):
|
||||||
|
return {
|
||||||
|
"first_name": fake.first_name(),
|
||||||
|
"middle_name": None,
|
||||||
|
"last_name": fake.last_name(),
|
||||||
|
"title": fake.job(),
|
||||||
|
"email": fake.ascii_safe_email(),
|
||||||
|
"phone": "201-555-5555",
|
||||||
|
}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def fake_dot_gov(cls):
|
||||||
|
return f"{fake.slug()}.gov"
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def fake_expiration_date(cls):
|
||||||
|
"""Generates a fake expiration date between 0 and 1 year in the future."""
|
||||||
|
current_date = timezone.now().date()
|
||||||
|
days_in_future = random.randint(0, 365) # nosec
|
||||||
|
return current_date + timedelta(days=days_in_future)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _set_non_foreign_key_fields(cls, request: DomainRequest, request_dict: dict):
|
||||||
|
"""Helper method used by `load`."""
|
||||||
|
request.status = request_dict["status"] if "status" in request_dict else "started"
|
||||||
|
|
||||||
|
# TODO for a future ticket: Allow for more than just "federal" here
|
||||||
|
request.generic_org_type = request_dict["generic_org_type"] if "generic_org_type" in request_dict else "federal"
|
||||||
|
if request.status != "started":
|
||||||
|
request.last_submitted_date = fake.date()
|
||||||
|
request.federal_type = (
|
||||||
|
request_dict["federal_type"]
|
||||||
|
if "federal_type" in request_dict
|
||||||
|
else random.choice(["executive", "judicial", "legislative"]) # nosec
|
||||||
|
)
|
||||||
|
request.address_line1 = (
|
||||||
|
request_dict["address_line1"] if "address_line1" in request_dict else fake.street_address()
|
||||||
|
)
|
||||||
|
request.address_line2 = request_dict["address_line2"] if "address_line2" in request_dict else None
|
||||||
|
request.city = request_dict["city"] if "city" in request_dict else fake.city()
|
||||||
|
request.state_territory = (
|
||||||
|
request_dict["state_territory"] if "state_territory" in request_dict else fake.state_abbr()
|
||||||
|
)
|
||||||
|
request.zipcode = request_dict["zipcode"] if "zipcode" in request_dict else fake.postalcode()
|
||||||
|
request.urbanization = request_dict["urbanization"] if "urbanization" in request_dict else None
|
||||||
|
request.purpose = request_dict["purpose"] if "purpose" in request_dict else fake.paragraph()
|
||||||
|
request.has_cisa_representative = (
|
||||||
|
request_dict["has_cisa_representative"] if "has_cisa_representative" in request_dict else True
|
||||||
|
)
|
||||||
|
request.cisa_representative_email = (
|
||||||
|
request_dict["cisa_representative_email"] if "cisa_representative_email" in request_dict else fake.email()
|
||||||
|
)
|
||||||
|
request.cisa_representative_first_name = (
|
||||||
|
request_dict["cisa_representative_first_name"]
|
||||||
|
if "cisa_representative_first_name" in request_dict
|
||||||
|
else fake.first_name()
|
||||||
|
)
|
||||||
|
request.cisa_representative_last_name = (
|
||||||
|
request_dict["cisa_representative_last_name"]
|
||||||
|
if "cisa_representative_last_name" in request_dict
|
||||||
|
else fake.last_name()
|
||||||
|
)
|
||||||
|
request.has_anything_else_text = (
|
||||||
|
request_dict["has_anything_else_text"] if "has_anything_else_text" in request_dict else True
|
||||||
|
)
|
||||||
|
request.anything_else = request_dict["anything_else"] if "anything_else" in request_dict else fake.paragraph()
|
||||||
|
request.is_policy_acknowledged = (
|
||||||
|
request_dict["is_policy_acknowledged"] if "is_policy_acknowledged" in request_dict else True
|
||||||
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _set_foreign_key_fields(cls, request: DomainRequest, request_dict: dict, user: User):
|
||||||
|
"""Helper method used by `load`."""
|
||||||
|
request.investigator = cls._get_investigator(request, request_dict, user)
|
||||||
|
request.senior_official = cls._get_senior_official(request, request_dict)
|
||||||
|
request.requested_domain = cls._get_requested_domain(request, request_dict)
|
||||||
|
request.federal_agency = cls._get_federal_agency(request, request_dict)
|
||||||
|
request.portfolio = cls._get_portfolio(request, request_dict)
|
||||||
|
request.sub_organization = cls._get_sub_organization(request, request_dict)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _get_investigator(cls, request: DomainRequest, request_dict: dict, user: User):
|
||||||
|
if not request.investigator:
|
||||||
|
return User.objects.get(username=user.username) if "investigator" in request_dict else None
|
||||||
|
return request.investigator
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _get_senior_official(cls, request: DomainRequest, request_dict: dict):
|
||||||
|
if not request.senior_official:
|
||||||
|
if "senior_official" in request_dict and request_dict["senior_official"] is not None:
|
||||||
|
return Contact.objects.get_or_create(**request_dict["senior_official"])[0]
|
||||||
|
return Contact.objects.create(**cls.fake_contact())
|
||||||
|
return request.senior_official
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _get_requested_domain(cls, request: DomainRequest, request_dict: dict):
|
||||||
|
if not request.requested_domain:
|
||||||
|
if "requested_domain" in request_dict and request_dict["requested_domain"] is not None:
|
||||||
|
return DraftDomain.objects.get_or_create(name=request_dict["requested_domain"])[0]
|
||||||
|
return DraftDomain.objects.create(name=cls.fake_dot_gov())
|
||||||
|
return request.requested_domain
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _get_federal_agency(cls, request: DomainRequest, request_dict: dict):
|
||||||
|
if not request.federal_agency:
|
||||||
|
if "federal_agency" in request_dict and request_dict["federal_agency"] is not None:
|
||||||
|
return FederalAgency.objects.get_or_create(name=request_dict["federal_agency"])[0]
|
||||||
|
return random.choice(FederalAgency.objects.all()) # nosec
|
||||||
|
return request.federal_agency
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _get_portfolio(cls, request: DomainRequest, request_dict: dict):
|
||||||
|
if not request.portfolio:
|
||||||
|
if "portfolio" in request_dict and request_dict["portfolio"] is not None:
|
||||||
|
return Portfolio.objects.get_or_create(name=request_dict["portfolio"])[0]
|
||||||
|
return cls._get_random_portfolio()
|
||||||
|
return request.portfolio
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _get_sub_organization(cls, request: DomainRequest, request_dict: dict):
|
||||||
|
if not request.sub_organization:
|
||||||
|
if "sub_organization" in request_dict and request_dict["sub_organization"] is not None:
|
||||||
|
return Suborganization.objects.get_or_create(name=request_dict["sub_organization"])[0]
|
||||||
|
return cls._get_random_sub_organization()
|
||||||
|
return request.sub_organization
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _get_random_portfolio(cls):
|
||||||
|
try:
|
||||||
|
portfolio_options = [Portfolio.objects.first(), Portfolio.objects.last()]
|
||||||
|
return random.choice(portfolio_options) # nosec
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Expected fixture portfolio, did not find it: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _get_random_sub_organization(cls):
|
||||||
|
try:
|
||||||
|
suborg_options = [Suborganization.objects.first(), Suborganization.objects.last()]
|
||||||
|
return random.choice(suborg_options) # nosec
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Expected fixture sub_organization, did not find it: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _set_many_to_many_relations(cls, request: DomainRequest, request_dict: dict):
|
||||||
|
"""Helper method used by `load`."""
|
||||||
|
if "other_contacts" in request_dict:
|
||||||
|
for contact in request_dict["other_contacts"]:
|
||||||
|
request.other_contacts.add(Contact.objects.get_or_create(**contact)[0])
|
||||||
|
elif not request.other_contacts.exists():
|
||||||
|
other_contacts = [
|
||||||
|
Contact.objects.create(**cls.fake_contact()) for _ in range(random.randint(0, 3)) # nosec
|
||||||
|
]
|
||||||
|
request.other_contacts.add(*other_contacts)
|
||||||
|
|
||||||
|
if "current_websites" in request_dict:
|
||||||
|
for website in request_dict["current_websites"]:
|
||||||
|
request.current_websites.add(Website.objects.get_or_create(website=website)[0])
|
||||||
|
elif not request.current_websites.exists():
|
||||||
|
current_websites = [
|
||||||
|
Website.objects.create(website=fake.uri()) for _ in range(random.randint(0, 3)) # nosec
|
||||||
|
]
|
||||||
|
request.current_websites.add(*current_websites)
|
||||||
|
|
||||||
|
if "alternative_domains" in request_dict:
|
||||||
|
for domain in request_dict["alternative_domains"]:
|
||||||
|
request.alternative_domains.add(Website.objects.get_or_create(website=domain)[0])
|
||||||
|
elif not request.alternative_domains.exists():
|
||||||
|
alternative_domains = [
|
||||||
|
Website.objects.create(website=cls.fake_dot_gov()) for _ in range(random.randint(0, 3)) # nosec
|
||||||
|
]
|
||||||
|
request.alternative_domains.add(*alternative_domains)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def load(cls):
|
||||||
|
"""Creates domain requests for each user in the database."""
|
||||||
|
logger.info("Going to load %s domain requests" % len(cls.DOMAINREQUESTS))
|
||||||
|
|
||||||
|
# Lumped under .atomic to ensure we don't make redundant DB calls.
|
||||||
|
# This bundles them all together, and then saves it in a single call.
|
||||||
|
with transaction.atomic():
|
||||||
|
try:
|
||||||
|
# Get the usernames of users created in the UserFixture
|
||||||
|
created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF]
|
||||||
|
|
||||||
|
# Filter users to only include those created by the fixture
|
||||||
|
users = list(User.objects.filter(username__in=created_usernames))
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(e)
|
||||||
|
return
|
||||||
|
|
||||||
|
cls._create_domain_requests(users)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _create_domain_requests(cls, users):
|
||||||
|
"""Creates DomainRequests given a list of users."""
|
||||||
|
domain_requests_to_create = []
|
||||||
|
for user in users:
|
||||||
|
for request_data in cls.DOMAINREQUESTS:
|
||||||
|
# Prepare DomainRequest objects
|
||||||
|
try:
|
||||||
|
domain_request = DomainRequest(
|
||||||
|
creator=user,
|
||||||
|
organization_name=request_data["organization_name"],
|
||||||
|
)
|
||||||
|
cls._set_non_foreign_key_fields(domain_request, request_data)
|
||||||
|
cls._set_foreign_key_fields(domain_request, request_data, user)
|
||||||
|
domain_requests_to_create.append(domain_request)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(e)
|
||||||
|
|
||||||
|
# Bulk create domain requests
|
||||||
|
if len(domain_requests_to_create) > 0:
|
||||||
|
DomainRequest.objects.bulk_create(domain_requests_to_create)
|
||||||
|
|
||||||
|
# Now many-to-many relationships
|
||||||
|
for domain_request in domain_requests_to_create:
|
||||||
|
try:
|
||||||
|
cls._set_many_to_many_relations(domain_request, request_data)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(e)
|
86
src/registrar/fixtures/fixtures_suborganizations.py
Normal file
86
src/registrar/fixtures/fixtures_suborganizations.py
Normal file
|
@ -0,0 +1,86 @@
|
||||||
|
import logging
|
||||||
|
from faker import Faker
|
||||||
|
from django.db import transaction
|
||||||
|
|
||||||
|
from registrar.models.portfolio import Portfolio
|
||||||
|
from registrar.models.suborganization import Suborganization
|
||||||
|
|
||||||
|
|
||||||
|
fake = Faker()
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class SuborganizationFixture:
|
||||||
|
"""
|
||||||
|
Creates 2 pre-defined suborg with the infrastructure to add more.
|
||||||
|
|
||||||
|
Depends on fixtures_portfolios.
|
||||||
|
|
||||||
|
Make sure this class' `load` method is called from `handle`
|
||||||
|
in management/commands/load.py, then use `./manage.py load`
|
||||||
|
to run this code.
|
||||||
|
"""
|
||||||
|
|
||||||
|
SUBORGS = [
|
||||||
|
{
|
||||||
|
"name": "Take it Easy",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Welcome to the Machine",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def load(cls):
|
||||||
|
"""Creates suborganizations."""
|
||||||
|
logger.info(f"Going to load {len(cls.SUBORGS)} suborgs")
|
||||||
|
|
||||||
|
with transaction.atomic():
|
||||||
|
portfolios = cls._get_portfolios()
|
||||||
|
if not portfolios:
|
||||||
|
return
|
||||||
|
|
||||||
|
suborgs_to_create = cls._prepare_suborgs_to_create(portfolios)
|
||||||
|
cls._bulk_create_suborgs(suborgs_to_create)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _get_portfolios(cls):
|
||||||
|
"""Fetches the first and last portfolio and logs warnings if not found."""
|
||||||
|
try:
|
||||||
|
portfolio1 = Portfolio.objects.first()
|
||||||
|
portfolio2 = Portfolio.objects.last()
|
||||||
|
|
||||||
|
if not portfolio1 or not portfolio2:
|
||||||
|
logger.warning("One or both portfolios not found.")
|
||||||
|
return None
|
||||||
|
return portfolio1, portfolio2
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error fetching portfolios: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _prepare_suborgs_to_create(cls, portfolios):
|
||||||
|
"""Prepares a list of suborganizations to create, avoiding duplicates."""
|
||||||
|
portfolio1, portfolio2 = portfolios
|
||||||
|
suborgs_to_create = []
|
||||||
|
|
||||||
|
try:
|
||||||
|
if not Suborganization.objects.filter(name=cls.SUBORGS[0]["name"]).exists():
|
||||||
|
suborgs_to_create.append(Suborganization(portfolio=portfolio1, name=cls.SUBORGS[0]["name"]))
|
||||||
|
|
||||||
|
if not Suborganization.objects.filter(name=cls.SUBORGS[1]["name"]).exists():
|
||||||
|
suborgs_to_create.append(Suborganization(portfolio=portfolio2, name=cls.SUBORGS[1]["name"]))
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error creating suborg objects: {e}")
|
||||||
|
|
||||||
|
return suborgs_to_create
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _bulk_create_suborgs(cls, suborgs_to_create):
|
||||||
|
"""Bulk creates suborganizations and logs success or errors."""
|
||||||
|
if suborgs_to_create:
|
||||||
|
try:
|
||||||
|
Suborganization.objects.bulk_create(suborgs_to_create)
|
||||||
|
logger.info(f"Successfully created {len(suborgs_to_create)} suborgs")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error bulk creating suborgs: {e}")
|
|
@ -0,0 +1,61 @@
|
||||||
|
import logging
|
||||||
|
from faker import Faker
|
||||||
|
from django.db import transaction
|
||||||
|
|
||||||
|
from registrar.fixtures.fixtures_users import UserFixture
|
||||||
|
from registrar.models import User
|
||||||
|
from registrar.models.portfolio import Portfolio
|
||||||
|
from registrar.models.user_portfolio_permission import UserPortfolioPermission
|
||||||
|
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
|
||||||
|
|
||||||
|
fake = Faker()
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class UserPortfolioPermissionFixture:
|
||||||
|
"""Create user portfolio permissions for each user.
|
||||||
|
Each user will be admin on 2 portfolios.
|
||||||
|
|
||||||
|
Depends on fixture_portfolios"""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def load(cls):
|
||||||
|
logger.info("Going to set user portfolio permissions")
|
||||||
|
|
||||||
|
# Lumped under .atomic to ensure we don't make redundant DB calls.
|
||||||
|
# This bundles them all together, and then saves it in a single call.
|
||||||
|
with transaction.atomic():
|
||||||
|
try:
|
||||||
|
# Get the usernames of users created in the UserFixture
|
||||||
|
created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF]
|
||||||
|
|
||||||
|
# Filter users to only include those created by the fixture
|
||||||
|
users = list(User.objects.filter(username__in=created_usernames))
|
||||||
|
|
||||||
|
portfolios = list(Portfolio.objects.all())
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(e)
|
||||||
|
return
|
||||||
|
|
||||||
|
user_portfolio_permissions_to_create = []
|
||||||
|
for user in users:
|
||||||
|
for portfolio in portfolios:
|
||||||
|
try:
|
||||||
|
if not UserPortfolioPermission.objects.filter(user=user, portfolio=portfolio).exists():
|
||||||
|
user_portfolio_permission = UserPortfolioPermission(
|
||||||
|
user=user,
|
||||||
|
portfolio=portfolio,
|
||||||
|
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
|
||||||
|
)
|
||||||
|
user_portfolio_permissions_to_create.append(user_portfolio_permission)
|
||||||
|
else:
|
||||||
|
logger.info(
|
||||||
|
f"Permission exists for user '{user.username}' "
|
||||||
|
"on portfolio '{portfolio.organization_name}'."
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(e)
|
||||||
|
|
||||||
|
# Bulk create domain requests
|
||||||
|
if len(user_portfolio_permissions_to_create) > 0:
|
||||||
|
UserPortfolioPermission.objects.bulk_create(user_portfolio_permissions_to_create)
|
|
@ -28,114 +28,134 @@ class UserFixture:
|
||||||
"first_name": "Jyoti",
|
"first_name": "Jyoti",
|
||||||
"last_name": "Bock",
|
"last_name": "Bock",
|
||||||
"email": "jyotibock@truss.works",
|
"email": "jyotibock@truss.works",
|
||||||
|
"title": "GIFted",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "aad084c3-66cc-4632-80eb-41cdf5c5bcbf",
|
"username": "aad084c3-66cc-4632-80eb-41cdf5c5bcbf",
|
||||||
"first_name": "Aditi",
|
"first_name": "Aditi",
|
||||||
"last_name": "Green",
|
"last_name": "Green",
|
||||||
"email": "aditidevelops+01@gmail.com",
|
"email": "aditidevelops+01@gmail.com",
|
||||||
|
"title": "Positive vibes",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "be17c826-e200-4999-9389-2ded48c43691",
|
"username": "be17c826-e200-4999-9389-2ded48c43691",
|
||||||
"first_name": "Matthew",
|
"first_name": "Matthew",
|
||||||
"last_name": "Spence",
|
"last_name": "Spence",
|
||||||
|
"title": "Hollywood hair",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "5f283494-31bd-49b5-b024-a7e7cae00848",
|
"username": "5f283494-31bd-49b5-b024-a7e7cae00848",
|
||||||
"first_name": "Rachid",
|
"first_name": "Rachid",
|
||||||
"last_name": "Mrad",
|
"last_name": "Mrad",
|
||||||
"email": "rachid.mrad@associates.cisa.dhs.gov",
|
"email": "rachid.mrad@associates.cisa.dhs.gov",
|
||||||
|
"title": "Common pirate",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "eb2214cd-fc0c-48c0-9dbd-bc4cd6820c74",
|
"username": "eb2214cd-fc0c-48c0-9dbd-bc4cd6820c74",
|
||||||
"first_name": "Alysia",
|
"first_name": "Alysia",
|
||||||
"last_name": "Broddrick",
|
"last_name": "Broddrick",
|
||||||
"email": "abroddrick@truss.works",
|
"email": "abroddrick@truss.works",
|
||||||
|
"title": "Professional coffee taster",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "8f8e7293-17f7-4716-889b-1990241cbd39",
|
"username": "8f8e7293-17f7-4716-889b-1990241cbd39",
|
||||||
"first_name": "Katherine",
|
"first_name": "Katherine",
|
||||||
"last_name": "Osos",
|
"last_name": "Osos",
|
||||||
"email": "kosos@truss.works",
|
"email": "kosos@truss.works",
|
||||||
|
"title": "Grove keeper",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "70488e0a-e937-4894-a28c-16f5949effd4",
|
"username": "70488e0a-e937-4894-a28c-16f5949effd4",
|
||||||
"first_name": "Gaby",
|
"first_name": "Gaby",
|
||||||
"last_name": "DiSarli",
|
"last_name": "DiSarli",
|
||||||
"email": "gaby@truss.works",
|
"email": "gaby@truss.works",
|
||||||
|
"title": "De Stijl",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "83c2b6dd-20a2-4cac-bb40-e22a72d2955c",
|
"username": "83c2b6dd-20a2-4cac-bb40-e22a72d2955c",
|
||||||
"first_name": "Cameron",
|
"first_name": "Cameron",
|
||||||
"last_name": "Dixon",
|
"last_name": "Dixon",
|
||||||
"email": "cameron.dixon@cisa.dhs.gov",
|
"email": "cameron.dixon@cisa.dhs.gov",
|
||||||
|
"title": "Supreme leader",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "0353607a-cbba-47d2-98d7-e83dcd5b90ea",
|
"username": "0353607a-cbba-47d2-98d7-e83dcd5b90ea",
|
||||||
"first_name": "Ryan",
|
"first_name": "Ryan",
|
||||||
"last_name": "Brooks",
|
"last_name": "Brooks",
|
||||||
|
"title": "Honorable discharge",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "30001ee7-0467-4df2-8db2-786e79606060",
|
"username": "30001ee7-0467-4df2-8db2-786e79606060",
|
||||||
"first_name": "Zander",
|
"first_name": "Zander",
|
||||||
"last_name": "Adkinson",
|
"last_name": "Adkinson",
|
||||||
|
"title": "ACME Specialist",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "2bf518c2-485a-4c42-ab1a-f5a8b0a08484",
|
"username": "2bf518c2-485a-4c42-ab1a-f5a8b0a08484",
|
||||||
"first_name": "Paul",
|
"first_name": "Paul",
|
||||||
"last_name": "Kuykendall",
|
"last_name": "Kuykendall",
|
||||||
|
"title": "Silvertongue",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "2a88a97b-be96-4aad-b99e-0b605b492c78",
|
"username": "2a88a97b-be96-4aad-b99e-0b605b492c78",
|
||||||
"first_name": "Rebecca",
|
"first_name": "Rebecca",
|
||||||
"last_name": "Hsieh",
|
"last_name": "Hsieh",
|
||||||
"email": "rebecca.hsieh@truss.works",
|
"email": "rebecca.hsieh@truss.works",
|
||||||
|
"title": "Catlady",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "fa69c8e8-da83-4798-a4f2-263c9ce93f52",
|
"username": "fa69c8e8-da83-4798-a4f2-263c9ce93f52",
|
||||||
"first_name": "David",
|
"first_name": "David",
|
||||||
"last_name": "Kennedy",
|
"last_name": "Kennedy",
|
||||||
"email": "david.kennedy@ecstech.com",
|
"email": "david.kennedy@ecstech.com",
|
||||||
|
"title": "Mean lean coding machine",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "f14433d8-f0e9-41bf-9c72-b99b110e665d",
|
"username": "f14433d8-f0e9-41bf-9c72-b99b110e665d",
|
||||||
"first_name": "Nicolle",
|
"first_name": "Nicolle",
|
||||||
"last_name": "LeClair",
|
"last_name": "LeClair",
|
||||||
"email": "nicolle.leclair@ecstech.com",
|
"email": "nicolle.leclair@ecstech.com",
|
||||||
|
"title": "Nightowl",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "24840450-bf47-4d89-8aa9-c612fe68f9da",
|
"username": "24840450-bf47-4d89-8aa9-c612fe68f9da",
|
||||||
"first_name": "Erin",
|
"first_name": "Erin",
|
||||||
"last_name": "Song",
|
"last_name": "Song",
|
||||||
|
"title": "Catlady 2",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "e0ea8b94-6e53-4430-814a-849a7ca45f21",
|
"username": "e0ea8b94-6e53-4430-814a-849a7ca45f21",
|
||||||
"first_name": "Kristina",
|
"first_name": "Kristina",
|
||||||
"last_name": "Yin",
|
"last_name": "Yin",
|
||||||
|
"title": "Hufflepuff prefect",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "ac49d7c1-368a-4e6b-8f1d-60250e20a16f",
|
"username": "ac49d7c1-368a-4e6b-8f1d-60250e20a16f",
|
||||||
"first_name": "Vicky",
|
"first_name": "Vicky",
|
||||||
"last_name": "Chin",
|
"last_name": "Chin",
|
||||||
"email": "szu.chin@associates.cisa.dhs.gov",
|
"email": "szu.chin@associates.cisa.dhs.gov",
|
||||||
|
"title": "Ze whip",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "66bb1a5a-a091-4d7f-a6cf-4d772b4711c7",
|
"username": "66bb1a5a-a091-4d7f-a6cf-4d772b4711c7",
|
||||||
"first_name": "Christina",
|
"first_name": "Christina",
|
||||||
"last_name": "Burnett",
|
"last_name": "Burnett",
|
||||||
"email": "christina.burnett@cisa.dhs.gov",
|
"email": "christina.burnett@cisa.dhs.gov",
|
||||||
|
"title": "Groovy",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "012f844d-8a0f-4225-9d82-cbf87bff1d3e",
|
"username": "012f844d-8a0f-4225-9d82-cbf87bff1d3e",
|
||||||
"first_name": "Riley",
|
"first_name": "Riley",
|
||||||
"last_name": "Orr",
|
"last_name": "Orr",
|
||||||
"email": "riley+320@truss.works",
|
"email": "riley+320@truss.works",
|
||||||
|
"title": "Honorable discharge",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"username": "76612d84-66b0-4ae9-9870-81e98b9858b6",
|
"username": "76612d84-66b0-4ae9-9870-81e98b9858b6",
|
||||||
"first_name": "Anna",
|
"first_name": "Anna",
|
||||||
"last_name": "Gingle",
|
"last_name": "Gingle",
|
||||||
"email": "annagingle@truss.works",
|
"email": "annagingle@truss.works",
|
||||||
|
"title": "Sweetwater sailor",
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -254,29 +274,58 @@ class UserFixture:
|
||||||
# Additional emails to add to the AllowedEmail whitelist.
|
# Additional emails to add to the AllowedEmail whitelist.
|
||||||
ADDITIONAL_ALLOWED_EMAILS: list[str] = ["davekenn4242@gmail.com", "rachid_mrad@hotmail.com"]
|
ADDITIONAL_ALLOWED_EMAILS: list[str] = ["davekenn4242@gmail.com", "rachid_mrad@hotmail.com"]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
def load_users(cls, users, group_name, are_superusers=False):
|
def load_users(cls, users, group_name, are_superusers=False):
|
||||||
logger.info(f"Going to load {len(users)} users in group {group_name}")
|
"""Loads the users into the database and assigns them to the specified group."""
|
||||||
for user_data in users:
|
logger.info(f"Going to load {len(users)} users for group {group_name}")
|
||||||
try:
|
|
||||||
user, _ = User.objects.get_or_create(username=user_data["username"])
|
|
||||||
user.is_superuser = are_superusers
|
|
||||||
user.first_name = user_data["first_name"]
|
|
||||||
user.last_name = user_data["last_name"]
|
|
||||||
if "email" in user_data:
|
|
||||||
user.email = user_data["email"]
|
|
||||||
user.is_staff = True
|
|
||||||
user.is_active = True
|
|
||||||
# This verification type will get reverted to "regular" (or whichever is applicables)
|
|
||||||
# once the user logs in for the first time (as they then got verified through different means).
|
|
||||||
# In the meantime, we can still describe how the user got here in the first place.
|
|
||||||
user.verification_type = User.VerificationTypeChoices.FIXTURE_USER
|
|
||||||
group = UserGroup.objects.get(name=group_name)
|
group = UserGroup.objects.get(name=group_name)
|
||||||
user.groups.add(group)
|
|
||||||
user.save()
|
# Prepare sets of existing usernames and IDs in one query
|
||||||
logger.debug(f"User object created for {user_data['first_name']}")
|
user_identifiers = [(user.get("username"), user.get("id")) for user in users]
|
||||||
except Exception as e:
|
existing_users = User.objects.filter(
|
||||||
logger.warning(e)
|
username__in=[user[0] for user in user_identifiers] + [user[1] for user in user_identifiers]
|
||||||
logger.info(f"All users in group {group_name} loaded.")
|
).values_list("username", "id")
|
||||||
|
|
||||||
|
existing_usernames = set(user[0] for user in existing_users)
|
||||||
|
existing_user_ids = set(user[1] for user in existing_users)
|
||||||
|
|
||||||
|
# Filter out users with existing IDs or usernames
|
||||||
|
new_users = [
|
||||||
|
User(
|
||||||
|
id=user_data.get("id"),
|
||||||
|
first_name=user_data.get("first_name"),
|
||||||
|
last_name=user_data.get("last_name"),
|
||||||
|
username=user_data.get("username"),
|
||||||
|
email=user_data.get("email", ""),
|
||||||
|
title=user_data.get("title", "Peon"),
|
||||||
|
phone=user_data.get("phone", "2022222222"),
|
||||||
|
is_active=user_data.get("is_active", True),
|
||||||
|
is_staff=True,
|
||||||
|
is_superuser=are_superusers,
|
||||||
|
)
|
||||||
|
for user_data in users
|
||||||
|
if user_data.get("username") not in existing_usernames and user_data.get("id") not in existing_user_ids
|
||||||
|
]
|
||||||
|
|
||||||
|
# Perform bulk creation for new users
|
||||||
|
if new_users:
|
||||||
|
User.objects.bulk_create(new_users)
|
||||||
|
logger.info(f"Created {len(new_users)} new users.")
|
||||||
|
else:
|
||||||
|
logger.info("No new users to create.")
|
||||||
|
|
||||||
|
# Get all users to be updated (both new and existing)
|
||||||
|
created_or_existing_users = User.objects.filter(username__in=[user.get("username") for user in users])
|
||||||
|
|
||||||
|
# Filter out users who are already in the group
|
||||||
|
users_not_in_group = created_or_existing_users.exclude(groups__id=group.id)
|
||||||
|
|
||||||
|
# Add only users who are not already in the group
|
||||||
|
if users_not_in_group.exists():
|
||||||
|
group.user_set.add(*users_not_in_group)
|
||||||
|
|
||||||
|
logger.info(f"Users loaded for group {group_name}.")
|
||||||
|
|
||||||
def load_allowed_emails(cls, users, additional_emails):
|
def load_allowed_emails(cls, users, additional_emails):
|
||||||
"""Populates a whitelist of allowed emails (as defined in this list)"""
|
"""Populates a whitelist of allowed emails (as defined in this list)"""
|
||||||
|
@ -284,37 +333,30 @@ class UserFixture:
|
||||||
if additional_emails:
|
if additional_emails:
|
||||||
logger.info(f"Going to load {len(additional_emails)} additional allowed emails")
|
logger.info(f"Going to load {len(additional_emails)} additional allowed emails")
|
||||||
|
|
||||||
# Load user emails
|
existing_emails = set(AllowedEmail.objects.values_list("email", flat=True))
|
||||||
allowed_emails = []
|
new_allowed_emails = []
|
||||||
|
|
||||||
for user_data in users:
|
for user_data in users:
|
||||||
user_email = user_data.get("email")
|
user_email = user_data.get("email")
|
||||||
if user_email and user_email not in allowed_emails:
|
if user_email and user_email not in existing_emails:
|
||||||
allowed_emails.append(AllowedEmail(email=user_email))
|
new_allowed_emails.append(AllowedEmail(email=user_email))
|
||||||
else:
|
|
||||||
first_name = user_data.get("first_name")
|
|
||||||
last_name = user_data.get("last_name")
|
|
||||||
logger.warning(f"Could not add email to whitelist for {first_name} {last_name}.")
|
|
||||||
|
|
||||||
# Load additional emails
|
# Load additional emails, only if they don't exist already
|
||||||
allowed_emails.extend([AllowedEmail(email=email) for email in additional_emails])
|
for email in additional_emails:
|
||||||
|
if email not in existing_emails:
|
||||||
|
new_allowed_emails.append(AllowedEmail(email=email))
|
||||||
|
|
||||||
if allowed_emails:
|
if new_allowed_emails:
|
||||||
AllowedEmail.objects.bulk_create(allowed_emails)
|
AllowedEmail.objects.bulk_create(new_allowed_emails)
|
||||||
logger.info(f"Loaded {len(allowed_emails)} allowed emails")
|
logger.info(f"Loaded {len(new_allowed_emails)} allowed emails")
|
||||||
else:
|
else:
|
||||||
logger.info("No allowed emails to load")
|
logger.info("No allowed emails to load")
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def load(cls):
|
def load(cls):
|
||||||
# Lumped under .atomic to ensure we don't make redundant DB calls.
|
|
||||||
# This bundles them all together, and then saves it in a single call.
|
|
||||||
# This is slightly different then bulk_create or bulk_update, in that
|
|
||||||
# you still get the same behaviour of .save(), but those incremental
|
|
||||||
# steps now do not need to close/reopen a db connection,
|
|
||||||
# instead they share one.
|
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
cls.load_users(cls, cls.ADMINS, "full_access_group", are_superusers=True)
|
cls.load_users(cls.ADMINS, "full_access_group", are_superusers=True)
|
||||||
cls.load_users(cls, cls.STAFF, "cisa_analysts_group")
|
cls.load_users(cls.STAFF, "cisa_analysts_group")
|
||||||
|
|
||||||
# Combine ADMINS and STAFF lists
|
# Combine ADMINS and STAFF lists
|
||||||
all_users = cls.ADMINS + cls.STAFF
|
all_users = cls.ADMINS + cls.STAFF
|
|
@ -1,236 +0,0 @@
|
||||||
import logging
|
|
||||||
import random
|
|
||||||
from faker import Faker
|
|
||||||
from django.db import transaction
|
|
||||||
|
|
||||||
from registrar.models import User, DomainRequest, DraftDomain, Contact, Website, FederalAgency
|
|
||||||
|
|
||||||
fake = Faker()
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class DomainRequestFixture:
|
|
||||||
"""
|
|
||||||
Load domain requests into the database.
|
|
||||||
|
|
||||||
Make sure this class' `load` method is called from `handle`
|
|
||||||
in management/commands/load.py, then use `./manage.py load`
|
|
||||||
to run this code.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# any fields not specified here will be filled in with fake data or defaults
|
|
||||||
# NOTE BENE: each fixture must have `organization_name` for uniqueness!
|
|
||||||
# Here is a more complete example as a template:
|
|
||||||
# {
|
|
||||||
# "status": "started",
|
|
||||||
# "organization_name": "Example - Just started",
|
|
||||||
# "generic_org_type": "federal",
|
|
||||||
# "federal_agency": None,
|
|
||||||
# "federal_type": None,
|
|
||||||
# "address_line1": None,
|
|
||||||
# "address_line2": None,
|
|
||||||
# "city": None,
|
|
||||||
# "state_territory": None,
|
|
||||||
# "zipcode": None,
|
|
||||||
# "urbanization": None,
|
|
||||||
# "purpose": None,
|
|
||||||
# "anything_else": None,
|
|
||||||
# "is_policy_acknowledged": None,
|
|
||||||
# "senior_official": None,
|
|
||||||
# "other_contacts": [],
|
|
||||||
# "current_websites": [],
|
|
||||||
# "alternative_domains": [],
|
|
||||||
# },
|
|
||||||
DA = [
|
|
||||||
{
|
|
||||||
"status": DomainRequest.DomainRequestStatus.STARTED,
|
|
||||||
"organization_name": "Example - Finished but not submitted",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"status": DomainRequest.DomainRequestStatus.SUBMITTED,
|
|
||||||
"organization_name": "Example - Submitted but pending investigation",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
|
|
||||||
"organization_name": "Example - In investigation",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
|
|
||||||
"organization_name": "Example - Approved",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"status": DomainRequest.DomainRequestStatus.WITHDRAWN,
|
|
||||||
"organization_name": "Example - Withdrawn",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"status": DomainRequest.DomainRequestStatus.ACTION_NEEDED,
|
|
||||||
"organization_name": "Example - Action needed",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"status": "rejected",
|
|
||||||
"organization_name": "Example - Rejected",
|
|
||||||
},
|
|
||||||
]
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def fake_contact(cls):
|
|
||||||
return {
|
|
||||||
"first_name": fake.first_name(),
|
|
||||||
"middle_name": None,
|
|
||||||
"last_name": fake.last_name(),
|
|
||||||
"title": fake.job(),
|
|
||||||
"email": fake.ascii_safe_email(),
|
|
||||||
"phone": "201-555-5555",
|
|
||||||
}
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def fake_dot_gov(cls):
|
|
||||||
return f"{fake.slug()}.gov"
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _set_non_foreign_key_fields(cls, da: DomainRequest, app: dict):
|
|
||||||
"""Helper method used by `load`."""
|
|
||||||
da.status = app["status"] if "status" in app else "started"
|
|
||||||
|
|
||||||
# TODO for a future ticket: Allow for more than just "federal" here
|
|
||||||
da.generic_org_type = app["generic_org_type"] if "generic_org_type" in app else "federal"
|
|
||||||
da.last_submitted_date = fake.date()
|
|
||||||
da.federal_type = (
|
|
||||||
app["federal_type"]
|
|
||||||
if "federal_type" in app
|
|
||||||
else random.choice(["executive", "judicial", "legislative"]) # nosec
|
|
||||||
)
|
|
||||||
da.address_line1 = app["address_line1"] if "address_line1" in app else fake.street_address()
|
|
||||||
da.address_line2 = app["address_line2"] if "address_line2" in app else None
|
|
||||||
da.city = app["city"] if "city" in app else fake.city()
|
|
||||||
da.state_territory = app["state_territory"] if "state_territory" in app else fake.state_abbr()
|
|
||||||
da.zipcode = app["zipcode"] if "zipcode" in app else fake.postalcode()
|
|
||||||
da.urbanization = app["urbanization"] if "urbanization" in app else None
|
|
||||||
da.purpose = app["purpose"] if "purpose" in app else fake.paragraph()
|
|
||||||
da.anything_else = app["anything_else"] if "anything_else" in app else None
|
|
||||||
da.is_policy_acknowledged = app["is_policy_acknowledged"] if "is_policy_acknowledged" in app else True
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _set_foreign_key_fields(cls, da: DomainRequest, app: dict, user: User):
|
|
||||||
"""Helper method used by `load`."""
|
|
||||||
if not da.investigator:
|
|
||||||
da.investigator = User.objects.get(username=user.username) if "investigator" in app else None
|
|
||||||
|
|
||||||
if not da.senior_official:
|
|
||||||
if "senior_official" in app and app["senior_official"] is not None:
|
|
||||||
da.senior_official, _ = Contact.objects.get_or_create(**app["senior_official"])
|
|
||||||
else:
|
|
||||||
da.senior_official = Contact.objects.create(**cls.fake_contact())
|
|
||||||
|
|
||||||
if not da.requested_domain:
|
|
||||||
if "requested_domain" in app and app["requested_domain"] is not None:
|
|
||||||
da.requested_domain, _ = DraftDomain.objects.get_or_create(name=app["requested_domain"])
|
|
||||||
else:
|
|
||||||
da.requested_domain = DraftDomain.objects.create(name=cls.fake_dot_gov())
|
|
||||||
if not da.federal_agency:
|
|
||||||
if "federal_agency" in app and app["federal_agency"] is not None:
|
|
||||||
da.federal_agency, _ = FederalAgency.objects.get_or_create(name=app["federal_agency"])
|
|
||||||
else:
|
|
||||||
federal_agencies = FederalAgency.objects.all()
|
|
||||||
# Random choice of agency for selects, used as placeholders for testing.
|
|
||||||
da.federal_agency = random.choice(federal_agencies) # nosec
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _set_many_to_many_relations(cls, da: DomainRequest, app: dict):
|
|
||||||
"""Helper method used by `load`."""
|
|
||||||
if "other_contacts" in app:
|
|
||||||
for contact in app["other_contacts"]:
|
|
||||||
da.other_contacts.add(Contact.objects.get_or_create(**contact)[0])
|
|
||||||
elif not da.other_contacts.exists():
|
|
||||||
other_contacts = [
|
|
||||||
Contact.objects.create(**cls.fake_contact()) for _ in range(random.randint(0, 3)) # nosec
|
|
||||||
]
|
|
||||||
da.other_contacts.add(*other_contacts)
|
|
||||||
|
|
||||||
if "current_websites" in app:
|
|
||||||
for website in app["current_websites"]:
|
|
||||||
da.current_websites.add(Website.objects.get_or_create(website=website)[0])
|
|
||||||
elif not da.current_websites.exists():
|
|
||||||
current_websites = [
|
|
||||||
Website.objects.create(website=fake.uri()) for _ in range(random.randint(0, 3)) # nosec
|
|
||||||
]
|
|
||||||
da.current_websites.add(*current_websites)
|
|
||||||
|
|
||||||
if "alternative_domains" in app:
|
|
||||||
for domain in app["alternative_domains"]:
|
|
||||||
da.alternative_domains.add(Website.objects.get_or_create(website=domain)[0])
|
|
||||||
elif not da.alternative_domains.exists():
|
|
||||||
alternative_domains = [
|
|
||||||
Website.objects.create(website=cls.fake_dot_gov()) for _ in range(random.randint(0, 3)) # nosec
|
|
||||||
]
|
|
||||||
da.alternative_domains.add(*alternative_domains)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def load(cls):
|
|
||||||
"""Creates domain requests for each user in the database."""
|
|
||||||
logger.info("Going to load %s domain requests" % len(cls.DA))
|
|
||||||
try:
|
|
||||||
users = list(User.objects.all()) # force evaluation to catch db errors
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(e)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Lumped under .atomic to ensure we don't make redundant DB calls.
|
|
||||||
# This bundles them all together, and then saves it in a single call.
|
|
||||||
with transaction.atomic():
|
|
||||||
cls._create_domain_requests(users)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _create_domain_requests(cls, users):
|
|
||||||
"""Creates DomainRequests given a list of users"""
|
|
||||||
for user in users:
|
|
||||||
logger.debug("Loading domain requests for %s" % user)
|
|
||||||
for app in cls.DA:
|
|
||||||
try:
|
|
||||||
da, _ = DomainRequest.objects.get_or_create(
|
|
||||||
creator=user,
|
|
||||||
organization_name=app["organization_name"],
|
|
||||||
)
|
|
||||||
cls._set_non_foreign_key_fields(da, app)
|
|
||||||
cls._set_foreign_key_fields(da, app, user)
|
|
||||||
da.save()
|
|
||||||
cls._set_many_to_many_relations(da, app)
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(e)
|
|
||||||
|
|
||||||
|
|
||||||
class DomainFixture(DomainRequestFixture):
|
|
||||||
"""Create one domain and permissions on it for each user."""
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def load(cls):
|
|
||||||
try:
|
|
||||||
users = list(User.objects.all()) # force evaluation to catch db errors
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(e)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Lumped under .atomic to ensure we don't make redundant DB calls.
|
|
||||||
# This bundles them all together, and then saves it in a single call.
|
|
||||||
with transaction.atomic():
|
|
||||||
# approve each user associated with `in review` status domains
|
|
||||||
DomainFixture._approve_domain_requests(users)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _approve_domain_requests(users):
|
|
||||||
"""Approves all provided domain requests if they are in the state in_review"""
|
|
||||||
for user in users:
|
|
||||||
domain_request = DomainRequest.objects.filter(
|
|
||||||
creator=user, status=DomainRequest.DomainRequestStatus.IN_REVIEW
|
|
||||||
).last()
|
|
||||||
logger.debug(f"Approving {domain_request} for {user}")
|
|
||||||
|
|
||||||
# All approvals require an investigator, so if there is none,
|
|
||||||
# assign one.
|
|
||||||
if domain_request.investigator is None:
|
|
||||||
# All "users" in fixtures have admin perms per prior config.
|
|
||||||
# No need to check for that.
|
|
||||||
domain_request.investigator = random.choice(users) # nosec
|
|
||||||
|
|
||||||
domain_request.approve(send_email=False)
|
|
||||||
domain_request.save()
|
|
|
@ -1,11 +1,13 @@
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
from auditlog.context import disable_auditlog # type: ignore
|
from auditlog.context import disable_auditlog
|
||||||
|
from registrar.fixtures.fixtures_domains import DomainFixture
|
||||||
|
from registrar.fixtures.fixtures_portfolios import PortfolioFixture
|
||||||
from registrar.fixtures_users import UserFixture
|
from registrar.fixtures.fixtures_requests import DomainRequestFixture
|
||||||
from registrar.fixtures_domain_requests import DomainRequestFixture, DomainFixture
|
from registrar.fixtures.fixtures_suborganizations import SuborganizationFixture
|
||||||
|
from registrar.fixtures.fixtures_user_portfolio_permissions import UserPortfolioPermissionFixture
|
||||||
|
from registrar.fixtures.fixtures_users import UserFixture # type: ignore
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -16,6 +18,9 @@ class Command(BaseCommand):
|
||||||
# https://github.com/jazzband/django-auditlog/issues/17
|
# https://github.com/jazzband/django-auditlog/issues/17
|
||||||
with disable_auditlog():
|
with disable_auditlog():
|
||||||
UserFixture.load()
|
UserFixture.load()
|
||||||
|
PortfolioFixture.load()
|
||||||
|
SuborganizationFixture.load()
|
||||||
DomainRequestFixture.load()
|
DomainRequestFixture.load()
|
||||||
DomainFixture.load()
|
DomainFixture.load()
|
||||||
|
UserPortfolioPermissionFixture.load()
|
||||||
logger.info("All fixtures loaded.")
|
logger.info("All fixtures loaded.")
|
||||||
|
|
|
@ -113,7 +113,6 @@ class UserGroup(Group):
|
||||||
+ cisa_analysts_group.name
|
+ cisa_analysts_group.name
|
||||||
)
|
)
|
||||||
|
|
||||||
cisa_analysts_group.save()
|
|
||||||
logger.debug("CISA Analyst permissions added to group " + cisa_analysts_group.name)
|
logger.debug("CISA Analyst permissions added to group " + cisa_analysts_group.name)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error creating analyst permissions group: {e}")
|
logger.error(f"Error creating analyst permissions group: {e}")
|
||||||
|
@ -135,7 +134,6 @@ class UserGroup(Group):
|
||||||
# Assign all permissions to the group
|
# Assign all permissions to the group
|
||||||
full_access_group.permissions.add(*all_permissions)
|
full_access_group.permissions.add(*all_permissions)
|
||||||
|
|
||||||
full_access_group.save()
|
|
||||||
logger.debug("All permissions added to group " + full_access_group.name)
|
logger.debug("All permissions added to group " + full_access_group.name)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error creating full access group: {e}")
|
logger.error(f"Error creating full access group: {e}")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue