merge main

This commit is contained in:
Rachid Mrad 2024-10-02 16:31:56 -04:00
commit 8aa919cc65
No known key found for this signature in database
16 changed files with 978 additions and 375 deletions

View file

@ -173,7 +173,7 @@ You can change the logging verbosity, if needed. Do a web search for "django log
## Mock data
[load.py](../../src/registrar/management/commands/load.py) called from docker-compose (locally) and reset-db.yml (upper) loads the fixtures from [fixtures_user.py](../../src/registrar/fixtures_users.py) and [fixtures_domain_requests.py](../../src/registrar/fixtures_domain_requests.py), giving you some test data to play with while developing.
[load.py](../../src/registrar/management/commands/load.py) called from docker-compose (locally) and reset-db.yml (upper) loads the fixtures from [fixtures_user.py](../../src/registrar/fixtures/fixtures_users.py) and the rest of the data-loading fixtures in that fixtures folder, giving you some test data to play with while developing.
See the [database-access README](./database-access.md) for information on how to pull data to update these fixtures.

View file

@ -9,17 +9,16 @@ Simple scripts are provided as detailed below.
### Export
To export from the source environment, run the following command from src directory:
manage.py export_tables
Connect to the source sandbox and run the command:
cf ssh {source-app}
/tmp/lifecycle/shell
./manage.py export_tables
`cf ssh {source-app}`
`/tmp/lifecycle/shell`
`./manage.py export_tables`
example exporting from getgov-stable:
cf ssh getgov-stable
/tmp/lifecycle/shell
./manage.py export_tables
`cf ssh getgov-stable`
`/tmp/lifecycle/shell`
`./manage.py export_tables`
This exports a file, exported_tables.zip, to the tmp directory
@ -42,14 +41,16 @@ After exporting the file from the target environment, scp the exported_tables.zi
file from the target environment to local. Run the below commands from local.
Get passcode by running:
cf ssh-code
`cf ssh-code`
scp file from source app to local file:
scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {source-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip {local_file_path}
`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {source-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip {local_file_path}`
when prompted, supply the passcode retrieved in the 'cf ssh-code' command
example copying from stable to local cwd:
scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-stable --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip .
`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-stable --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip .`
`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-stable --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip .`
### Import
@ -63,14 +64,14 @@ that there are no database conflicts on import.
In order to delete all rows from the appropriate tables, run the following
command:
cf ssh {target-app}
/tmp/lifecycle/shell
./manage.py clean_tables
`cf ssh {target-app}`
`/tmp/lifecycle/shell`
`./manage.py clean_tables`
example cleaning getgov-backup:
cf ssh getgov-backup
/tmp/lifecycle/backup
./manage.py clean_tables
`cf ssh getgov-backup`
`/tmp/lifecycle/shell`
`./manage.py clean_tables`
For reference, this deletes all rows from the following tables:
@ -96,28 +97,30 @@ with --skipEppSave option set to False. If you set to False, it will attempt to
records to the registry on load. If this is unset, or set to True, it will load the database and not
attempt to update the registry on load.
Please note that there is currently a bug (missing batch importing, see #2862) so this may not work
smoothly right now currently.
To scp the exported_tables.zip file from local to the sandbox, run the following:
Get passcode by running:
cf ssh-code
`cf ssh-code`
scp file from local to target app:
scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {target-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 {local_file_path} ssh.fr.cloud.gov:app/tmp/exported_tables.zip
`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {target-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 {local_file_path} ssh.fr.cloud.gov:app/tmp/exported_tables.zip`
when prompted, supply the passcode retrieved in the 'cf ssh-code' command
example copy of local file in tmp to getgov-backup:
scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-backup --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 tmp/exported_tables.zip ssh.fr.cloud.gov:app/tmp/exported_tables.zip
`scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-backup --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 exported_tables.zip ssh.fr.cloud.gov:app/tmp/exported_tables.zip`
Then connect to a shell in the target environment, and run the following import command:
cf ssh {target-app}
/tmp/lifecycle/shell
./manage.py import_tables
`cf ssh {target-app}`
`/tmp/lifecycle/shell`
`./manage.py import_tables`
example cleaning getgov-backup:
cf ssh getgov-backup
/tmp/lifecycle/backup
./manage.py import_tables --no-skipEppSave
`cf ssh getgov-backup`
`/tmp/lifecycle/shell`
`./manage.py import_tables --no-skipEppSave`
For reference, this imports tables in the following order:

View file

@ -0,0 +1,66 @@
"""Test the domain rdap lookup API."""
import json
from django.contrib.auth import get_user_model
from django.test import RequestFactory
from django.test import TestCase
from ..views import rdap
API_BASE_PATH = "/api/v1/rdap/?domain="
class RdapViewTest(TestCase):
"""Test that the RDAP view function works as expected"""
def setUp(self):
super().setUp()
self.user = get_user_model().objects.create(username="username")
self.factory = RequestFactory()
def test_rdap_get_no_tld(self):
"""RDAP API successfully fetches RDAP for domain without a TLD"""
request = self.factory.get(API_BASE_PATH + "whitehouse")
request.user = self.user
response = rdap(request, domain="whitehouse")
# contains the right text
self.assertContains(response, "rdap")
# can be parsed into JSON with appropriate keys
response_object = json.loads(response.content)
self.assertIn("rdapConformance", response_object)
def test_rdap_invalid_domain(self):
"""RDAP API accepts invalid domain queries and returns JSON response
with appropriate error codes"""
request = self.factory.get(API_BASE_PATH + "whitehouse.com")
request.user = self.user
response = rdap(request, domain="whitehouse.com")
self.assertContains(response, "errorCode")
response_object = json.loads(response.content)
self.assertIn("errorCode", response_object)
class RdapAPITest(TestCase):
"""Test that the API can be called as expected."""
def setUp(self):
super().setUp()
username = "test_user"
first_name = "First"
last_name = "Last"
email = "info@example.com"
title = "title"
phone = "8080102431"
self.user = get_user_model().objects.create(
username=username, title=title, first_name=first_name, last_name=last_name, email=email, phone=phone
)
def test_rdap_get(self):
"""Can call RDAP API"""
self.client.force_login(self.user)
response = self.client.get(API_BASE_PATH + "whitehouse.gov")
self.assertContains(response, "rdap")
response_object = json.loads(response.content)
self.assertIn("rdapConformance", response_object)

View file

@ -2,7 +2,7 @@
from django.apps import apps
from django.views.decorators.http import require_http_methods
from django.http import HttpResponse
from django.http import HttpResponse, JsonResponse
from django.utils.safestring import mark_safe
from registrar.templatetags.url_helpers import public_site_url
@ -18,7 +18,7 @@ from cachetools.func import ttl_cache
from registrar.utility.s3_bucket import S3ClientError, S3ClientHelper
DOMAIN_FILE_URL = "https://raw.githubusercontent.com/cisagov/dotgov-data/main/current-full.csv"
RDAP_URL = "https://rdap.cloudflareregistry.com/rdap/domain/{domain}"
DOMAIN_API_MESSAGES = {
@ -41,30 +41,6 @@ DOMAIN_API_MESSAGES = {
}
# this file doesn't change that often, nor is it that big, so cache the result
# in memory for ten minutes
@ttl_cache(ttl=600)
def _domains():
"""Return a list of the current .gov domains.
Fetch a file from DOMAIN_FILE_URL, parse the CSV for the domain,
lowercase everything and return the list.
"""
DraftDomain = apps.get_model("registrar.DraftDomain")
# 5 second timeout
file_contents = requests.get(DOMAIN_FILE_URL, timeout=5).text
domains = set()
# skip the first line
for line in file_contents.splitlines()[1:]:
# get the domain before the first comma
domain = line.split(",", 1)[0]
# sanity-check the string we got from the file here
if DraftDomain.string_could_be_domain(domain):
# lowercase everything when we put it in domains
domains.add(domain.lower())
return domains
def check_domain_available(domain):
"""Return true if the given domain is available.
@ -99,6 +75,22 @@ def available(request, domain=""):
return json_response
@require_http_methods(["GET"])
@login_not_required
# Since we cache domain RDAP data, cache time may need to be re-evaluated this if we encounter any memory issues
@ttl_cache(ttl=600)
def rdap(request, domain=""):
"""Returns JSON dictionary of a domain's RDAP data from Cloudflare API"""
domain = request.GET.get("domain", "")
# If inputted domain doesn't have a TLD, append .gov to it
if "." not in domain:
domain = f"{domain}.gov"
rdap_data = requests.get(RDAP_URL.format(domain=domain), timeout=5).json()
return JsonResponse(rdap_data)
@require_http_methods(["GET"])
@login_not_required
def get_current_full(request, file_name="current-full.csv"):

View file

@ -126,7 +126,8 @@ html[data-theme="light"] {
body.dashboard,
body.change-list,
body.change-form,
.custom-admin-template, dt {
.custom-admin-template,
.dl-dja dt {
color: var(--body-fg);
}
.usa-table td {
@ -155,7 +156,8 @@ html[data-theme="dark"] {
body.dashboard,
body.change-list,
body.change-form,
.custom-admin-template, dt {
.custom-admin-template,
.dl-dja dt {
color: var(--body-fg);
}
.usa-table td {

View file

@ -33,7 +33,7 @@ from registrar.views.utility.api_views import (
)
from registrar.views.domains_json import get_domains_json
from registrar.views.utility import always_404
from api.views import available, get_current_federal, get_current_full
from api.views import available, rdap, get_current_federal, get_current_full
DOMAIN_REQUEST_NAMESPACE = views.DomainRequestWizard.URL_NAMESPACE
@ -200,6 +200,7 @@ urlpatterns = [
path("openid/", include("djangooidc.urls")),
path("request/", include((domain_request_urls, DOMAIN_REQUEST_NAMESPACE))),
path("api/v1/available/", available, name="available"),
path("api/v1/rdap/", rdap, name="rdap"),
path("api/v1/get-report/current-federal", get_current_federal, name="get-current-federal"),
path("api/v1/get-report/current-full", get_current_full, name="get-current-full"),
path(

View file

@ -0,0 +1,138 @@
from datetime import timedelta
from django.utils import timezone
import logging
import random
from faker import Faker
from django.db import transaction
from registrar.fixtures.fixtures_requests import DomainRequestFixture
from registrar.fixtures.fixtures_users import UserFixture
from registrar.models import User, DomainRequest
from registrar.models.domain import Domain
fake = Faker()
logger = logging.getLogger(__name__)
class DomainFixture(DomainRequestFixture):
"""Create two domains and permissions on them for each user.
One domain will have a past expiration date.
Depends on fixtures_requests.
Make sure this class' `load` method is called from `handle`
in management/commands/load.py, then use `./manage.py load`
to run this code.
"""
@classmethod
def load(cls):
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
try:
# Get the usernames of users created in the UserFixture
created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF]
# Filter users to only include those created by the fixture
users = list(User.objects.filter(username__in=created_usernames))
except Exception as e:
logger.warning(e)
return
# Approve each user associated with `in review` status domains
cls._approve_domain_requests(users)
@staticmethod
def _generate_fake_expiration_date(days_in_future=365):
"""Generates a fake expiration date between 1 and 365 days in the future."""
current_date = timezone.now().date() # nosec
return current_date + timedelta(days=random.randint(1, days_in_future)) # nosec
@staticmethod
def _generate_fake_expiration_date_in_past():
"""Generates a fake expiration date up to 365 days in the past."""
current_date = timezone.now().date() # nosec
return current_date + timedelta(days=random.randint(-365, -1)) # nosec
@classmethod
def _approve_request(cls, domain_request, users):
"""Helper function to approve a domain request."""
if not domain_request:
return None
if domain_request.investigator is None:
# Assign random investigator if not already assigned
domain_request.investigator = random.choice(users) # nosec
# Approve the domain request
domain_request.approve(send_email=False)
return domain_request
@classmethod
def _approve_domain_requests(cls, users):
"""Approves one current and one expired request per user."""
domain_requests_to_update = []
expired_requests = []
for user in users:
# Get the latest and second-to-last domain requests
domain_requests = DomainRequest.objects.filter(
creator=user, status=DomainRequest.DomainRequestStatus.IN_REVIEW
).order_by("-id")[:2]
# Latest domain request
domain_request = domain_requests[0] if domain_requests else None
# Second-to-last domain request (expired)
domain_request_expired = domain_requests[1] if len(domain_requests) > 1 else None
# Approve the current domain request
if domain_request:
cls._approve_request(domain_request, users)
domain_requests_to_update.append(domain_request)
# Approve the expired domain request
if domain_request_expired:
cls._approve_request(domain_request_expired, users)
domain_requests_to_update.append(domain_request_expired)
expired_requests.append(domain_request_expired)
# Perform bulk update for the domain requests
cls._bulk_update_requests(domain_requests_to_update)
# Retrieve all domains associated with the domain requests
domains_to_update = Domain.objects.filter(domain_info__domain_request__in=domain_requests_to_update)
# Loop through and update expiration dates for domains
for domain in domains_to_update:
domain_request = domain.domain_info.domain_request
# Set the expiration date based on whether the request is expired
if domain_request in expired_requests:
domain.expiration_date = cls._generate_fake_expiration_date_in_past()
else:
domain.expiration_date = cls._generate_fake_expiration_date()
# Perform bulk update for the domains
cls._bulk_update_domains(domains_to_update)
@classmethod
def _bulk_update_requests(cls, domain_requests_to_update):
"""Bulk update domain requests."""
if domain_requests_to_update:
try:
DomainRequest.objects.bulk_update(domain_requests_to_update, ["status", "investigator"])
logger.info(f"Successfully updated {len(domain_requests_to_update)} requests.")
except Exception as e:
logger.error(f"Unexpected error during requests bulk update: {e}")
@classmethod
def _bulk_update_domains(cls, domains_to_update):
"""Bulk update domains with expiration dates."""
if domains_to_update:
try:
Domain.objects.bulk_update(domains_to_update, ["expiration_date"])
logger.info(f"Successfully updated {len(domains_to_update)} domains.")
except Exception as e:
logger.error(f"Unexpected error during domains bulk update: {e}")

View file

@ -0,0 +1,125 @@
import logging
import random
from faker import Faker
from django.db import transaction
from registrar.models import User, DomainRequest, FederalAgency
from registrar.models.portfolio import Portfolio
from registrar.models.senior_official import SeniorOfficial
fake = Faker()
logger = logging.getLogger(__name__)
class PortfolioFixture:
"""
Creates 2 pre-defined portfolios with the infrastructure to add more.
Make sure this class' `load` method is called from `handle`
in management/commands/load.py, then use `./manage.py load`
to run this code.
"""
PORTFOLIOS = [
{
"organization_name": "Hotel California",
},
{
"organization_name": "Wish You Were Here",
},
]
@classmethod
def fake_so(cls):
return {
"first_name": fake.first_name(),
"last_name": fake.last_name(),
"title": fake.job(),
"email": fake.ascii_safe_email(),
"phone": "201-555-5555",
}
@classmethod
def _set_non_foreign_key_fields(cls, portfolio: Portfolio, portfolio_dict: dict):
"""Helper method used by `load`."""
portfolio.organization_type = (
portfolio_dict["organization_type"]
if "organization_type" in portfolio_dict
else DomainRequest.OrganizationChoices.FEDERAL
)
portfolio.notes = portfolio_dict["notes"] if "notes" in portfolio_dict else None
portfolio.address_line1 = (
portfolio_dict["address_line1"] if "address_line1" in portfolio_dict else fake.street_address()
)
portfolio.address_line2 = portfolio_dict["address_line2"] if "address_line2" in portfolio_dict else None
portfolio.city = portfolio_dict["city"] if "city" in portfolio_dict else fake.city()
portfolio.state_territory = (
portfolio_dict["state_territory"] if "state_territory" in portfolio_dict else fake.state_abbr()
)
portfolio.zipcode = portfolio_dict["zipcode"] if "zipcode" in portfolio_dict else fake.postalcode()
portfolio.urbanization = portfolio_dict["urbanization"] if "urbanization" in portfolio_dict else None
portfolio.security_contact_email = (
portfolio_dict["security_contact_email"] if "security_contact_email" in portfolio_dict else fake.email()
)
@classmethod
def _set_foreign_key_fields(cls, portfolio: Portfolio, portfolio_dict: dict, user: User):
"""Helper method used by `load`."""
if not portfolio.senior_official:
if portfolio_dict.get("senior_official") is not None:
portfolio.senior_official, _ = SeniorOfficial.objects.get_or_create(**portfolio_dict["senior_official"])
else:
portfolio.senior_official = SeniorOfficial.objects.create(**cls.fake_so())
if not portfolio.federal_agency:
if portfolio_dict.get("federal_agency") is not None:
portfolio.federal_agency, _ = FederalAgency.objects.get_or_create(name=portfolio_dict["federal_agency"])
else:
federal_agencies = FederalAgency.objects.all()
# Random choice of agency for selects, used as placeholders for testing.
portfolio.federal_agency = random.choice(federal_agencies) # nosec
@classmethod
def load(cls):
"""Creates portfolios."""
logger.info("Going to load %s portfolios" % len(cls.PORTFOLIOS))
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
try:
user = User.objects.all().last()
except Exception as e:
logger.warning(e)
return
portfolios_to_create = []
for portfolio_data in cls.PORTFOLIOS:
organization_name = portfolio_data["organization_name"]
# Check if portfolio with the organization name already exists
if Portfolio.objects.filter(organization_name=organization_name).exists():
logger.info(
f"Portfolio with organization name '{organization_name}' already exists, skipping creation."
)
continue
try:
portfolio = Portfolio(
creator=user,
organization_name=portfolio_data["organization_name"],
)
cls._set_non_foreign_key_fields(portfolio, portfolio_data)
cls._set_foreign_key_fields(portfolio, portfolio_data, user)
portfolios_to_create.append(portfolio)
except Exception as e:
logger.warning(e)
# Bulk create domain requests
if len(portfolios_to_create) > 0:
try:
Portfolio.objects.bulk_create(portfolios_to_create)
logger.info(f"Successfully created {len(portfolios_to_create)} portfolios")
except Exception as e:
logger.warning(f"Error bulk creating portfolios: {e}")

View file

@ -0,0 +1,325 @@
from datetime import timedelta
from django.utils import timezone
import logging
import random
from faker import Faker
from django.db import transaction
from registrar.fixtures.fixtures_portfolios import PortfolioFixture
from registrar.fixtures.fixtures_users import UserFixture
from registrar.models import User, DomainRequest, DraftDomain, Contact, Website, FederalAgency
from registrar.models.portfolio import Portfolio
from registrar.models.suborganization import Suborganization
fake = Faker()
logger = logging.getLogger(__name__)
class DomainRequestFixture:
"""
Creates domain requests for each user in the database,
assign portfolios and suborgs.
Creates 3 in_review requests, one for approving with an expired domain,
one for approving with a non-expired domain, and one for leaving in in_review.
Depends on fixtures_portfolios and fixtures_suborganizations.
Make sure this class' `load` method is called from `handle`
in management/commands/load.py, then use `./manage.py load`
to run this code.
"""
# any fields not specified here will be filled in with fake data or defaults
# NOTE BENE: each fixture must have `organization_name` for uniqueness!
# Here is a more complete example as a template:
# {
# "status": "started",
# "organization_name": "Example - Just started",
# "generic_org_type": "federal",
# "federal_agency": None,
# "federal_type": None,
# "address_line1": None,
# "address_line2": None,
# "city": None,
# "state_territory": None,
# "zipcode": None,
# "urbanization": None,
# "purpose": None,
# "anything_else": None,
# "is_policy_acknowledged": None,
# "senior_official": None,
# "other_contacts": [],
# "current_websites": [],
# "alternative_domains": [],
# },
DOMAINREQUESTS = [
{
"status": DomainRequest.DomainRequestStatus.STARTED,
"organization_name": "Example - Finished but not submitted",
},
{
"status": DomainRequest.DomainRequestStatus.SUBMITTED,
"organization_name": "Example - Submitted but pending investigation",
},
{
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
"organization_name": "Example - In investigation",
},
{
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
"organization_name": "Example - Approved",
},
{
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
"organization_name": "Example - Approved, domain expired",
},
{
"status": DomainRequest.DomainRequestStatus.WITHDRAWN,
"organization_name": "Example - Withdrawn",
},
{
"status": DomainRequest.DomainRequestStatus.ACTION_NEEDED,
"organization_name": "Example - Action needed",
},
{
"status": "rejected",
"organization_name": "Example - Rejected",
},
]
@classmethod
def fake_contact(cls):
return {
"first_name": fake.first_name(),
"middle_name": None,
"last_name": fake.last_name(),
"title": fake.job(),
"email": fake.ascii_safe_email(),
"phone": "201-555-5555",
}
@classmethod
def fake_dot_gov(cls):
return f"{fake.slug()}.gov"
@classmethod
def fake_expiration_date(cls):
"""Generates a fake expiration date between 0 and 1 year in the future."""
current_date = timezone.now().date()
days_in_future = random.randint(0, 365) # nosec
return current_date + timedelta(days=days_in_future)
@classmethod
def _set_non_foreign_key_fields(cls, request: DomainRequest, request_dict: dict):
"""Helper method used by `load`."""
request.status = request_dict["status"] if "status" in request_dict else "started"
# TODO for a future ticket: Allow for more than just "federal" here
request.generic_org_type = request_dict["generic_org_type"] if "generic_org_type" in request_dict else "federal"
if request.status != "started":
request.last_submitted_date = fake.date()
request.federal_type = (
request_dict["federal_type"]
if "federal_type" in request_dict
else random.choice(["executive", "judicial", "legislative"]) # nosec
)
request.address_line1 = (
request_dict["address_line1"] if "address_line1" in request_dict else fake.street_address()
)
request.address_line2 = request_dict["address_line2"] if "address_line2" in request_dict else None
request.city = request_dict["city"] if "city" in request_dict else fake.city()
request.state_territory = (
request_dict["state_territory"] if "state_territory" in request_dict else fake.state_abbr()
)
request.zipcode = request_dict["zipcode"] if "zipcode" in request_dict else fake.postalcode()
request.urbanization = request_dict["urbanization"] if "urbanization" in request_dict else None
request.purpose = request_dict["purpose"] if "purpose" in request_dict else fake.paragraph()
request.has_cisa_representative = (
request_dict["has_cisa_representative"] if "has_cisa_representative" in request_dict else True
)
request.cisa_representative_email = (
request_dict["cisa_representative_email"] if "cisa_representative_email" in request_dict else fake.email()
)
request.cisa_representative_first_name = (
request_dict["cisa_representative_first_name"]
if "cisa_representative_first_name" in request_dict
else fake.first_name()
)
request.cisa_representative_last_name = (
request_dict["cisa_representative_last_name"]
if "cisa_representative_last_name" in request_dict
else fake.last_name()
)
request.has_anything_else_text = (
request_dict["has_anything_else_text"] if "has_anything_else_text" in request_dict else True
)
request.anything_else = request_dict["anything_else"] if "anything_else" in request_dict else fake.paragraph()
request.is_policy_acknowledged = (
request_dict["is_policy_acknowledged"] if "is_policy_acknowledged" in request_dict else True
)
@classmethod
def _set_foreign_key_fields(cls, request: DomainRequest, request_dict: dict, user: User):
"""Helper method used by `load`."""
request.investigator = cls._get_investigator(request, request_dict, user)
request.senior_official = cls._get_senior_official(request, request_dict)
request.requested_domain = cls._get_requested_domain(request, request_dict)
request.federal_agency = cls._get_federal_agency(request, request_dict)
request.portfolio = cls._get_portfolio(request, request_dict)
request.sub_organization = cls._get_sub_organization(request, request_dict)
@classmethod
def _get_investigator(cls, request: DomainRequest, request_dict: dict, user: User):
if not request.investigator:
return User.objects.get(username=user.username) if "investigator" in request_dict else None
return request.investigator
@classmethod
def _get_senior_official(cls, request: DomainRequest, request_dict: dict):
if not request.senior_official:
if "senior_official" in request_dict and request_dict["senior_official"] is not None:
return Contact.objects.get_or_create(**request_dict["senior_official"])[0]
return Contact.objects.create(**cls.fake_contact())
return request.senior_official
@classmethod
def _get_requested_domain(cls, request: DomainRequest, request_dict: dict):
if not request.requested_domain:
if "requested_domain" in request_dict and request_dict["requested_domain"] is not None:
return DraftDomain.objects.get_or_create(name=request_dict["requested_domain"])[0]
return DraftDomain.objects.create(name=cls.fake_dot_gov())
return request.requested_domain
@classmethod
def _get_federal_agency(cls, request: DomainRequest, request_dict: dict):
if not request.federal_agency:
if "federal_agency" in request_dict and request_dict["federal_agency"] is not None:
return FederalAgency.objects.get_or_create(name=request_dict["federal_agency"])[0]
return random.choice(FederalAgency.objects.all()) # nosec
return request.federal_agency
@classmethod
def _get_portfolio(cls, request: DomainRequest, request_dict: dict):
if not request.portfolio:
if "portfolio" in request_dict and request_dict["portfolio"] is not None:
return Portfolio.objects.get_or_create(name=request_dict["portfolio"])[0]
return cls._get_random_portfolio()
return request.portfolio
@classmethod
def _get_sub_organization(cls, request: DomainRequest, request_dict: dict):
if not request.sub_organization:
if "sub_organization" in request_dict and request_dict["sub_organization"] is not None:
return Suborganization.objects.get_or_create(name=request_dict["sub_organization"])[0]
return cls._get_random_sub_organization()
return request.sub_organization
@classmethod
def _get_random_portfolio(cls):
try:
organization_names = [portfolio["organization_name"] for portfolio in PortfolioFixture.PORTFOLIOS]
portfolio_options = Portfolio.objects.filter(organization_name__in=organization_names)
return random.choice(portfolio_options) if portfolio_options.exists() else None # nosec
except Exception as e:
logger.warning(f"Expected fixture portfolio, did not find it: {e}")
return None
@classmethod
def _get_random_sub_organization(cls):
try:
suborg_options = [Suborganization.objects.first(), Suborganization.objects.last()]
return random.choice(suborg_options) # nosec
except Exception as e:
logger.warning(f"Expected fixture sub_organization, did not find it: {e}")
return None
@classmethod
def _set_many_to_many_relations(cls, request: DomainRequest, request_dict: dict):
"""Helper method used by `load`."""
if "other_contacts" in request_dict:
for contact in request_dict["other_contacts"]:
request.other_contacts.add(Contact.objects.get_or_create(**contact)[0])
elif not request.other_contacts.exists():
other_contacts = [
Contact.objects.create(**cls.fake_contact()) for _ in range(random.randint(1, 3)) # nosec
]
request.other_contacts.add(*other_contacts)
if "current_websites" in request_dict:
for website in request_dict["current_websites"]:
request.current_websites.add(Website.objects.get_or_create(website=website)[0])
elif not request.current_websites.exists():
current_websites = [
Website.objects.create(website=fake.uri()) for _ in range(random.randint(0, 3)) # nosec
]
request.current_websites.add(*current_websites)
if "alternative_domains" in request_dict:
for domain in request_dict["alternative_domains"]:
request.alternative_domains.add(Website.objects.get_or_create(website=domain)[0])
elif not request.alternative_domains.exists():
alternative_domains = [
Website.objects.create(website=cls.fake_dot_gov()) for _ in range(random.randint(0, 3)) # nosec
]
request.alternative_domains.add(*alternative_domains)
@classmethod
def load(cls):
"""Creates domain requests for each user in the database."""
logger.info("Going to load %s domain requests" % len(cls.DOMAINREQUESTS))
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
try:
# Get the usernames of users created in the UserFixture
created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF]
# Filter users to only include those created by the fixture
users = list(User.objects.filter(username__in=created_usernames))
except Exception as e:
logger.warning(e)
return
cls._create_domain_requests(users)
@classmethod
def _create_domain_requests(cls, users):
"""Creates DomainRequests given a list of users."""
domain_requests_to_create = []
for user in users:
for request_data in cls.DOMAINREQUESTS:
# Prepare DomainRequest objects
try:
domain_request = DomainRequest(
creator=user,
organization_name=request_data["organization_name"],
)
cls._set_non_foreign_key_fields(domain_request, request_data)
cls._set_foreign_key_fields(domain_request, request_data, user)
domain_requests_to_create.append(domain_request)
except Exception as e:
logger.warning(e)
# Bulk create domain requests
cls._bulk_create_requests(domain_requests_to_create)
# Now many-to-many relationships
for domain_request in domain_requests_to_create:
try:
cls._set_many_to_many_relations(domain_request, request_data)
except Exception as e:
logger.warning(e)
@classmethod
def _bulk_create_requests(cls, domain_requests_to_create):
"""Bulk create domain requests."""
if len(domain_requests_to_create) > 0:
try:
DomainRequest.objects.bulk_create(domain_requests_to_create)
logger.info(f"Successfully created {len(domain_requests_to_create)} requests.")
except Exception as e:
logger.error(f"Unexpected error during requests bulk creation: {e}")

View file

@ -0,0 +1,87 @@
import logging
from faker import Faker
from django.db import transaction
from registrar.models.portfolio import Portfolio
from registrar.models.suborganization import Suborganization
fake = Faker()
logger = logging.getLogger(__name__)
class SuborganizationFixture:
"""
Creates 2 pre-defined suborg with the infrastructure to add more.
Depends on fixtures_portfolios.
Make sure this class' `load` method is called from `handle`
in management/commands/load.py, then use `./manage.py load`
to run this code.
"""
SUBORGS = [
{
"name": "Take it Easy",
},
{
"name": "Welcome to the Machine",
},
]
@classmethod
def load(cls):
"""Creates suborganizations."""
logger.info(f"Going to load {len(cls.SUBORGS)} suborgs")
with transaction.atomic():
portfolios = cls._get_portfolios()
if not portfolios:
return
suborgs_to_create = cls._prepare_suborgs_to_create(portfolios)
cls._bulk_create_suborgs(suborgs_to_create)
@classmethod
def _get_portfolios(cls):
"""Fetches portfolios with organization_name 'Hotel California' and 'Wish You Were Here'
and logs warnings if not found."""
try:
portfolio1 = Portfolio.objects.filter(organization_name="Hotel California").first()
portfolio2 = Portfolio.objects.filter(organization_name="Wish You Were Here").first()
if not portfolio1 or not portfolio2:
logger.warning("One or both portfolios not found.")
return None
return portfolio1, portfolio2
except Exception as e:
logger.warning(f"Error fetching portfolios: {e}")
return None
@classmethod
def _prepare_suborgs_to_create(cls, portfolios):
"""Prepares a list of suborganizations to create, avoiding duplicates."""
portfolio1, portfolio2 = portfolios
suborgs_to_create = []
try:
if not Suborganization.objects.filter(name=cls.SUBORGS[0]["name"]).exists():
suborgs_to_create.append(Suborganization(portfolio=portfolio1, name=cls.SUBORGS[0]["name"]))
if not Suborganization.objects.filter(name=cls.SUBORGS[1]["name"]).exists():
suborgs_to_create.append(Suborganization(portfolio=portfolio2, name=cls.SUBORGS[1]["name"]))
except Exception as e:
logger.warning(f"Error creating suborg objects: {e}")
return suborgs_to_create
@classmethod
def _bulk_create_suborgs(cls, suborgs_to_create):
"""Bulk creates suborganizations and logs success or errors."""
if suborgs_to_create:
try:
Suborganization.objects.bulk_create(suborgs_to_create)
logger.info(f"Successfully created {len(suborgs_to_create)} suborgs")
except Exception as e:
logger.warning(f"Error bulk creating suborgs: {e}")

View file

@ -0,0 +1,86 @@
import logging
from faker import Faker
from django.db import transaction
from registrar.fixtures.fixtures_portfolios import PortfolioFixture
from registrar.fixtures.fixtures_users import UserFixture
from registrar.models import User
from registrar.models.portfolio import Portfolio
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
fake = Faker()
logger = logging.getLogger(__name__)
class UserPortfolioPermissionFixture:
"""Create user portfolio permissions for each user.
Each user will be admin on 2 portfolios.
Depends on fixture_portfolios"""
@classmethod
def load(cls):
logger.info("Going to set user portfolio permissions")
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
try:
# Get the usernames of users created in the UserFixture
created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF]
# Filter users to only include those created by the fixture
users = list(User.objects.filter(username__in=created_usernames))
organization_names = [portfolio["organization_name"] for portfolio in PortfolioFixture.PORTFOLIOS]
portfolios = list(Portfolio.objects.filter(organization_name__in=organization_names))
if not users:
logger.warning("User fixtures missing.")
return
if not portfolios:
logger.warning("Portfolio fixtures missing.")
return
except Exception as e:
logger.warning(e)
return
user_portfolio_permissions_to_create = []
for user in users:
for portfolio in portfolios:
try:
if not UserPortfolioPermission.objects.filter(user=user, portfolio=portfolio).exists():
user_portfolio_permission = UserPortfolioPermission(
user=user,
portfolio=portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
)
user_portfolio_permissions_to_create.append(user_portfolio_permission)
else:
logger.info(
f"Permission exists for user '{user.username}' "
f"on portfolio '{portfolio.organization_name}'."
)
except Exception as e:
logger.warning(e)
# Bulk create permissions
cls._bulk_create_permissions(user_portfolio_permissions_to_create)
@classmethod
def _bulk_create_permissions(cls, user_portfolio_permissions_to_create):
"""Bulk creates permissions and logs success or errors."""
if user_portfolio_permissions_to_create:
try:
UserPortfolioPermission.objects.bulk_create(user_portfolio_permissions_to_create)
logger.info(
f"Successfully created {len(user_portfolio_permissions_to_create)} user portfolio permissions."
)
except Exception as e:
logger.error(f"Unexpected error during portfolio permission bulk creation: {e}")
else:
logger.info("No new user portfolio permissions to create.")

View file

@ -23,129 +23,123 @@ class UserFixture:
"""
ADMINS = [
{
"username": "43a7fa8d-0550-4494-a6fe-81500324d590",
"first_name": "Jyoti",
"last_name": "Bock",
"email": "jyotibock@truss.works",
},
{
"username": "aad084c3-66cc-4632-80eb-41cdf5c5bcbf",
"first_name": "Aditi",
"last_name": "Green",
"email": "aditidevelops+01@gmail.com",
"title": "Positive vibes",
},
{
"username": "be17c826-e200-4999-9389-2ded48c43691",
"first_name": "Matthew",
"last_name": "Spence",
"title": "Hollywood hair",
},
{
"username": "5f283494-31bd-49b5-b024-a7e7cae00848",
"first_name": "Rachid",
"last_name": "Mrad",
"email": "rachid.mrad@associates.cisa.dhs.gov",
"title": "Common pirate",
},
{
"username": "eb2214cd-fc0c-48c0-9dbd-bc4cd6820c74",
"first_name": "Alysia",
"last_name": "Broddrick",
"email": "abroddrick@truss.works",
"title": "I drink coffee and know things",
},
{
"username": "8f8e7293-17f7-4716-889b-1990241cbd39",
"first_name": "Katherine",
"last_name": "Osos",
"email": "kosos@truss.works",
"title": "Grove keeper",
},
{
"username": "70488e0a-e937-4894-a28c-16f5949effd4",
"first_name": "Gaby",
"last_name": "DiSarli",
"email": "gaby@truss.works",
"title": "De Stijl",
},
{
"username": "83c2b6dd-20a2-4cac-bb40-e22a72d2955c",
"first_name": "Cameron",
"last_name": "Dixon",
"email": "cameron.dixon@cisa.dhs.gov",
},
{
"username": "0353607a-cbba-47d2-98d7-e83dcd5b90ea",
"first_name": "Ryan",
"last_name": "Brooks",
"title": "Product owner",
},
{
"username": "30001ee7-0467-4df2-8db2-786e79606060",
"first_name": "Zander",
"last_name": "Adkinson",
"title": "ACME specialist",
},
{
"username": "2bf518c2-485a-4c42-ab1a-f5a8b0a08484",
"first_name": "Paul",
"last_name": "Kuykendall",
"title": "Dr. Silvertongue",
},
{
"username": "2a88a97b-be96-4aad-b99e-0b605b492c78",
"first_name": "Rebecca",
"last_name": "Hsieh",
"email": "rebecca.hsieh@truss.works",
"title": "Catlady",
},
{
"username": "fa69c8e8-da83-4798-a4f2-263c9ce93f52",
"first_name": "David",
"last_name": "Kennedy",
"email": "david.kennedy@ecstech.com",
"title": "Mean lean coding machine",
},
{
"username": "f14433d8-f0e9-41bf-9c72-b99b110e665d",
"first_name": "Nicolle",
"last_name": "LeClair",
"email": "nicolle.leclair@ecstech.com",
"title": "Nightowl",
},
{
"username": "24840450-bf47-4d89-8aa9-c612fe68f9da",
"first_name": "Erin",
"last_name": "Song",
"title": "Catlady 2",
},
{
"username": "e0ea8b94-6e53-4430-814a-849a7ca45f21",
"first_name": "Kristina",
"last_name": "Yin",
"title": "Hufflepuff prefect",
},
{
"username": "ac49d7c1-368a-4e6b-8f1d-60250e20a16f",
"first_name": "Vicky",
"last_name": "Chin",
"email": "szu.chin@associates.cisa.dhs.gov",
"title": "Ze whip",
},
{
"username": "66bb1a5a-a091-4d7f-a6cf-4d772b4711c7",
"first_name": "Christina",
"last_name": "Burnett",
"email": "christina.burnett@cisa.dhs.gov",
},
{
"username": "012f844d-8a0f-4225-9d82-cbf87bff1d3e",
"first_name": "Riley",
"last_name": "Orr",
"email": "riley+320@truss.works",
"title": "Groovy",
},
{
"username": "76612d84-66b0-4ae9-9870-81e98b9858b6",
"first_name": "Anna",
"last_name": "Gingle",
"email": "annagingle@truss.works",
"title": "Sweetwater sailor",
},
]
STAFF = [
{
"username": "a5906815-dd80-4c64-aebe-2da6a4c9d7a4",
"first_name": "Jyoti-Analyst",
"last_name": "Bock-Analyst",
"email": "jyotibock+1@truss.works",
},
{
"username": "ffec5987-aa84-411b-a05a-a7ee5cbcde54",
"first_name": "Aditi-Analyst",
@ -231,18 +225,6 @@ class UserFixture:
"last_name": "Burnett-Analyst",
"email": "christina.burnett@gwe.cisa.dhs.gov",
},
{
"username": "d9839768-0c17-4fa2-9c8e-36291eef5c11",
"first_name": "Alex-Analyst",
"last_name": "Mcelya-Analyst",
"email": "ALEXANDER.MCELYA@cisa.dhs.gov",
},
{
"username": "082a066f-e0a4-45f6-8672-4343a1208a36",
"first_name": "Riley-Analyst",
"last_name": "Orr-Analyst",
"email": "riley+321@truss.works",
},
{
"username": "e1e350b1-cfc1-4753-a6cb-3ae6d912f99c",
"first_name": "Anna-Analyst",
@ -254,29 +236,61 @@ class UserFixture:
# Additional emails to add to the AllowedEmail whitelist.
ADDITIONAL_ALLOWED_EMAILS: list[str] = ["davekenn4242@gmail.com", "rachid_mrad@hotmail.com"]
@classmethod
def load_users(cls, users, group_name, are_superusers=False):
logger.info(f"Going to load {len(users)} users in group {group_name}")
for user_data in users:
"""Loads the users into the database and assigns them to the specified group."""
logger.info(f"Going to load {len(users)} users for group {group_name}")
group = UserGroup.objects.get(name=group_name)
# Prepare sets of existing usernames and IDs in one query
user_identifiers = [(user.get("username"), user.get("id")) for user in users]
existing_users = User.objects.filter(
username__in=[user[0] for user in user_identifiers] + [user[1] for user in user_identifiers]
).values_list("username", "id")
existing_usernames = set(user[0] for user in existing_users)
existing_user_ids = set(user[1] for user in existing_users)
# Filter out users with existing IDs or usernames
new_users = [
User(
id=user_data.get("id"),
first_name=user_data.get("first_name"),
last_name=user_data.get("last_name"),
username=user_data.get("username"),
email=user_data.get("email", ""),
title=user_data.get("title", "Peon"),
phone=user_data.get("phone", "2022222222"),
is_active=user_data.get("is_active", True),
is_staff=True,
is_superuser=are_superusers,
)
for user_data in users
if user_data.get("username") not in existing_usernames and user_data.get("id") not in existing_user_ids
]
# Perform bulk creation for new users
if new_users:
try:
user, _ = User.objects.get_or_create(username=user_data["username"])
user.is_superuser = are_superusers
user.first_name = user_data["first_name"]
user.last_name = user_data["last_name"]
if "email" in user_data:
user.email = user_data["email"]
user.is_staff = True
user.is_active = True
# This verification type will get reverted to "regular" (or whichever is applicables)
# once the user logs in for the first time (as they then got verified through different means).
# In the meantime, we can still describe how the user got here in the first place.
user.verification_type = User.VerificationTypeChoices.FIXTURE_USER
group = UserGroup.objects.get(name=group_name)
user.groups.add(group)
user.save()
logger.debug(f"User object created for {user_data['first_name']}")
User.objects.bulk_create(new_users)
logger.info(f"Created {len(new_users)} new users.")
except Exception as e:
logger.warning(e)
logger.info(f"All users in group {group_name} loaded.")
logger.error(f"Unexpected error during user bulk creation: {e}")
else:
logger.info("No new users to create.")
# Get all users to be updated (both new and existing)
created_or_existing_users = User.objects.filter(username__in=[user.get("username") for user in users])
# Filter out users who are already in the group
users_not_in_group = created_or_existing_users.exclude(groups__id=group.id)
# Add only users who are not already in the group
if users_not_in_group.exists():
group.user_set.add(*users_not_in_group)
logger.info(f"Users loaded for group {group_name}.")
def load_allowed_emails(cls, users, additional_emails):
"""Populates a whitelist of allowed emails (as defined in this list)"""
@ -284,37 +298,33 @@ class UserFixture:
if additional_emails:
logger.info(f"Going to load {len(additional_emails)} additional allowed emails")
# Load user emails
allowed_emails = []
existing_emails = set(AllowedEmail.objects.values_list("email", flat=True))
new_allowed_emails = []
for user_data in users:
user_email = user_data.get("email")
if user_email and user_email not in allowed_emails:
allowed_emails.append(AllowedEmail(email=user_email))
else:
first_name = user_data.get("first_name")
last_name = user_data.get("last_name")
logger.warning(f"Could not add email to whitelist for {first_name} {last_name}.")
if user_email and user_email not in existing_emails:
new_allowed_emails.append(AllowedEmail(email=user_email))
# Load additional emails
allowed_emails.extend([AllowedEmail(email=email) for email in additional_emails])
# Load additional emails, only if they don't exist already
for email in additional_emails:
if email not in existing_emails:
new_allowed_emails.append(AllowedEmail(email=email))
if allowed_emails:
AllowedEmail.objects.bulk_create(allowed_emails)
logger.info(f"Loaded {len(allowed_emails)} allowed emails")
if new_allowed_emails:
try:
AllowedEmail.objects.bulk_create(new_allowed_emails)
logger.info(f"Loaded {len(new_allowed_emails)} allowed emails")
except Exception as e:
logger.error(f"Unexpected error during allowed emails bulk creation: {e}")
else:
logger.info("No allowed emails to load")
@classmethod
def load(cls):
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
# This is slightly different then bulk_create or bulk_update, in that
# you still get the same behaviour of .save(), but those incremental
# steps now do not need to close/reopen a db connection,
# instead they share one.
with transaction.atomic():
cls.load_users(cls, cls.ADMINS, "full_access_group", are_superusers=True)
cls.load_users(cls, cls.STAFF, "cisa_analysts_group")
cls.load_users(cls.ADMINS, "full_access_group", are_superusers=True)
cls.load_users(cls.STAFF, "cisa_analysts_group")
# Combine ADMINS and STAFF lists
all_users = cls.ADMINS + cls.STAFF

View file

@ -1,236 +0,0 @@
import logging
import random
from faker import Faker
from django.db import transaction
from registrar.models import User, DomainRequest, DraftDomain, Contact, Website, FederalAgency
fake = Faker()
logger = logging.getLogger(__name__)
class DomainRequestFixture:
"""
Load domain requests into the database.
Make sure this class' `load` method is called from `handle`
in management/commands/load.py, then use `./manage.py load`
to run this code.
"""
# any fields not specified here will be filled in with fake data or defaults
# NOTE BENE: each fixture must have `organization_name` for uniqueness!
# Here is a more complete example as a template:
# {
# "status": "started",
# "organization_name": "Example - Just started",
# "generic_org_type": "federal",
# "federal_agency": None,
# "federal_type": None,
# "address_line1": None,
# "address_line2": None,
# "city": None,
# "state_territory": None,
# "zipcode": None,
# "urbanization": None,
# "purpose": None,
# "anything_else": None,
# "is_policy_acknowledged": None,
# "senior_official": None,
# "other_contacts": [],
# "current_websites": [],
# "alternative_domains": [],
# },
DA = [
{
"status": DomainRequest.DomainRequestStatus.STARTED,
"organization_name": "Example - Finished but not submitted",
},
{
"status": DomainRequest.DomainRequestStatus.SUBMITTED,
"organization_name": "Example - Submitted but pending investigation",
},
{
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
"organization_name": "Example - In investigation",
},
{
"status": DomainRequest.DomainRequestStatus.IN_REVIEW,
"organization_name": "Example - Approved",
},
{
"status": DomainRequest.DomainRequestStatus.WITHDRAWN,
"organization_name": "Example - Withdrawn",
},
{
"status": DomainRequest.DomainRequestStatus.ACTION_NEEDED,
"organization_name": "Example - Action needed",
},
{
"status": "rejected",
"organization_name": "Example - Rejected",
},
]
@classmethod
def fake_contact(cls):
return {
"first_name": fake.first_name(),
"middle_name": None,
"last_name": fake.last_name(),
"title": fake.job(),
"email": fake.ascii_safe_email(),
"phone": "201-555-5555",
}
@classmethod
def fake_dot_gov(cls):
return f"{fake.slug()}.gov"
@classmethod
def _set_non_foreign_key_fields(cls, da: DomainRequest, app: dict):
"""Helper method used by `load`."""
da.status = app["status"] if "status" in app else "started"
# TODO for a future ticket: Allow for more than just "federal" here
da.generic_org_type = app["generic_org_type"] if "generic_org_type" in app else "federal"
da.last_submitted_date = fake.date()
da.federal_type = (
app["federal_type"]
if "federal_type" in app
else random.choice(["executive", "judicial", "legislative"]) # nosec
)
da.address_line1 = app["address_line1"] if "address_line1" in app else fake.street_address()
da.address_line2 = app["address_line2"] if "address_line2" in app else None
da.city = app["city"] if "city" in app else fake.city()
da.state_territory = app["state_territory"] if "state_territory" in app else fake.state_abbr()
da.zipcode = app["zipcode"] if "zipcode" in app else fake.postalcode()
da.urbanization = app["urbanization"] if "urbanization" in app else None
da.purpose = app["purpose"] if "purpose" in app else fake.paragraph()
da.anything_else = app["anything_else"] if "anything_else" in app else None
da.is_policy_acknowledged = app["is_policy_acknowledged"] if "is_policy_acknowledged" in app else True
@classmethod
def _set_foreign_key_fields(cls, da: DomainRequest, app: dict, user: User):
"""Helper method used by `load`."""
if not da.investigator:
da.investigator = User.objects.get(username=user.username) if "investigator" in app else None
if not da.senior_official:
if "senior_official" in app and app["senior_official"] is not None:
da.senior_official, _ = Contact.objects.get_or_create(**app["senior_official"])
else:
da.senior_official = Contact.objects.create(**cls.fake_contact())
if not da.requested_domain:
if "requested_domain" in app and app["requested_domain"] is not None:
da.requested_domain, _ = DraftDomain.objects.get_or_create(name=app["requested_domain"])
else:
da.requested_domain = DraftDomain.objects.create(name=cls.fake_dot_gov())
if not da.federal_agency:
if "federal_agency" in app and app["federal_agency"] is not None:
da.federal_agency, _ = FederalAgency.objects.get_or_create(name=app["federal_agency"])
else:
federal_agencies = FederalAgency.objects.all()
# Random choice of agency for selects, used as placeholders for testing.
da.federal_agency = random.choice(federal_agencies) # nosec
@classmethod
def _set_many_to_many_relations(cls, da: DomainRequest, app: dict):
"""Helper method used by `load`."""
if "other_contacts" in app:
for contact in app["other_contacts"]:
da.other_contacts.add(Contact.objects.get_or_create(**contact)[0])
elif not da.other_contacts.exists():
other_contacts = [
Contact.objects.create(**cls.fake_contact()) for _ in range(random.randint(0, 3)) # nosec
]
da.other_contacts.add(*other_contacts)
if "current_websites" in app:
for website in app["current_websites"]:
da.current_websites.add(Website.objects.get_or_create(website=website)[0])
elif not da.current_websites.exists():
current_websites = [
Website.objects.create(website=fake.uri()) for _ in range(random.randint(0, 3)) # nosec
]
da.current_websites.add(*current_websites)
if "alternative_domains" in app:
for domain in app["alternative_domains"]:
da.alternative_domains.add(Website.objects.get_or_create(website=domain)[0])
elif not da.alternative_domains.exists():
alternative_domains = [
Website.objects.create(website=cls.fake_dot_gov()) for _ in range(random.randint(0, 3)) # nosec
]
da.alternative_domains.add(*alternative_domains)
@classmethod
def load(cls):
"""Creates domain requests for each user in the database."""
logger.info("Going to load %s domain requests" % len(cls.DA))
try:
users = list(User.objects.all()) # force evaluation to catch db errors
except Exception as e:
logger.warning(e)
return
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
cls._create_domain_requests(users)
@classmethod
def _create_domain_requests(cls, users):
"""Creates DomainRequests given a list of users"""
for user in users:
logger.debug("Loading domain requests for %s" % user)
for app in cls.DA:
try:
da, _ = DomainRequest.objects.get_or_create(
creator=user,
organization_name=app["organization_name"],
)
cls._set_non_foreign_key_fields(da, app)
cls._set_foreign_key_fields(da, app, user)
da.save()
cls._set_many_to_many_relations(da, app)
except Exception as e:
logger.warning(e)
class DomainFixture(DomainRequestFixture):
"""Create one domain and permissions on it for each user."""
@classmethod
def load(cls):
try:
users = list(User.objects.all()) # force evaluation to catch db errors
except Exception as e:
logger.warning(e)
return
# Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call.
with transaction.atomic():
# approve each user associated with `in review` status domains
DomainFixture._approve_domain_requests(users)
@staticmethod
def _approve_domain_requests(users):
"""Approves all provided domain requests if they are in the state in_review"""
for user in users:
domain_request = DomainRequest.objects.filter(
creator=user, status=DomainRequest.DomainRequestStatus.IN_REVIEW
).last()
logger.debug(f"Approving {domain_request} for {user}")
# All approvals require an investigator, so if there is none,
# assign one.
if domain_request.investigator is None:
# All "users" in fixtures have admin perms per prior config.
# No need to check for that.
domain_request.investigator = random.choice(users) # nosec
domain_request.approve(send_email=False)
domain_request.save()

View file

@ -1,11 +1,13 @@
import logging
from django.core.management.base import BaseCommand
from auditlog.context import disable_auditlog # type: ignore
from registrar.fixtures_users import UserFixture
from registrar.fixtures_domain_requests import DomainRequestFixture, DomainFixture
from auditlog.context import disable_auditlog
from registrar.fixtures.fixtures_domains import DomainFixture
from registrar.fixtures.fixtures_portfolios import PortfolioFixture
from registrar.fixtures.fixtures_requests import DomainRequestFixture
from registrar.fixtures.fixtures_suborganizations import SuborganizationFixture
from registrar.fixtures.fixtures_user_portfolio_permissions import UserPortfolioPermissionFixture
from registrar.fixtures.fixtures_users import UserFixture # type: ignore
logger = logging.getLogger(__name__)
@ -16,6 +18,9 @@ class Command(BaseCommand):
# https://github.com/jazzband/django-auditlog/issues/17
with disable_auditlog():
UserFixture.load()
PortfolioFixture.load()
SuborganizationFixture.load()
DomainRequestFixture.load()
DomainFixture.load()
UserPortfolioPermissionFixture.load()
logger.info("All fixtures loaded.")

View file

@ -113,7 +113,6 @@ class UserGroup(Group):
+ cisa_analysts_group.name
)
cisa_analysts_group.save()
logger.debug("CISA Analyst permissions added to group " + cisa_analysts_group.name)
except Exception as e:
logger.error(f"Error creating analyst permissions group: {e}")
@ -135,7 +134,6 @@ class UserGroup(Group):
# Assign all permissions to the group
full_access_group.permissions.add(*all_permissions)
full_access_group.save()
logger.debug("All permissions added to group " + full_access_group.name)
except Exception as e:
logger.error(f"Error creating full access group: {e}")

View file

@ -116,6 +116,7 @@ class TestURLAuth(TestCase):
"/api/v1/available/",
"/api/v1/get-report/current-federal",
"/api/v1/get-report/current-full",
"/api/v1/rdap/",
"/health",
]