linter and final test fixes

This commit is contained in:
matthewswspence 2025-02-03 11:20:09 -06:00
parent 0b91689cd5
commit bdd57c8cfd
No known key found for this signature in database
GPG key ID: FB458202A7852BA4
2 changed files with 94 additions and 66 deletions

View file

@ -89,7 +89,7 @@ class Command(BaseCommand):
help="Only add suborganizations to newly created portfolios, skip existing ones.",
)
def handle(self, **options):
def handle(self, **options): # noqa: C901
agency_name = options.get("agency_name")
branch = options.get("branch")
parse_requests = options.get("parse_requests")
@ -116,7 +116,6 @@ class Command(BaseCommand):
)
else:
raise CommandError(f"Cannot find '{branch}' federal agencies in our database.")
portfolios = []
for federal_agency in agencies:
message = f"Processing federal agency '{federal_agency.agency}'..."
@ -127,6 +126,8 @@ class Command(BaseCommand):
federal_agency, parse_domains, parse_requests, both, skip_existing_portfolios
)
portfolios.append(portfolio)
if add_managers:
self.add_managers_to_portfolio(portfolio)
except Exception as exec:
self.failed_portfolios.add(federal_agency)
logger.error(exec)
@ -192,31 +193,32 @@ class Command(BaseCommand):
This includes adding them to the correct group and creating portfolio invitations.
"""
logger.info(f"Adding managers for portfolio {portfolio}")
# Fetch all domains associated with the portfolio
domains = Domain.objects.filter(domain_info__portfolio=portfolio)
logger.debug(f"domains: {domains}")
domain_managers = set()
domain_managers: set[int] = set()
# Fetch all users with manager roles for the domains
managers = UserDomainRole.objects.filter(
domain__in=domains,
role=UserDomainRole.Roles.MANAGER
).values_list('user', flat=True)
managers = UserDomainRole.objects.filter(domain__in=domains, role=UserDomainRole.Roles.MANAGER).values_list(
"user", flat=True
)
domain_managers.update(managers)
invited_managers = set()
invited_managers: set[str] = set()
# Get the emails of invited managers
for domain in domains:
domain_invitations = DomainInvitation.objects.filter(domain=domain, status=DomainInvitation.DomainInvitationStatus.INVITED).values_list('email', flat=True)
domain_invitations = DomainInvitation.objects.filter(
domain=domain, status=DomainInvitation.DomainInvitationStatus.INVITED
).values_list("email", flat=True)
invited_managers.update(domain_invitations)
logger.debug(f"invited_managers: {invited_managers}")
for manager in domain_managers:
for id in domain_managers:
try:
# manager is a user id
user = User.objects.get(id=manager)
user = User.objects.get(id=id)
_, created = UserPortfolioPermission.objects.get_or_create(
portfolio=portfolio,
user=user,
@ -230,9 +232,9 @@ class Command(BaseCommand):
except User.DoesNotExist:
self.failed_managers.add(user)
logger.debug(f"User '{user}' does not exist")
for manager in invited_managers:
self.create_portfolio_invitation(portfolio, manager)
for email in invited_managers:
self.create_portfolio_invitation(portfolio, email)
def create_portfolio_invitation(self, portfolio: Portfolio, email: str):
"""
@ -252,18 +254,18 @@ class Command(BaseCommand):
logger.info(f"Created portfolio invitation for '{user}' to portfolio '{portfolio}'")
else:
logger.info(f"Retrieved existing portfolio invitation for '{user}' to portfolio '{portfolio}'")
# Assign portfolio permissions
_, created = UserPortfolioPermission.objects.get_or_create(
portfolio=portfolio,
user=user,
defaults={"role": UserPortfolioPermission.RoleChoices.MANAGER},
defaults={"role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER},
)
if created:
logger.info(f"Created portfolio permission for '{user}' to portfolio '{portfolio}'")
else:
logger.info(f"Retrieved existing portfolio permission for '{user}' to portfolio '{portfolio}'")
self.added_invitations.add(user)
except User.DoesNotExist:
PortfolioInvitation.objects.get_or_create(
@ -316,7 +318,7 @@ class Command(BaseCommand):
if agency_name in portfolio_set:
req.federal_agency = None
updated_requests.append(req)
message = f"updated_requests: {updated_requests}"
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
@ -329,7 +331,7 @@ class Command(BaseCommand):
),
prompt_title="Do you wish to commit this update to the database?",
):
message = f"prompted for execution"
message = "prompted for execution"
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
DomainRequest.objects.bulk_update(updated_requests, ["federal_agency"])
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKBLUE, "Action completed successfully.")

