Merge pull request #3421 from cisagov/ms/3316-automatically-add-portfolio-members

#3316: automatically add domain managers as portfolio members - [MS]
This commit is contained in:
Matt-Spence 2025-02-11 15:54:51 -05:00 committed by GitHub
commit b1ef0b597d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 259 additions and 8 deletions

View file

@ -914,7 +914,8 @@ Example (only requests): `./manage.py create_federal_portfolio --branch "executi
| 3 | **both** | If True, runs parse_requests and parse_domains. |
| 4 | **parse_requests** | If True, then the created portfolio is added to all related DomainRequests. |
| 5 | **parse_domains** | If True, then the created portfolio is added to all related Domains. |
| 6 | **skip_existing_portfolios** | If True, then the script will only create suborganizations, modify DomainRequest, and modify DomainInformation records only when creating a new portfolio. Use this flag when you do not want to modify existing records. |
| 6 | **add_managers** | If True, then the created portfolio will add all managers of the portfolio domains as members of the portfolio, including invited managers. |
| 7 | **skip_existing_portfolios** | If True, then the script will only create suborganizations, modify DomainRequest, and modify DomainInformation records only when creating a new portfolio. Use this flag when you do not want to modify existing records. |
- Parameters #1-#2: Either `--agency_name` or `--branch` must be specified. Not both.
- Parameters #2-#3, you cannot use `--both` while using these. You must specify either `--parse_requests` or `--parse_domains` seperately. While all of these parameters are optional in that you do not need to specify all of them,

View file

@ -5,9 +5,16 @@ import logging
from django.core.management import BaseCommand, CommandError
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.models import DomainInformation, DomainRequest, FederalAgency, Suborganization, Portfolio, User
from registrar.models.domain import Domain
from registrar.models.domain_invitation import DomainInvitation
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user_domain_role import UserDomainRole
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.generic_helper import normalize_string
from django.db.models import F, Q
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
logger = logging.getLogger(__name__)
@ -21,6 +28,10 @@ class Command(BaseCommand):
self.updated_portfolios = set()
self.skipped_portfolios = set()
self.failed_portfolios = set()
self.added_managers = set()
self.added_invitations = set()
self.skipped_invitations = set()
self.failed_managers = set()
def add_arguments(self, parser):
"""Add command line arguments to create federal portfolios.
@ -38,6 +49,9 @@ class Command(BaseCommand):
Optional (mutually exclusive with parse options):
--both: Shorthand for using both --parse_requests and --parse_domains
Cannot be used with --parse_requests or --parse_domains
Optional:
--add_managers: Add all domain managers of the portfolio's domains to the organization.
"""
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
@ -64,23 +78,31 @@ class Command(BaseCommand):
action=argparse.BooleanOptionalAction,
help="Adds portfolio to both requests and domains",
)
parser.add_argument(
"--add_managers",
action=argparse.BooleanOptionalAction,
help="Add all domain managers of the portfolio's domains to the organization.",
)
parser.add_argument(
"--skip_existing_portfolios",
action=argparse.BooleanOptionalAction,
help="Only add suborganizations to newly created portfolios, skip existing ones.",
)
def handle(self, **options):
def handle(self, **options): # noqa: C901
agency_name = options.get("agency_name")
branch = options.get("branch")
parse_requests = options.get("parse_requests")
parse_domains = options.get("parse_domains")
both = options.get("both")
add_managers = options.get("add_managers")
skip_existing_portfolios = options.get("skip_existing_portfolios")
if not both:
if not parse_requests and not parse_domains:
raise CommandError("You must specify at least one of --parse_requests or --parse_domains.")
if not (parse_requests or parse_domains or add_managers):
raise CommandError(
"You must specify at least one of --parse_requests, --parse_domains, or --add_managers."
)
else:
if parse_requests or parse_domains:
raise CommandError("You cannot pass --parse_requests or --parse_domains when passing --both.")
@ -96,7 +118,6 @@ class Command(BaseCommand):
)
else:
raise CommandError(f"Cannot find '{branch}' federal agencies in our database.")
portfolios = []
for federal_agency in agencies:
message = f"Processing federal agency '{federal_agency.agency}'..."
@ -107,6 +128,8 @@ class Command(BaseCommand):
federal_agency, parse_domains, parse_requests, both, skip_existing_portfolios
)
portfolios.append(portfolio)
if add_managers:
self.add_managers_to_portfolio(portfolio)
except Exception as exec:
self.failed_portfolios.add(federal_agency)
logger.error(exec)
@ -127,6 +150,26 @@ class Command(BaseCommand):
display_as_str=True,
)
if add_managers:
TerminalHelper.log_script_run_summary(
self.added_managers,
self.failed_managers,
[], # can't skip managers, can only add or fail
log_header="----- MANAGERS ADDED -----",
debug=False,
display_as_str=True,
)
TerminalHelper.log_script_run_summary(
self.added_invitations,
[],
self.skipped_invitations,
log_header="----- INVITATIONS ADDED -----",
debug=False,
skipped_header="----- INVITATIONS SKIPPED (ALREADY EXISTED) -----",
display_as_str=True,
)
# POST PROCESSING STEP: Remove the federal agency if it matches the portfolio name.
# We only do this for started domain requests.
if parse_requests or both:
@ -147,6 +190,73 @@ class Command(BaseCommand):
)
self.post_process_started_domain_requests(agencies, portfolios)
def add_managers_to_portfolio(self, portfolio: Portfolio):
"""
Add all domain managers of the portfolio's domains to the organization.
This includes adding them to the correct group and creating portfolio invitations.
"""
logger.info(f"Adding managers for portfolio {portfolio}")
# Fetch all domains associated with the portfolio
domains = Domain.objects.filter(domain_info__portfolio=portfolio)
domain_managers: set[UserDomainRole] = set()
# Fetch all users with manager roles for the domains
# select_related means that a db query will not be occur when you do user_domain_role.user
# Its similar to a set or dict in that it costs slightly more upfront in exchange for perf later
user_domain_roles = UserDomainRole.objects.select_related("user").filter(
domain__in=domains, role=UserDomainRole.Roles.MANAGER
)
domain_managers.update(user_domain_roles)
invited_managers: set[str] = set()
# Get the emails of invited managers
domain_invitations = DomainInvitation.objects.filter(
domain__in=domains, status=DomainInvitation.DomainInvitationStatus.INVITED
).values_list("email", flat=True)
invited_managers.update(domain_invitations)
for user_domain_role in domain_managers:
try:
# manager is a user id
user = user_domain_role.user
_, created = UserPortfolioPermission.objects.get_or_create(
portfolio=portfolio,
user=user,
defaults={"roles": [UserPortfolioRoleChoices.ORGANIZATION_MEMBER]},
)
self.added_managers.add(user)
if created:
logger.info(f"Added manager '{user}' to portfolio '{portfolio}'")
else:
logger.info(f"Manager '{user}' already exists in portfolio '{portfolio}'")
except User.DoesNotExist:
self.failed_managers.add(user)
logger.debug(f"User '{user}' does not exist")
for email in invited_managers:
self.create_portfolio_invitation(portfolio, email)
def create_portfolio_invitation(self, portfolio: Portfolio, email: str):
"""
Create a portfolio invitation for the given email.
"""
_, created = PortfolioInvitation.objects.get_or_create(
portfolio=portfolio,
email=email,
defaults={
"status": PortfolioInvitation.PortfolioInvitationStatus.INVITED,
"roles": [UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
},
)
if created:
self.added_invitations.add(email)
logger.info(f"Created portfolio invitation for '{email}' to portfolio '{portfolio}'")
else:
self.skipped_invitations.add(email)
logger.info(f"Found existing portfolio invitation for '{email}' to portfolio '{portfolio}'")
def post_process_started_domain_requests(self, agencies, portfolios):
"""
Removes duplicate organization data by clearing federal_agency when it matches the portfolio name.
@ -160,6 +270,7 @@ class Command(BaseCommand):
# 2. Said portfolio (or portfolios) are only the ones specified at the start of the script.
# 3. The domain request is in status "started".
# Note: Both names are normalized so excess spaces are stripped and the string is lowercased.
domain_requests_to_update = DomainRequest.objects.filter(
federal_agency__in=agencies,
federal_agency__agency__isnull=False,

View file

@ -7,6 +7,7 @@ from registrar.models.domain_group import DomainGroup
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.senior_official import SeniorOfficial
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
from registrar.utility.constants import BranchChoices
from django.utils import timezone
from django.utils.module_loading import import_string
@ -1465,6 +1466,7 @@ class TestCreateFederalPortfolio(TestCase):
self.executive_so_2 = SeniorOfficial.objects.create(
first_name="first", last_name="last", email="mango@igorville.gov", federal_agency=self.executive_agency_2
)
with boto3_mocking.clients.handler_for("sesv2", self.mock_client):
self.domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
@ -1474,6 +1476,7 @@ class TestCreateFederalPortfolio(TestCase):
)
self.domain_request.approve()
self.domain_info = DomainInformation.objects.filter(domain_request=self.domain_request).get()
self.domain = Domain.objects.get(name="city.gov")
self.domain_request_2 = completed_domain_request(
name="icecreamforigorville.gov",
@ -1517,7 +1520,6 @@ class TestCreateFederalPortfolio(TestCase):
FederalAgency.objects.all().delete()
User.objects.all().delete()
@less_console_noise_decorator
def run_create_federal_portfolio(self, **kwargs):
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit",
@ -1812,12 +1814,12 @@ class TestCreateFederalPortfolio(TestCase):
# We expect a error to be thrown when we dont pass parse requests or domains
with self.assertRaisesRegex(
CommandError, "You must specify at least one of --parse_requests or --parse_domains."
CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --add_managers."
):
self.run_create_federal_portfolio(branch="executive")
with self.assertRaisesRegex(
CommandError, "You must specify at least one of --parse_requests or --parse_domains."
CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --add_managers."
):
self.run_create_federal_portfolio(agency_name="test")
@ -1854,6 +1856,143 @@ class TestCreateFederalPortfolio(TestCase):
self.assertEqual(existing_portfolio.notes, "Old notes")
self.assertEqual(existing_portfolio.creator, self.user)
@less_console_noise_decorator
def test_add_managers_from_domains(self):
"""Test that all domain managers are added as portfolio managers."""
# Create users and assign them as domain managers
manager1 = User.objects.create(username="manager1", email="manager1@example.com")
manager2 = User.objects.create(username="manager2", email="manager2@example.com")
UserDomainRole.objects.create(user=manager1, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
UserDomainRole.objects.create(user=manager2, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
# Run the management command
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_domains=True, add_managers=True)
# Check that the portfolio was created
self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency)
# Check that the users have been added as portfolio managers
permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user__in=[manager1, manager2])
# Check that the users have been added as portfolio managers
self.assertEqual(permissions.count(), 2)
for perm in permissions:
self.assertIn(UserPortfolioRoleChoices.ORGANIZATION_MEMBER, perm.roles)
@less_console_noise_decorator
def test_add_invited_managers(self):
"""Test that invited domain managers receive portfolio invitations."""
# create a domain invitation for the manager
_ = DomainInvitation.objects.create(
domain=self.domain, email="manager1@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
)
# Run the management command
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_domains=True, add_managers=True)
# Check that the portfolio was created
self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency)
# Check that a PortfolioInvitation has been created for the invited email
invitation = PortfolioInvitation.objects.get(email="manager1@example.com", portfolio=self.portfolio)
# Verify the status of the invitation remains INVITED
self.assertEqual(
invitation.status,
PortfolioInvitation.PortfolioInvitationStatus.INVITED,
"PortfolioInvitation status should remain INVITED for non-existent users.",
)
# Verify that no duplicate invitations are created
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True
)
invitations = PortfolioInvitation.objects.filter(email="manager1@example.com", portfolio=self.portfolio)
self.assertEqual(
invitations.count(),
1,
"Duplicate PortfolioInvitation should not be created for the same email and portfolio.",
)
@less_console_noise_decorator
def test_no_duplicate_managers_added(self):
"""Test that duplicate managers are not added multiple times."""
# Create a manager
manager = User.objects.create(username="manager", email="manager@example.com")
UserDomainRole.objects.create(user=manager, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
# Create a pre-existing portfolio
self.portfolio = Portfolio.objects.create(
organization_name=self.federal_agency.agency, federal_agency=self.federal_agency, creator=self.user
)
# Manually add the manager to the portfolio
UserPortfolioPermission.objects.create(
portfolio=self.portfolio, user=manager, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER]
)
# Run the management command
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True
)
# Ensure that the manager is not duplicated
permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user=manager)
self.assertEqual(permissions.count(), 1)
@less_console_noise_decorator
def test_add_managers_skip_existing_portfolios(self):
"""Test that managers are skipped when the portfolio already exists."""
# Create a pre-existing portfolio
self.portfolio = Portfolio.objects.create(
organization_name=self.federal_agency.agency, federal_agency=self.federal_agency, creator=self.user
)
domain_request_1 = completed_domain_request(
name="domain1.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
generic_org_type=DomainRequest.OrganizationChoices.CITY,
federal_agency=self.federal_agency,
user=self.user,
portfolio=self.portfolio,
)
domain_request_1.approve()
domain1 = Domain.objects.get(name="domain1.gov")
domain_request_2 = completed_domain_request(
name="domain2.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
generic_org_type=DomainRequest.OrganizationChoices.CITY,
federal_agency=self.federal_agency,
user=self.user,
portfolio=self.portfolio,
)
domain_request_2.approve()
domain2 = Domain.objects.get(name="domain2.gov")
# Create users and assign them as domain managers
manager1 = User.objects.create(username="manager1", email="manager1@example.com")
manager2 = User.objects.create(username="manager2", email="manager2@example.com")
UserDomainRole.objects.create(user=manager1, domain=domain1, role=UserDomainRole.Roles.MANAGER)
UserDomainRole.objects.create(user=manager2, domain=domain2, role=UserDomainRole.Roles.MANAGER)
# Run the management command
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency,
parse_requests=True,
add_managers=True,
skip_existing_portfolios=True,
)
# Check that managers were added to the portfolio
permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user__in=[manager1, manager2])
self.assertEqual(permissions.count(), 2)
for perm in permissions:
self.assertIn(UserPortfolioRoleChoices.ORGANIZATION_MEMBER, perm.roles)
def test_skip_existing_portfolios(self):
"""Tests the skip_existing_portfolios to ensure that it doesn't add
suborgs, domain requests, and domain info."""