fix tests

This commit is contained in:
matthewswspence 2025-02-02 17:54:53 -06:00
parent 4bac837dbc
commit b28ddf296e
No known key found for this signature in database
GPG key ID: FB458202A7852BA4
2 changed files with 88 additions and 21 deletions

View file

@ -30,6 +30,8 @@ class Command(BaseCommand):
self.failed_portfolios = set()
self.added_managers = set()
self.added_invitations = set()
self.failed_managers = set()
self.failed_invitations = set()
def add_arguments(self, parser):
"""Add command line arguments to create federal portfolios.
@ -114,9 +116,15 @@ class Command(BaseCommand):
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
try:
# C901 'Command.handle' is too complex (12)
portfolio = self.handle_populate_portfolio(federal_agency, parse_domains, parse_requests, both)
portfolios.append(portfolio)
# if the portfolio is already created, we don't want to create it again
portfolio = Portfolio.objects.filter(organization_name=federal_agency.agency)
if portfolio.exists():
portfolio = portfolio.first()
message = f"Portfolio '{federal_agency.agency}' already exists. Skipping create."
TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message)
else:
portfolio = self.handle_populate_portfolio(federal_agency, parse_domains, parse_requests, both)
portfolios.append(portfolio)
logger.debug(f"add_managers: {add_managers}")
if add_managers:
logger.debug("Adding managers to portfolio")
@ -141,6 +149,25 @@ class Command(BaseCommand):
display_as_str=True,
)
if add_managers:
TerminalHelper.log_script_run_summary(
self.added_managers,
self.failed_managers,
[], # can't skip managers, can only add or fail
log_header="----- MANAGERS ADDED -----",
debug=False,
display_as_str=True,
)
TerminalHelper.log_script_run_summary(
self.added_invitations,
self.failed_invitations,
[], # can't skip invitations, can only add or fail
log_header="----- INVITATIONS ADDED -----",
debug=False,
display_as_str=True,
)
# POST PROCESSING STEP: Remove the federal agency if it matches the portfolio name.
# We only do this for started domain requests.
if parse_requests or both:
@ -190,11 +217,13 @@ class Command(BaseCommand):
user=user,
defaults={"roles": [UserPortfolioRoleChoices.ORGANIZATION_MEMBER]},
)
self.added_managers.add(user)
if created:
logger.info(f"Added manager '{user}' to portfolio '{portfolio}'")
else:
logger.info(f"Manager '{user}' already exists in portfolio '{portfolio}'")
except User.DoesNotExist:
self.failed_managers.add(user)
logger.debug(f"User '{user}' does not exist")
for manager in invited_managers:
@ -229,13 +258,20 @@ class Command(BaseCommand):
logger.info(f"Created portfolio permission for '{user}' to portfolio '{portfolio}'")
else:
logger.info(f"Retrieved existing portfolio permission for '{user}' to portfolio '{portfolio}'")
self.added_invitations.add(user)
except User.DoesNotExist:
PortfolioInvitation.objects.get_or_create(
portfolio=portfolio,
email=email,
defaults={"status": PortfolioInvitation.PortfolioInvitationStatus.INVITED},
)
self.added_invitations.add(email)
logger.info(f"Created portfolio invitation for '{email}' to portfolio '{portfolio}'")
except Exception as exc:
self.failed_invitations.add(email)
logger.error(exc, exc_info=True)
logger.error(f"Failed to create portfolio invitation for '{email}' to portfolio '{portfolio}'")
def post_process_started_domain_requests(self, agencies, portfolios):
"""
@ -250,13 +286,20 @@ class Command(BaseCommand):
# 2. Said portfolio (or portfolios) are only the ones specified at the start of the script.
# 3. The domain request is in status "started".
# Note: Both names are normalized so excess spaces are stripped and the string is lowercased.
message = f"agencies: {agencies}"
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
domain_requests_to_update = DomainRequest.objects.filter(
federal_agency__in=agencies,
federal_agency__agency__isnull=False,
status=DomainRequest.DomainRequestStatus.STARTED,
organization_name__isnull=False,
)
message = (f"domain_requests_to_update: {domain_requests_to_update}")
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
portfolio_set = {normalize_string(portfolio.organization_name) for portfolio in portfolios if portfolio}
message = f"portfolio_set: {portfolio_set}"
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
# Update the request, assuming the given agency name matches the portfolio name
updated_requests = []
@ -265,6 +308,9 @@ class Command(BaseCommand):
if agency_name in portfolio_set:
req.federal_agency = None
updated_requests.append(req)
message = f"updated_requests: {updated_requests}"
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
# Execute the update and Log the results
if TerminalHelper.prompt_for_execution(
@ -275,6 +321,8 @@ class Command(BaseCommand):
),
prompt_title="Do you wish to commit this update to the database?",
):
message = f"prompted for execution"
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
DomainRequest.objects.bulk_update(updated_requests, ["federal_agency"])
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKBLUE, "Action completed successfully.")

View file

