Merge pull request #2670 from cisagov/ag/2616-populate-suborg-and-portfolio-script

Ticket #2616: populate suborg and portfolio script
This commit is contained in:
zandercymatics 2024-09-12 09:25:23 -06:00 committed by GitHub
commit 91b69a34c7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 456 additions and 3 deletions

View file

@ -860,3 +860,55 @@ Example: `cf ssh getgov-za`
### Running locally
```docker-compose exec app ./manage.py populate_domain_request_dates```
## Create federal portfolio
This script takes the name of a `FederalAgency` (like 'AMTRAK') and does the following:
1. Creates the portfolio record based off of data on the federal agency object itself.
2. Creates suborganizations from existing DomainInformation records.
3. Associates the SeniorOfficial record (if it exists).
4. Adds this portfolio to DomainInformation / DomainRequests or both.
Errors:
1. ValueError: Federal agency not found in database.
2. Logged Warning: No senior official found for portfolio
3. Logged Error: No suborganizations found for portfolio.
4. Logged Warning: No new suborganizations to add.
5. Logged Warning: No valid DomainRequest records to update.
6. Logged Warning: No valid DomainInformation records to update.
### Running on sandboxes
#### Step 1: Login to CloudFoundry
```cf login -a api.fr.cloud.gov --sso```
#### Step 2: SSH into your environment
```cf ssh getgov-{space}```
Example: `cf ssh getgov-za`
#### Step 3: Create a shell instance
```/tmp/lifecycle/shell```
#### Step 4: Upload your csv to the desired sandbox
[Follow these steps](#use-scp-to-transfer-data-to-sandboxes) to upload the federal_cio csv to a sandbox of your choice.
#### Step 5: Running the script
```./manage.py create_federal_portfolio "{federal_agency_name}" --both```
Example (only requests): `./manage.py create_federal_portfolio "AMTRAK" --parse_requests`
### Running locally
#### Step 1: Running the script
```docker-compose exec app ./manage.py create_federal_portfolio "{federal_agency_name}" --both```
##### Parameters
| | Parameter | Description |
|:-:|:-------------------------- |:-------------------------------------------------------------------------------------------|
| 1 | **federal_agency_name** | Name of the FederalAgency record surrounded by quotes. For instance,"AMTRAK". |
| 2 | **both** | If True, runs parse_requests and parse_domains. |
| 3 | **parse_requests** | If True, then the created portfolio is added to all related DomainRequests. |
| 4 | **parse_domains** | If True, then the created portfolio is added to all related Domains. |
Note: Regarding parameters #2-#3, you cannot use `--both` while using these. You must specify either `--parse_requests` or `--parse_domains` seperately. While all of these parameters are optional in that you do not need to specify all of them,
you must specify at least one to run this script.

View file

@ -0,0 +1,255 @@
"""Loads files from /tmp into our sandboxes"""
import argparse
import logging
from django.core.management import BaseCommand, CommandError
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.models import DomainInformation, DomainRequest, FederalAgency, Suborganization, Portfolio, User
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Creates a federal portfolio given a FederalAgency name"
def add_arguments(self, parser):
"""Add three arguments:
1. agency_name => the value of FederalAgency.agency
2. --parse_requests => if true, adds the given portfolio to each related DomainRequest
3. --parse_domains => if true, adds the given portfolio to each related DomainInformation
"""
parser.add_argument(
"agency_name",
help="The name of the FederalAgency to add",
)
parser.add_argument(
"--parse_requests",
action=argparse.BooleanOptionalAction,
help="Adds portfolio to DomainRequests",
)
parser.add_argument(
"--parse_domains",
action=argparse.BooleanOptionalAction,
help="Adds portfolio to DomainInformation",
)
parser.add_argument(
"--both",
action=argparse.BooleanOptionalAction,
help="Adds portfolio to both requests and domains",
)
def handle(self, agency_name, **options):
parse_requests = options.get("parse_requests")
parse_domains = options.get("parse_domains")
both = options.get("both")
if not both:
if not parse_requests and not parse_domains:
raise CommandError("You must specify at least one of --parse_requests or --parse_domains.")
else:
if parse_requests or parse_domains:
raise CommandError("You cannot pass --parse_requests or --parse_domains when passing --both.")
federal_agency = FederalAgency.objects.filter(agency__iexact=agency_name).first()
if not federal_agency:
raise ValueError(
f"Cannot find the federal agency '{agency_name}' in our database. "
"The value you enter for `agency_name` must be "
"prepopulated in the FederalAgency table before proceeding."
)
portfolio = self.create_or_modify_portfolio(federal_agency)
self.create_suborganizations(portfolio, federal_agency)
if parse_requests or both:
self.handle_portfolio_requests(portfolio, federal_agency)
if parse_domains or both:
self.handle_portfolio_domains(portfolio, federal_agency)
def create_or_modify_portfolio(self, federal_agency):
"""Creates or modifies a portfolio record based on a federal agency."""
portfolio_args = {
"federal_agency": federal_agency,
"organization_name": federal_agency.agency,
"organization_type": DomainRequest.OrganizationChoices.FEDERAL,
"creator": User.get_default_user(),
"notes": "Auto-generated record",
}
if federal_agency.so_federal_agency.exists():
portfolio_args["senior_official"] = federal_agency.so_federal_agency.first()
portfolio, created = Portfolio.objects.get_or_create(
organization_name=portfolio_args.get("organization_name"), defaults=portfolio_args
)
if created:
message = f"Created portfolio '{portfolio}'"
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)
if portfolio_args.get("senior_official"):
message = f"Added senior official '{portfolio_args['senior_official']}'"
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)
else:
message = (
f"No senior official added to portfolio '{portfolio}'. "
"None was returned for the reverse relation `FederalAgency.so_federal_agency.first()`"
)
TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message)
else:
proceed = TerminalHelper.prompt_for_execution(
system_exit_on_terminate=False,
prompt_message=f"""
The given portfolio '{federal_agency.agency}' already exists in our DB.
If you cancel, the rest of the script will still execute but this record will not update.
""",
prompt_title="Do you wish to modify this record?",
)
if proceed:
# Don't override the creator and notes fields
if portfolio.creator:
portfolio_args.pop("creator")
if portfolio.notes:
portfolio_args.pop("notes")
# Update everything else
for key, value in portfolio_args.items():
setattr(portfolio, key, value)
portfolio.save()
message = f"Modified portfolio '{portfolio}'"
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
if portfolio_args.get("senior_official"):
message = f"Added/modified senior official '{portfolio_args['senior_official']}'"
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
return portfolio
def create_suborganizations(self, portfolio: Portfolio, federal_agency: FederalAgency):
"""Create Suborganizations tied to the given portfolio based on DomainInformation objects"""
valid_agencies = DomainInformation.objects.filter(
federal_agency=federal_agency, organization_name__isnull=False
)
org_names = set(valid_agencies.values_list("organization_name", flat=True))
if not org_names:
message = (
"Could not add any suborganizations."
f"\nNo suborganizations were found for '{federal_agency}' when filtering on this name, "
"and excluding null organization_name records."
)
TerminalHelper.colorful_logger(logger.warning, TerminalColors.FAIL, message)
return
# Check if we need to update any existing suborgs first. This step is optional.
existing_suborgs = Suborganization.objects.filter(name__in=org_names)
if existing_suborgs.exists():
self._update_existing_suborganizations(portfolio, existing_suborgs)
# Create new suborgs, as long as they don't exist in the db already
new_suborgs = []
for name in org_names - set(existing_suborgs.values_list("name", flat=True)):
# Stored in variables due to linter wanting type information here.
portfolio_name: str = portfolio.organization_name if portfolio.organization_name is not None else ""
if name is not None and name.lower() == portfolio_name.lower():
# You can use this to populate location information, when this occurs.
# However, this isn't needed for now so we can skip it.
message = (
f"Skipping suborganization create on record '{name}'. "
"The federal agency name is the same as the portfolio name."
)
TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, message)
else:
new_suborgs.append(Suborganization(name=name, portfolio=portfolio)) # type: ignore
if new_suborgs:
Suborganization.objects.bulk_create(new_suborgs)
TerminalHelper.colorful_logger(
logger.info, TerminalColors.OKGREEN, f"Added {len(new_suborgs)} suborganizations"
)
else:
TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, "No suborganizations added")
def _update_existing_suborganizations(self, portfolio, orgs_to_update):
"""
Update existing suborganizations with new portfolio.
Prompts for user confirmation before proceeding.
"""
proceed = TerminalHelper.prompt_for_execution(
system_exit_on_terminate=False,
prompt_message=f"""Some suborganizations already exist in our DB.
If you cancel, the rest of the script will still execute but these records will not update.
==Proposed Changes==
The following suborgs will be updated: {[org.name for org in orgs_to_update]}
""",
prompt_title="Do you wish to modify existing suborganizations?",
)
if proceed:
for org in orgs_to_update:
org.portfolio = portfolio
Suborganization.objects.bulk_update(orgs_to_update, ["portfolio"])
message = f"Updated {len(orgs_to_update)} suborganizations."
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
def handle_portfolio_requests(self, portfolio: Portfolio, federal_agency: FederalAgency):
"""
Associate portfolio with domain requests for a federal agency.
Updates all relevant domain request records.
"""
invalid_states = [
DomainRequest.DomainRequestStatus.STARTED,
DomainRequest.DomainRequestStatus.INELIGIBLE,
DomainRequest.DomainRequestStatus.REJECTED,
]
domain_requests = DomainRequest.objects.filter(federal_agency=federal_agency).exclude(status__in=invalid_states)
if not domain_requests.exists():
message = f"""
Portfolios not added to domain requests: no valid records found.
This means that a filter on DomainInformation for the federal_agency '{federal_agency}' returned no results.
Excluded statuses: STARTED, INELIGIBLE, REJECTED.
"""
TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message)
return None
# Get all suborg information and store it in a dict to avoid doing a db call
suborgs = Suborganization.objects.filter(portfolio=portfolio).in_bulk(field_name="name")
for domain_request in domain_requests:
domain_request.portfolio = portfolio
if domain_request.organization_name in suborgs:
domain_request.sub_organization = suborgs.get(domain_request.organization_name)
DomainRequest.objects.bulk_update(domain_requests, ["portfolio", "sub_organization"])
message = f"Added portfolio '{portfolio}' to {len(domain_requests)} domain requests."
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)
def handle_portfolio_domains(self, portfolio: Portfolio, federal_agency: FederalAgency):
"""
Associate portfolio with domains for a federal agency.
Updates all relevant domain information records.
"""
domain_infos = DomainInformation.objects.filter(federal_agency=federal_agency)
if not domain_infos.exists():
message = f"""
Portfolios not added to domains: no valid records found.
This means that a filter on DomainInformation for the federal_agency '{federal_agency}' returned no results.
"""
TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message)
return None
# Get all suborg information and store it in a dict to avoid doing a db call
suborgs = Suborganization.objects.filter(portfolio=portfolio).in_bulk(field_name="name")
for domain_info in domain_infos:
domain_info.portfolio = portfolio
if domain_info.organization_name in suborgs:
domain_info.sub_organization = suborgs.get(domain_info.organization_name)
DomainInformation.objects.bulk_update(domain_infos, ["portfolio", "sub_organization"])
message = f"Added portfolio '{portfolio}' to {len(domain_infos)} domains"
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)