View file

@ -3,17 +3,11 @@ import boto3_mocking # type: ignore
from datetime import date, datetime, time
from django.core.management import call_command
from django.test import TestCase, override_settings
<<<<<<< HEAD
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.senior_official import SeniorOfficial
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
=======
from registrar.models.domain_group import DomainGroup
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.senior_official import SeniorOfficial
from registrar.models.user_portfolio_permission import UserPortfolioPermission
>>>>>>> origin/main
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
from registrar.utility.constants import BranchChoices
from django.utils import timezone
from django.utils.module_loading import import_string
@ -1472,7 +1466,7 @@ class TestCreateFederalPortfolio(TestCase):
self.executive_so_2 = SeniorOfficial.objects.create(
first_name="first", last_name="last", email="mango@igorville.gov", federal_agency=self.executive_agency_2
)
with boto3_mocking.clients.handler_for("sesv2", self.mock_client):
self.domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
@ -1533,7 +1527,7 @@ class TestCreateFederalPortfolio(TestCase):
):
call_command("create_federal_portfolio", **kwargs)
# @less_console_noise_decorator
@less_console_noise_decorator
def test_post_process_started_domain_requests_existing_portfolio(self):
"""Ensures that federal agency is cleared when agency name matches portfolio name.
As the name implies, this implicitly tests the "post_process_started_domain_requests" function.
@ -1862,7 +1856,6 @@ class TestCreateFederalPortfolio(TestCase):
self.assertEqual(existing_portfolio.notes, "Old notes")
self.assertEqual(existing_portfolio.creator, self.user)
<<<<<<< HEAD
@less_console_noise_decorator
def test_add_managers_from_domains(self):
"""Test that all domain managers are added as portfolio managers."""
@ -1872,56 +1865,55 @@ class TestCreateFederalPortfolio(TestCase):
manager2 = User.objects.create(username="manager2", email="manager2@example.com")
UserDomainRole.objects.create(user=manager1, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
UserDomainRole.objects.create(user=manager2, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
# Run the management command
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True)
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_domains=True, add_managers=True)
# Check that the portfolio was created
self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency)
# Check that the users have been added as portfolio managers
permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user__in=[manager1, manager2])
print(UserPortfolioPermission.objects.all())
# Check that the users have been added as portfolio managers
# Check that the users have been added as portfolio managers
self.assertEqual(permissions.count(), 2)
for perm in permissions:
self.assertIn(UserPortfolioRoleChoices.ORGANIZATION_MEMBER, perm.roles)
@less_console_noise_decorator
def test_add_invited_managers(self):
"""Test that invited domain managers receive portfolio invitations."""
# create a domain invitation for the manager
_ = DomainInvitation.objects.create(
domain=self.domain,
email="manager1@example.com",
status=DomainInvitation.DomainInvitationStatus.INVITED
)
domain=self.domain, email="manager1@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
)
# Run the management command
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True)
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_domains=True, add_managers=True)
# Check that the portfolio was created
self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency)
# Check that a PortfolioInvitation has been created for the invited email
invitation = PortfolioInvitation.objects.get(email="manager1@example.com", portfolio=self.portfolio)
# Verify the status of the invitation remains INVITED
self.assertEqual(
invitation.status,
PortfolioInvitation.PortfolioInvitationStatus.INVITED,
"PortfolioInvitation status should remain INVITED for non-existent users."
"PortfolioInvitation status should remain INVITED for non-existent users.",
)
# Verify that no duplicate invitations are created
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True)
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True
)
invitations = PortfolioInvitation.objects.filter(email="manager1@example.com", portfolio=self.portfolio)
self.assertEqual(
invitations.count(),
1,
"Duplicate PortfolioInvitation should not be created for the same email and portfolio."
"Duplicate PortfolioInvitation should not be created for the same email and portfolio.",
)
@less_console_noise_decorator
@ -1931,41 +1923,76 @@ class TestCreateFederalPortfolio(TestCase):
manager = User.objects.create(username="manager", email="manager@example.com")
UserDomainRole.objects.create(user=manager, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
# Create a pre-existing portfolio
self.portfolio = Portfolio.objects.create(organization_name=self.federal_agency.agency, federal_agency=self.federal_agency, creator=self.user)
# Create a pre-existing portfolio
self.portfolio = Portfolio.objects.create(
organization_name=self.federal_agency.agency, federal_agency=self.federal_agency, creator=self.user
)
# Manually add the manager to the portfolio
UserPortfolioPermission.objects.create(portfolio=self.portfolio, user=manager, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER])
UserPortfolioPermission.objects.create(
portfolio=self.portfolio, user=manager, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER]
)
# Run the management command
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True)
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True
)
# Ensure that the manager is not duplicated
permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user=manager)
self.assertEqual(permissions.count(), 1)
@less_console_noise_decorator
def test_add_managers_portfolio_already_exists(self):
def test_add_managers_skip_existing_portfolios(self):
"""Test that managers are skipped when the portfolio already exists."""
# Create a pre-existing portfolio
self.portfolio = Portfolio.objects.create(organization_name=self.federal_agency.agency, federal_agency=self.federal_agency, creator=self.user)
# Create a pre-existing portfolio
self.portfolio = Portfolio.objects.create(
organization_name=self.federal_agency.agency, federal_agency=self.federal_agency, creator=self.user
)
domain_request_1 = completed_domain_request(
name="domain1.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
generic_org_type=DomainRequest.OrganizationChoices.CITY,
federal_agency=self.federal_agency,
user=self.user,
portfolio=self.portfolio,
)
domain_request_1.approve()
domain1 = Domain.objects.get(name="domain1.gov")
domain_request_2 = completed_domain_request(
name="domain2.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
generic_org_type=DomainRequest.OrganizationChoices.CITY,
federal_agency=self.federal_agency,
user=self.user,
portfolio=self.portfolio,
)
domain_request_2.approve()
domain2 = Domain.objects.get(name="domain2.gov")
# Create users and assign them as domain managers
manager1 = User.objects.create(username="manager1", email="manager1@example.com")
manager2 = User.objects.create(username="manager2", email="manager2@example.com")
UserDomainRole.objects.create(user=manager1, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
UserDomainRole.objects.create(user=manager2, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
UserDomainRole.objects.create(user=manager1, domain=domain1, role=UserDomainRole.Roles.MANAGER)
UserDomainRole.objects.create(user=manager2, domain=domain2, role=UserDomainRole.Roles.MANAGER)
# Run the management command
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True)
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency,
parse_requests=True,
add_managers=True,
skip_existing_portfolios=True,
)
# Check that managers were added to the portfolio
permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user__in=[manager1, manager2])
self.assertEqual(permissions.count(), 2)
for perm in permissions:
self.assertIn(UserPortfolioRoleChoices.ORGANIZATION_MEMBER, perm.roles)
=======
def test_skip_existing_portfolios(self):
"""Tests the skip_existing_portfolios to ensure that it doesn't add
suborgs, domain requests, and domain info."""
@ -2466,4 +2493,3 @@ class TestRemovePortfolios(TestCase):
# Check that the portfolio was deleted
self.assertFalse(Portfolio.objects.filter(organization_name="Test with suborg").exists())
>>>>>>> origin/main