@ -1460,14 +1460,11 @@ class TestCreateFederalPortfolio(TestCase):
first_name="first", last_name="last", email="mango@igorville.gov", federal_agency=self.executive_agency_2
)
self.portfolio = Portfolio.objects.create(organization_name=self.federal_agency.agency, federal_agency=self.federal_agency, creator=self.user)
with boto3_mocking.clients.handler_for("sesv2", self.mock_client):
self.domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
generic_org_type=DomainRequest.OrganizationChoices.CITY,
federal_agency=self.federal_agency,
portfolio=self.portfolio,
user=self.user,
)
self.domain_request.approve()
@ -1479,26 +1476,22 @@ class TestCreateFederalPortfolio(TestCase):
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
generic_org_type=DomainRequest.OrganizationChoices.CITY,
federal_agency=self.federal_agency,
portfolio=self.portfolio,
user=self.user,
organization_name="Test Federal Agency",
)
self.domain_request_2.approve()
self.domain_info_2 = DomainInformation.objects.filter(domain_request=self.domain_request_2).get()
self.domain_2 = Domain.objects.get(name="icecreamforigorville.gov")
self.domain_request_3 = completed_domain_request(
name="exec_1.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
generic_org_type=DomainRequest.OrganizationChoices.FEDERAL,
federal_agency=self.executive_agency_1,
portfolio=self.portfolio,
user=self.user,
organization_name="Executive Agency 1",
)
self.domain_request_3.approve()
self.domain_info_3 = self.domain_request_3.DomainRequest_info
self.domain = Domain.objects.get(name="exec_1.gov")
self.domain_request_4 = completed_domain_request(
name="exec_2.gov",
@ -1506,12 +1499,10 @@ class TestCreateFederalPortfolio(TestCase):
generic_org_type=DomainRequest.OrganizationChoices.FEDERAL,
federal_agency=self.executive_agency_2,
user=self.user,
portfolio=self.portfolio,
organization_name="Executive Agency 2",
)
self.domain_request_4.approve()
self.domain_info_4 = self.domain_request_4.DomainRequest_info
self.domain_4 = Domain.objects.get(name="exec_2.gov")
def tearDown(self):
DomainInformation.objects.all().delete()
@ -1529,7 +1520,7 @@ class TestCreateFederalPortfolio(TestCase):
):
call_command("create_federal_portfolio", **kwargs)
@less_console_noise_decorator
# @less_console_noise_decorator
def test_post_process_started_domain_requests_existing_portfolio(self):
"""Ensures that federal agency is cleared when agency name matches portfolio name.
As the name implies, this implicitly tests the "post_process_started_domain_requests" function.
@ -1869,7 +1860,10 @@ class TestCreateFederalPortfolio(TestCase):
UserDomainRole.objects.create(user=manager2, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
# Run the management command
self.run_create_federal_portfolio(agency_name=self.portfolio.organization_name, parse_requests=True, add_managers=True)
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True)
# Check that the portfolio was created
self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency)
# Check that the users have been added as portfolio managers
permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user__in=[manager1, manager2])
@ -1890,13 +1884,13 @@ class TestCreateFederalPortfolio(TestCase):
email="manager1@example.com",
status=DomainInvitation.DomainInvitationStatus.INVITED
)
# Ensure no existing PortfolioInvitation for the invited email
self.assertFalse(PortfolioInvitation.objects.filter(email="manager1@example.com", portfolio=self.portfolio).exists())
# Run the management command
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True)
# Check that the portfolio was created
self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency)
# Check that a PortfolioInvitation has been created for the invited email
invitation = PortfolioInvitation.objects.get(email="manager1@example.com", portfolio=self.portfolio)
@ -1909,9 +1903,9 @@ class TestCreateFederalPortfolio(TestCase):
# Verify that no duplicate invitations are created
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True)
duplicated_invitations = PortfolioInvitation.objects.filter(email="manager1@example.com", portfolio=self.portfolio)
invitations = PortfolioInvitation.objects.filter(email="manager1@example.com", portfolio=self.portfolio)
self.assertEqual(
duplicated_invitations.count(),
invitations.count(),
1,
"Duplicate PortfolioInvitation should not be created for the same email and portfolio."
)
@ -1922,6 +1916,9 @@ class TestCreateFederalPortfolio(TestCase):
# Create a manager
manager = User.objects.create(username="manager", email="manager@example.com")
UserDomainRole.objects.create(user=manager, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
# Create a pre-existing portfolio
self.portfolio = Portfolio.objects.create(organization_name=self.federal_agency.agency, federal_agency=self.federal_agency, creator=self.user)
# Manually add the manager to the portfolio
UserPortfolioPermission.objects.create(portfolio=self.portfolio, user=manager, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER])
@ -1931,4 +1928,26 @@ class TestCreateFederalPortfolio(TestCase):
# Ensure that the manager is not duplicated
permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user=manager)
self.assertEqual(permissions.count(), 1)
self.assertEqual(permissions.count(), 1)
@less_console_noise_decorator
def test_add_managers_portfolio_already_exists(self):
"""Test that managers are skipped when the portfolio already exists."""
# Create a pre-existing portfolio
self.portfolio = Portfolio.objects.create(organization_name=self.federal_agency.agency, federal_agency=self.federal_agency, creator=self.user)
# Create users and assign them as domain managers
manager1 = User.objects.create(username="manager1", email="manager1@example.com")
manager2 = User.objects.create(username="manager2", email="manager2@example.com")
UserDomainRole.objects.create(user=manager1, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
UserDomainRole.objects.create(user=manager2, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
# Run the management command
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True)
# Check that managers were added to the portfolio
permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user__in=[manager1, manager2])
self.assertEqual(permissions.count(), 2)
for perm in permissions:
self.assertIn(UserPortfolioRoleChoices.ORGANIZATION_MEMBER, perm.roles)