Fix managers portion

This commit is contained in:
zandercymatics 2025-03-21 13:17:47 -06:00
parent 8f7959a023
commit 2e119c2600
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
2 changed files with 113 additions and 140 deletions

View file

@ -5,6 +5,7 @@ import logging
from django.core.management import BaseCommand, CommandError
from registrar.management.commands.utility.terminal_helper import ScriptDataHelper, TerminalColors, TerminalHelper
from registrar.models import DomainInformation, DomainRequest, FederalAgency, Suborganization, Portfolio, User
from registrar.models.domain import Domain
from registrar.models.domain_invitation import DomainInvitation
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user_domain_role import UserDomainRole
@ -87,7 +88,6 @@ class Command(BaseCommand):
Optional:
--skip_existing_portfolios: Does not perform substeps on a portfolio if it already exists.
-- clear_federal_agency_on_started_domain_requests: Parses started domain requests
--debug: Increases log verbosity
"""
group = parser.add_mutually_exclusive_group(required=True)
@ -120,11 +120,6 @@ class Command(BaseCommand):
action=argparse.BooleanOptionalAction,
help="Only parses newly created portfolios, skippubg existing ones.",
)
parser.add_argument(
"--clear_federal_agency_on_started_domain_requests",
action=argparse.BooleanOptionalAction,
help="Clears the federal agency field on started domain requests under the given portfolio",
)
parser.add_argument(
"--debug",
action=argparse.BooleanOptionalAction,
@ -138,7 +133,6 @@ class Command(BaseCommand):
parse_domains = options.get("parse_domains")
parse_managers = options.get("parse_managers")
skip_existing_portfolios = options.get("skip_existing_portfolios")
clear_federal_agency_on_started_domain_requests = options.get("clear_federal_agency_on_started_domain_requests")
debug = options.get("debug")
# Parse script params
@ -160,24 +154,26 @@ class Command(BaseCommand):
else:
raise CommandError(f"Cannot find '{branch}' federal agencies in our database.")
# Store all portfolios and agencies in a dict to avoid extra db calls
existing_portfolios = Portfolio.objects.filter(
organization_name__in=agencies.values_list("agency", flat=True), organization_name__isnull=False
)
existing_portfolios_dict = {normalize_string(p.organization_name): p for p in existing_portfolios}
agencies_dict = {normalize_string(agency.agency): agency for agency in agencies}
# NOTE: exceptions to portfolio and suborg are intentionally uncaught.
# parse domains, requests, and managers all rely on these fields to function.
# An error here means everything down the line is compromised.
# The individual parse steps, however, are independent from eachother.
# == Handle portfolios == #
# TODO - some kind of duplicate check on agencies and existing portfolios
existing_portfolios = Portfolio.objects.filter(
organization_name__in=agencies.values_list("agency", flat=True), organization_name__isnull=False
)
existing_portfolios_set = {normalize_string(p.organization_name): p for p in existing_portfolios}
agencies_dict = {normalize_string(agency.agency): agency for agency in agencies}
# Loop through every agency we want to add and create a portfolio if the record is new.
for federal_agency in agencies_dict.values():
portfolio_name = normalize_string(federal_agency.agency, lowercase=False)
portfolio = existing_portfolios_set.get(portfolio_name, None)
norm_agency_name = normalize_string(federal_agency.agency)
portfolio = existing_portfolios_dict.get(norm_agency_name, None)
if portfolio is None:
portfolio = Portfolio(
organization_name=portfolio_name,
organization_name=federal_agency.agency,
federal_agency=federal_agency,
organization_type=DomainRequest.OrganizationChoices.FEDERAL,
creator=User.get_default_user(),
@ -192,13 +188,13 @@ class Command(BaseCommand):
self.portfolio_changes.skip.append(portfolio)
# Create portfolios
portfolios_to_use = self.portfolio_changes.bulk_create()
self.portfolio_changes.bulk_create()
# After create, get the list of all portfolios to use
portfolios_to_use = set(self.portfolio_changes.create)
if not skip_existing_portfolios:
portfolios_to_use.update(set(existing_portfolios))
portfolios_to_use_dict = {normalize_string(p.organization_name): p for p in portfolios_to_use}
# == Handle suborganizations == #
@ -207,19 +203,18 @@ class Command(BaseCommand):
self.suborganization_changes.create.extend(created_suborgs.values())
self.suborganization_changes.bulk_create()
# == Handle domains, requests, and managers == #
# == Handle domains and requests == #
for portfolio_org_name, portfolio in portfolios_to_use_dict.items():
federal_agency = agencies_dict.get(portfolio_org_name)
suborgs = portfolio.portfolio_suborganizations.in_bulk(field_name="name")
if parse_domains:
self.handle_portfolio_domains(portfolio, federal_agency)
updated_domains = self.update_domains(portfolio, federal_agency, suborgs, debug)
self.domain_info_changes.update.extend(updated_domains)
if parse_requests:
self.handle_portfolio_requests(
portfolio, federal_agency, clear_federal_agency_on_started_domain_requests, debug
)
if parse_managers:
self.handle_portfolio_managers(portfolio, debug)
updated_domain_requests = self.update_requests(portfolio, federal_agency, suborgs, debug)
self.domain_request_changes.update.extend(updated_domain_requests)
# Update DomainInformation
try:
@ -244,19 +239,18 @@ class Command(BaseCommand):
logger.error(f"{TerminalColors.FAIL}Could not bulk update domain requests.{TerminalColors.ENDC}")
logger.error(err, exc_info=True)
# Create UserPortfolioPermission
try:
self.user_portfolio_perm_changes.bulk_create()
except Exception as err:
logger.error(f"{TerminalColors.FAIL}Could not bulk create user portfolio permissions.{TerminalColors.ENDC}")
logger.error(err, exc_info=True)
# == Handle managers (no bulk_create) == #
if parse_managers:
domain_infos = DomainInformation.objects.filter(
portfolio__in=portfolios_to_use
)
domains = Domain.objects.filter(domain_info__in=domain_infos)
# Create PortfolioInvitation
try:
self.portfolio_invitation_changes.bulk_create()
except Exception as err:
logger.error(f"{TerminalColors.FAIL}Could not bulk create portfolio invitations.{TerminalColors.ENDC}")
logger.error(err, exc_info=True)
# Create UserPortfolioPermission
self.create_user_portfolio_permissions(domains)
# Create PortfolioInvitation
self.create_portfolio_invitations(domains)
# == PRINT RUN SUMMARY == #
self.print_final_run_summary(parse_domains, parse_requests, parse_managers, debug)
@ -359,6 +353,7 @@ class Command(BaseCommand):
# Normalize all suborg names so we don't add duplicate data unintentionally.
for portfolio_name, portfolio in portfolio_dict.items():
for norm_org_name, domains in domains_dict.items():
# Don't add the record if the suborg name would equal the portfolio name
if norm_org_name == portfolio_name:
continue
@ -381,7 +376,6 @@ class Command(BaseCommand):
suborg = Suborganization(name=new_suborg_name, portfolio=portfolio)
self.set_suborganization_location(suborg, domains, requests)
created_suborgs[norm_org_name] = suborg
return created_suborgs
def set_suborganization_location(self, suborg, domains, requests):
@ -454,139 +448,121 @@ class Command(BaseCommand):
suborg.city = normalize_string(request.city, lowercase=False)
suborg.state_territory = request.state_territory
def handle_portfolio_domains(self, portfolio: Portfolio, federal_agency: FederalAgency):
def update_domains(self, portfolio, federal_agency, suborgs, debug):
"""
Associate portfolio with domains for a federal agency.
Updates all relevant domain information records.
Returns a queryset of DomainInformation objects, or None if nothing changed.
"""
updated_domains = set()
domain_infos = federal_agency.domaininformation_set.all()
if not domain_infos.exists():
return None
# Get all suborg information and store it in a dict to avoid doing a db call
suborgs = Suborganization.objects.filter(portfolio=portfolio).in_bulk(field_name="name")
for domain_info in domain_infos:
org_name = normalize_string(domain_info.organization_name, lowercase=False)
domain_info.portfolio = portfolio
domain_info.sub_organization = suborgs.get(org_name, None)
self.domain_info_changes.update.append(domain_info)
updated_domains.add(domain_info)
def handle_portfolio_requests(
if not updated_domains and debug:
message = (
f"Portfolio '{portfolio}' not added to domains: nothing to add found."
)
logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
return updated_domains
def update_requests(
self,
portfolio: Portfolio,
federal_agency: FederalAgency,
clear_federal_agency_on_started_domain_requests: bool,
debug: bool,
portfolio,
federal_agency,
suborgs,
debug,
):
"""
Associate portfolio with domain requests for a federal agency.
Updates all relevant domain request records.
"""
updated_domain_requests = set()
invalid_states = [
DomainRequest.DomainRequestStatus.STARTED,
DomainRequest.DomainRequestStatus.INELIGIBLE,
DomainRequest.DomainRequestStatus.REJECTED,
]
domain_requests = DomainRequest.objects.filter(federal_agency=federal_agency).exclude(status__in=invalid_states)
if not domain_requests.exists():
if debug:
message = (
f"Portfolio '{portfolio}' not added to domain requests: nothing to add found."
"Excluded statuses: STARTED, INELIGIBLE, REJECTED."
)
logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
return None
domain_requests = federal_agency.domainrequest_set.exclude(status__in=invalid_states)
# Get all suborg information and store it in a dict to avoid doing a db call
suborgs = Suborganization.objects.filter(portfolio=portfolio).in_bulk(field_name="name")
# Add portfolio, sub_org, requested_suborg, suborg_city, and suborg_state_territory.
# For started domain requests, set the federal agency to None if not on a portfolio.
for domain_request in domain_requests:
org_name = normalize_string(domain_request.organization_name, lowercase=False)
domain_request.portfolio = portfolio
domain_request.sub_organization = suborgs.get(org_name, None)
if domain_request.sub_organization is None:
domain_request.requested_suborganization = normalize_string(
domain_request.organization_name, lowercase=False
)
domain_request.suborganization_city = normalize_string(domain_request.city, lowercase=False)
domain_request.suborganization_state_territory = domain_request.state_territory
self.domain_request_changes.update.append(domain_request)
# For each STARTED request, clear the federal agency under these conditions:
# 1. A portfolio *already exists* with the same name as the federal agency.
# 2. Said portfolio (or portfolios) are only the ones specified at the start of the script.
# 3. The domain request is in status "started".
# Note: Both names are normalized so excess spaces are stripped and the string is lowercased.
if clear_federal_agency_on_started_domain_requests:
started_domain_requests = federal_agency.domainrequest_set.filter(
status=DomainRequest.DomainRequestStatus.STARTED,
organization_name__isnull=False,
)
portfolio_name = normalize_string(portfolio.organization_name)
# Update the request, assuming the given agency name matches the portfolio name
for domain_request in started_domain_requests:
if domain_request.status != DomainRequest.DomainRequestStatus.STARTED:
org_name = normalize_string(domain_request.organization_name, lowercase=False)
domain_request.portfolio = portfolio
domain_request.sub_organization = suborgs.get(org_name, None)
if domain_request.sub_organization is None:
domain_request.requested_suborganization = normalize_string(
domain_request.organization_name, lowercase=False
)
domain_request.suborganization_city = normalize_string(domain_request.city, lowercase=False)
domain_request.suborganization_state_territory = domain_request.state_territory
else:
# Clear the federal agency for started domain requests
agency_name = normalize_string(domain_request.federal_agency.agency)
if agency_name == portfolio_name:
portfolio_name = normalize_string(portfolio.organization_name)
if not domain_request.portfolio and agency_name == portfolio_name:
domain_request.federal_agency = None
self.domain_request_changes.update.append(domain_request)
logger.info(f"Set federal agency on started domain request '{domain_request}' to None.")
updated_domain_requests.add(domain_request)
def handle_portfolio_managers(self, portfolio: Portfolio, debug):
"""
Add all domain managers of the portfolio's domains to the organization.
This includes adding them to the correct group and creating portfolio invitations.
"""
domains = portfolio.information_portfolio.all().values_list("domain", flat=True)
if not updated_domain_requests and debug:
message = (
f"Portfolio '{portfolio}' not added to domain requests: nothing to add found."
)
logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
# Fetch all users with manager roles for the domains
user_domain_roles = UserDomainRole.objects.select_related("user").filter(
domain__in=domains, role=UserDomainRole.Roles.MANAGER
return updated_domain_requests
def create_user_portfolio_permissions(self, domains):
user_domain_roles = UserDomainRole.objects.select_related(
"user",
"domain",
"domain__domain_info",
"domain__domain_info__portfolio"
).filter(
domain__in=domains,
domain__domain_info__portfolio__isnull=False,
role=UserDomainRole.Roles.MANAGER
)
existing_permissions = UserPortfolioPermission.objects.filter(
user__in=user_domain_roles.values_list("user"), portfolio=portfolio
)
existing_permissions_dict = {permission.user: permission for permission in existing_permissions}
for user_domain_role in user_domain_roles:
user = user_domain_role.user
if user not in existing_permissions_dict:
permission = UserPortfolioPermission(
portfolio=portfolio,
user=user,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
permission, created = UserPortfolioPermission.objects.get_or_create(
portfolio=user_domain_role.domain.domain_info.portfolio,
user=user,
defaults={
"roles": [UserPortfolioRoleChoices.ORGANIZATION_MEMBER]
}
)
if created:
self.user_portfolio_perm_changes.create.append(permission)
if debug:
logger.info(f"Added manager '{permission.user}' to portfolio '{portfolio}'.")
else:
existing_permission = existing_permissions_dict.get(user)
self.user_portfolio_perm_changes.skip.append(existing_permission)
if debug:
logger.info(f"Manager '{user}' already exists on portfolio '{portfolio}'.")
self.user_portfolio_perm_changes.skip.append(permission)
# Get the emails of invited managers
domain_invitations = DomainInvitation.objects.filter(
domain__in=domains, status=DomainInvitation.DomainInvitationStatus.INVITED
def create_portfolio_invitations(self, domains):
domain_invitations = DomainInvitation.objects.select_related(
"domain",
"domain__domain_info",
"domain__domain_info__portfolio"
).filter(
domain__in=domains,
domain__domain_info__portfolio__isnull=False,
status=DomainInvitation.DomainInvitationStatus.INVITED
)
existing_invitations = PortfolioInvitation.objects.filter(
email__in=domain_invitations.values_list("email"), portfolio=portfolio
)
existing_invitation_dict = {normalize_string(invite.email): invite for invite in existing_invitations}
for domain_invitation in domain_invitations:
email = normalize_string(domain_invitation.email)
if email not in existing_invitation_dict:
invitation = PortfolioInvitation(
portfolio=portfolio,
email=email,
status=PortfolioInvitation.PortfolioInvitationStatus.INVITED,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
invitation, created = PortfolioInvitation.objects.get_or_create(
portfolio=domain_invitation.domain.domain_info.portfolio,
email=email,
status=PortfolioInvitation.PortfolioInvitationStatus.INVITED,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
if created:
self.portfolio_invitation_changes.create.append(invitation)
if debug:
logger.info(f"Added invitation '{email}' to portfolio '{portfolio}'.")
else:
existing_invitation = existing_invitations.get(email)
self.portfolio_invitation_changes.skip.append(existing_invitation)
if debug:
logger.info(f"Invitation '{email}' already exists in portfolio '{portfolio}'.")
self.portfolio_invitation_changes.skip.append(invitation)

View file

@ -1530,13 +1530,10 @@ class TestCreateFederalPortfolio(TestCase):
@less_console_noise_decorator
def test_post_process_started_domain_requests_existing_portfolio(self):
"""Ensures that federal agency is cleared when agency name matches portfolio name.
As the name implies, this implicitly tests the "post_process_started_domain_requests" function.
"""
"""Ensures that federal agency is cleared when agency name matches portfolio name."""
federal_agency_2 = FederalAgency.objects.create(agency="Sugarcane", federal_type=BranchChoices.EXECUTIVE)
# Test records with portfolios and no org names
# Create a portfolio. This script skips over "started"
portfolio = Portfolio.objects.create(organization_name="Sugarcane", creator=self.user)
# Create a domain request with matching org name
matching_request = completed_domain_request(