View file

@ -423,7 +423,7 @@ class Command(BaseCommand):
valid_fed_type = fed_type in fed_choices
valid_fed_agency = fed_agency in agency_choices
default_creator, _ = User.objects.get_or_create(username="System")
default_creator = User.get_default_user()
new_domain_info_data = {
"domain": domain,

View file

@ -131,6 +131,12 @@ class User(AbstractUser):
else:
return self.username
@classmethod
def get_default_user(cls):
"""Returns the default "system" user"""
default_creator, _ = User.objects.get_or_create(username="System")
return default_creator
def restrict_user(self):
self.status = self.RESTRICTED
self.save()

View file

@ -911,6 +911,7 @@ def completed_domain_request( # noqa
federal_type=None,
action_needed_reason=None,
portfolio=None,
organization_name=None,
):
"""A completed domain request."""
if not user:
@ -954,7 +955,7 @@ def completed_domain_request( # noqa
federal_type="executive",
purpose="Purpose of the site",
is_policy_acknowledged=True,
organization_name="Testorg",
organization_name=organization_name if organization_name else "Testorg",
address_line1="address 1",
address_line2="address 2",
state_territory="NY",

View file

@ -1,4 +1,5 @@
import copy
import boto3_mocking # type: ignore
from datetime import date, datetime, time
from django.core.management import call_command
from django.test import TestCase, override_settings
@ -8,6 +9,7 @@ from django.utils import timezone
from django.utils.module_loading import import_string
import logging
import pyzipper
from django.core.management.base import CommandError
from registrar.management.commands.clean_tables import Command as CleanTablesCommand
from registrar.management.commands.export_tables import Command as ExportTablesCommand
from registrar.models import (
@ -23,14 +25,17 @@ from registrar.models import (
VerifiedByStaff,
PublicContact,
FederalAgency,
Portfolio,
Suborganization,
)
import tablib
from unittest.mock import patch, call, MagicMock, mock_open
from epplibwrapper import commands, common
from .common import MockEppLib, less_console_noise, completed_domain_request
from .common import MockEppLib, less_console_noise, completed_domain_request, MockSESClient
from api.tests.common import less_console_noise_decorator
logger = logging.getLogger(__name__)
@ -1408,3 +1413,137 @@ class TestPopulateFederalAgencyInitialsAndFceb(TestCase):
missing_agency.refresh_from_db()
self.assertIsNone(missing_agency.initials)
self.assertIsNone(missing_agency.is_fceb)
class TestCreateFederalPortfolio(TestCase):
@less_console_noise_decorator
def setUp(self):
self.mock_client = MockSESClient()
self.user = User.objects.create(username="testuser")
self.federal_agency = FederalAgency.objects.create(agency="Test Federal Agency")
self.senior_official = SeniorOfficial.objects.create(
first_name="first", last_name="last", email="testuser@igorville.gov", federal_agency=self.federal_agency
)
with boto3_mocking.clients.handler_for("sesv2", self.mock_client):
self.domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
generic_org_type=DomainRequest.OrganizationChoices.CITY,
federal_agency=self.federal_agency,
user=self.user,
)
self.domain_request.approve()
self.domain_info = DomainInformation.objects.filter(domain_request=self.domain_request).get()
self.domain_request_2 = completed_domain_request(
name="sock@igorville.org",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
generic_org_type=DomainRequest.OrganizationChoices.CITY,
federal_agency=self.federal_agency,
user=self.user,
organization_name="Test Federal Agency",
)
self.domain_request_2.approve()
self.domain_info_2 = DomainInformation.objects.filter(domain_request=self.domain_request_2).get()
def tearDown(self):
DomainInformation.objects.all().delete()
DomainRequest.objects.all().delete()
Suborganization.objects.all().delete()
Portfolio.objects.all().delete()
SeniorOfficial.objects.all().delete()
FederalAgency.objects.all().delete()
User.objects.all().delete()
@less_console_noise_decorator
def run_create_federal_portfolio(self, agency_name, parse_requests=False, parse_domains=False):
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit",
return_value=True,
):
call_command(
"create_federal_portfolio", agency_name, parse_requests=parse_requests, parse_domains=parse_domains
)
def test_create_or_modify_portfolio(self):
"""Test portfolio creation and modification with suborg and senior official."""
self.run_create_federal_portfolio("Test Federal Agency", parse_requests=True)
portfolio = Portfolio.objects.get(federal_agency=self.federal_agency)
self.assertEqual(portfolio.organization_name, self.federal_agency.agency)
self.assertEqual(portfolio.organization_type, DomainRequest.OrganizationChoices.FEDERAL)
self.assertEqual(portfolio.creator, User.get_default_user())
self.assertEqual(portfolio.notes, "Auto-generated record")
# Test the suborgs
suborganizations = Suborganization.objects.filter(portfolio__federal_agency=self.federal_agency)
self.assertEqual(suborganizations.count(), 1)
self.assertEqual(suborganizations.first().name, "Testorg")
# Test the senior official
self.assertEqual(portfolio.senior_official, self.senior_official)
def test_handle_portfolio_requests(self):
"""Verify portfolio association with domain requests."""
self.run_create_federal_portfolio("Test Federal Agency", parse_requests=True)
self.domain_request.refresh_from_db()
self.assertIsNotNone(self.domain_request.portfolio)
self.assertEqual(self.domain_request.portfolio.federal_agency, self.federal_agency)
self.assertEqual(self.domain_request.sub_organization.name, "Testorg")
def test_handle_portfolio_domains(self):
"""Check portfolio association with domain information."""
self.run_create_federal_portfolio("Test Federal Agency", parse_domains=True)
self.domain_info.refresh_from_db()
self.assertIsNotNone(self.domain_info.portfolio)
self.assertEqual(self.domain_info.portfolio.federal_agency, self.federal_agency)
self.assertEqual(self.domain_info.sub_organization.name, "Testorg")
def test_handle_parse_both(self):
"""Ensure correct parsing of both requests and domains."""
self.run_create_federal_portfolio("Test Federal Agency", parse_requests=True, parse_domains=True)
self.domain_request.refresh_from_db()
self.domain_info.refresh_from_db()
self.assertIsNotNone(self.domain_request.portfolio)
self.assertIsNotNone(self.domain_info.portfolio)
self.assertEqual(self.domain_request.portfolio, self.domain_info.portfolio)
def test_command_error_no_parse_options(self):
"""Verify error when no parse options are provided."""
with self.assertRaisesRegex(
CommandError, "You must specify at least one of --parse_requests or --parse_domains."
):
self.run_create_federal_portfolio("Test Federal Agency")
def test_command_error_agency_not_found(self):
"""Check error handling for non-existent agency."""
expected_message = (
"Cannot find the federal agency 'Non-existent Agency' in our database. "
"The value you enter for `agency_name` must be prepopulated in the FederalAgency table before proceeding."
)
with self.assertRaisesRegex(ValueError, expected_message):
self.run_create_federal_portfolio("Non-existent Agency", parse_requests=True)
def test_update_existing_portfolio(self):
"""Test updating an existing portfolio."""
# Create an existing portfolio
existing_portfolio = Portfolio.objects.create(
federal_agency=self.federal_agency,
organization_name="Test Federal Agency",
organization_type=DomainRequest.OrganizationChoices.CITY,
creator=self.user,
notes="Old notes",
)
self.run_create_federal_portfolio("Test Federal Agency", parse_requests=True)
existing_portfolio.refresh_from_db()
self.assertEqual(existing_portfolio.organization_name, self.federal_agency.agency)
self.assertEqual(existing_portfolio.organization_type, DomainRequest.OrganizationChoices.FEDERAL)
# Notes and creator should be untouched
self.assertEqual(existing_portfolio.notes, "Old notes")
self.assertEqual(existing_portfolio.creator, self.user)