From bdd57c8cfda0abbaa7f3e3a0e8e1b9b8460ceb93 Mon Sep 17 00:00:00 2001 From: matthewswspence Date: Mon, 3 Feb 2025 11:20:09 -0600 Subject: [PATCH] linter and final test fixes --- .../commands/create_federal_portfolio.py | 42 ++++--- .../tests/test_management_scripts.py | 118 +++++++++++------- 2 files changed, 94 insertions(+), 66 deletions(-) diff --git a/src/registrar/management/commands/create_federal_portfolio.py b/src/registrar/management/commands/create_federal_portfolio.py index 2f2cc2625..71004dc49 100644 --- a/src/registrar/management/commands/create_federal_portfolio.py +++ b/src/registrar/management/commands/create_federal_portfolio.py @@ -89,7 +89,7 @@ class Command(BaseCommand): help="Only add suborganizations to newly created portfolios, skip existing ones.", ) - def handle(self, **options): + def handle(self, **options): # noqa: C901 agency_name = options.get("agency_name") branch = options.get("branch") parse_requests = options.get("parse_requests") @@ -116,7 +116,6 @@ class Command(BaseCommand): ) else: raise CommandError(f"Cannot find '{branch}' federal agencies in our database.") - portfolios = [] for federal_agency in agencies: message = f"Processing federal agency '{federal_agency.agency}'..." @@ -127,6 +126,8 @@ class Command(BaseCommand): federal_agency, parse_domains, parse_requests, both, skip_existing_portfolios ) portfolios.append(portfolio) + if add_managers: + self.add_managers_to_portfolio(portfolio) except Exception as exec: self.failed_portfolios.add(federal_agency) logger.error(exec) @@ -192,31 +193,32 @@ class Command(BaseCommand): This includes adding them to the correct group and creating portfolio invitations. """ logger.info(f"Adding managers for portfolio {portfolio}") - + # Fetch all domains associated with the portfolio domains = Domain.objects.filter(domain_info__portfolio=portfolio) logger.debug(f"domains: {domains}") - domain_managers = set() + domain_managers: set[int] = set() # Fetch all users with manager roles for the domains - managers = UserDomainRole.objects.filter( - domain__in=domains, - role=UserDomainRole.Roles.MANAGER - ).values_list('user', flat=True) + managers = UserDomainRole.objects.filter(domain__in=domains, role=UserDomainRole.Roles.MANAGER).values_list( + "user", flat=True + ) domain_managers.update(managers) - invited_managers = set() + invited_managers: set[str] = set() # Get the emails of invited managers for domain in domains: - domain_invitations = DomainInvitation.objects.filter(domain=domain, status=DomainInvitation.DomainInvitationStatus.INVITED).values_list('email', flat=True) + domain_invitations = DomainInvitation.objects.filter( + domain=domain, status=DomainInvitation.DomainInvitationStatus.INVITED + ).values_list("email", flat=True) invited_managers.update(domain_invitations) logger.debug(f"invited_managers: {invited_managers}") - for manager in domain_managers: + for id in domain_managers: try: # manager is a user id - user = User.objects.get(id=manager) + user = User.objects.get(id=id) _, created = UserPortfolioPermission.objects.get_or_create( portfolio=portfolio, user=user, @@ -230,9 +232,9 @@ class Command(BaseCommand): except User.DoesNotExist: self.failed_managers.add(user) logger.debug(f"User '{user}' does not exist") - - for manager in invited_managers: - self.create_portfolio_invitation(portfolio, manager) + + for email in invited_managers: + self.create_portfolio_invitation(portfolio, email) def create_portfolio_invitation(self, portfolio: Portfolio, email: str): """ @@ -252,18 +254,18 @@ class Command(BaseCommand): logger.info(f"Created portfolio invitation for '{user}' to portfolio '{portfolio}'") else: logger.info(f"Retrieved existing portfolio invitation for '{user}' to portfolio '{portfolio}'") - + # Assign portfolio permissions _, created = UserPortfolioPermission.objects.get_or_create( portfolio=portfolio, user=user, - defaults={"role": UserPortfolioPermission.RoleChoices.MANAGER}, + defaults={"role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER}, ) if created: logger.info(f"Created portfolio permission for '{user}' to portfolio '{portfolio}'") else: logger.info(f"Retrieved existing portfolio permission for '{user}' to portfolio '{portfolio}'") - + self.added_invitations.add(user) except User.DoesNotExist: PortfolioInvitation.objects.get_or_create( @@ -316,7 +318,7 @@ class Command(BaseCommand): if agency_name in portfolio_set: req.federal_agency = None updated_requests.append(req) - + message = f"updated_requests: {updated_requests}" TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message) @@ -329,7 +331,7 @@ class Command(BaseCommand): ), prompt_title="Do you wish to commit this update to the database?", ): - message = f"prompted for execution" + message = "prompted for execution" TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message) DomainRequest.objects.bulk_update(updated_requests, ["federal_agency"]) TerminalHelper.colorful_logger(logger.info, TerminalColors.OKBLUE, "Action completed successfully.") diff --git a/src/registrar/tests/test_management_scripts.py b/src/registrar/tests/test_management_scripts.py index 2ca4e4ffe..ac507cdf2 100644 --- a/src/registrar/tests/test_management_scripts.py +++ b/src/registrar/tests/test_management_scripts.py @@ -3,17 +3,11 @@ import boto3_mocking # type: ignore from datetime import date, datetime, time from django.core.management import call_command from django.test import TestCase, override_settings -<<<<<<< HEAD -from registrar.models.portfolio_invitation import PortfolioInvitation -from registrar.models.senior_official import SeniorOfficial -from registrar.models.user_portfolio_permission import UserPortfolioPermission -from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices -======= from registrar.models.domain_group import DomainGroup from registrar.models.portfolio_invitation import PortfolioInvitation from registrar.models.senior_official import SeniorOfficial from registrar.models.user_portfolio_permission import UserPortfolioPermission ->>>>>>> origin/main +from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices from registrar.utility.constants import BranchChoices from django.utils import timezone from django.utils.module_loading import import_string @@ -1472,7 +1466,7 @@ class TestCreateFederalPortfolio(TestCase): self.executive_so_2 = SeniorOfficial.objects.create( first_name="first", last_name="last", email="mango@igorville.gov", federal_agency=self.executive_agency_2 ) - + with boto3_mocking.clients.handler_for("sesv2", self.mock_client): self.domain_request = completed_domain_request( status=DomainRequest.DomainRequestStatus.IN_REVIEW, @@ -1533,7 +1527,7 @@ class TestCreateFederalPortfolio(TestCase): ): call_command("create_federal_portfolio", **kwargs) - # @less_console_noise_decorator + @less_console_noise_decorator def test_post_process_started_domain_requests_existing_portfolio(self): """Ensures that federal agency is cleared when agency name matches portfolio name. As the name implies, this implicitly tests the "post_process_started_domain_requests" function. @@ -1862,7 +1856,6 @@ class TestCreateFederalPortfolio(TestCase): self.assertEqual(existing_portfolio.notes, "Old notes") self.assertEqual(existing_portfolio.creator, self.user) -<<<<<<< HEAD @less_console_noise_decorator def test_add_managers_from_domains(self): """Test that all domain managers are added as portfolio managers.""" @@ -1872,56 +1865,55 @@ class TestCreateFederalPortfolio(TestCase): manager2 = User.objects.create(username="manager2", email="manager2@example.com") UserDomainRole.objects.create(user=manager1, domain=self.domain, role=UserDomainRole.Roles.MANAGER) UserDomainRole.objects.create(user=manager2, domain=self.domain, role=UserDomainRole.Roles.MANAGER) - + # Run the management command - self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True) + self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_domains=True, add_managers=True) # Check that the portfolio was created self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency) - + # Check that the users have been added as portfolio managers permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user__in=[manager1, manager2]) - print(UserPortfolioPermission.objects.all()) - # Check that the users have been added as portfolio managers + # Check that the users have been added as portfolio managers self.assertEqual(permissions.count(), 2) for perm in permissions: self.assertIn(UserPortfolioRoleChoices.ORGANIZATION_MEMBER, perm.roles) - + @less_console_noise_decorator def test_add_invited_managers(self): """Test that invited domain managers receive portfolio invitations.""" # create a domain invitation for the manager _ = DomainInvitation.objects.create( - domain=self.domain, - email="manager1@example.com", - status=DomainInvitation.DomainInvitationStatus.INVITED - ) - + domain=self.domain, email="manager1@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED + ) + # Run the management command - self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True) - + self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_domains=True, add_managers=True) + # Check that the portfolio was created self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency) # Check that a PortfolioInvitation has been created for the invited email invitation = PortfolioInvitation.objects.get(email="manager1@example.com", portfolio=self.portfolio) - + # Verify the status of the invitation remains INVITED self.assertEqual( invitation.status, PortfolioInvitation.PortfolioInvitationStatus.INVITED, - "PortfolioInvitation status should remain INVITED for non-existent users." + "PortfolioInvitation status should remain INVITED for non-existent users.", ) - + # Verify that no duplicate invitations are created - self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True) + self.run_create_federal_portfolio( + agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True + ) invitations = PortfolioInvitation.objects.filter(email="manager1@example.com", portfolio=self.portfolio) self.assertEqual( invitations.count(), 1, - "Duplicate PortfolioInvitation should not be created for the same email and portfolio." + "Duplicate PortfolioInvitation should not be created for the same email and portfolio.", ) @less_console_noise_decorator @@ -1931,41 +1923,76 @@ class TestCreateFederalPortfolio(TestCase): manager = User.objects.create(username="manager", email="manager@example.com") UserDomainRole.objects.create(user=manager, domain=self.domain, role=UserDomainRole.Roles.MANAGER) - # Create a pre-existing portfolio - self.portfolio = Portfolio.objects.create(organization_name=self.federal_agency.agency, federal_agency=self.federal_agency, creator=self.user) - + # Create a pre-existing portfolio + self.portfolio = Portfolio.objects.create( + organization_name=self.federal_agency.agency, federal_agency=self.federal_agency, creator=self.user + ) + # Manually add the manager to the portfolio - UserPortfolioPermission.objects.create(portfolio=self.portfolio, user=manager, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER]) - + UserPortfolioPermission.objects.create( + portfolio=self.portfolio, user=manager, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER] + ) + # Run the management command - self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True) - + self.run_create_federal_portfolio( + agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True + ) + # Ensure that the manager is not duplicated permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user=manager) self.assertEqual(permissions.count(), 1) @less_console_noise_decorator - def test_add_managers_portfolio_already_exists(self): + def test_add_managers_skip_existing_portfolios(self): """Test that managers are skipped when the portfolio already exists.""" - - # Create a pre-existing portfolio - self.portfolio = Portfolio.objects.create(organization_name=self.federal_agency.agency, federal_agency=self.federal_agency, creator=self.user) - + + # Create a pre-existing portfolio + self.portfolio = Portfolio.objects.create( + organization_name=self.federal_agency.agency, federal_agency=self.federal_agency, creator=self.user + ) + + domain_request_1 = completed_domain_request( + name="domain1.gov", + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + generic_org_type=DomainRequest.OrganizationChoices.CITY, + federal_agency=self.federal_agency, + user=self.user, + portfolio=self.portfolio, + ) + domain_request_1.approve() + domain1 = Domain.objects.get(name="domain1.gov") + + domain_request_2 = completed_domain_request( + name="domain2.gov", + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + generic_org_type=DomainRequest.OrganizationChoices.CITY, + federal_agency=self.federal_agency, + user=self.user, + portfolio=self.portfolio, + ) + domain_request_2.approve() + domain2 = Domain.objects.get(name="domain2.gov") + # Create users and assign them as domain managers manager1 = User.objects.create(username="manager1", email="manager1@example.com") manager2 = User.objects.create(username="manager2", email="manager2@example.com") - UserDomainRole.objects.create(user=manager1, domain=self.domain, role=UserDomainRole.Roles.MANAGER) - UserDomainRole.objects.create(user=manager2, domain=self.domain, role=UserDomainRole.Roles.MANAGER) + UserDomainRole.objects.create(user=manager1, domain=domain1, role=UserDomainRole.Roles.MANAGER) + UserDomainRole.objects.create(user=manager2, domain=domain2, role=UserDomainRole.Roles.MANAGER) # Run the management command - self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True) - + self.run_create_federal_portfolio( + agency_name=self.federal_agency.agency, + parse_requests=True, + add_managers=True, + skip_existing_portfolios=True, + ) + # Check that managers were added to the portfolio permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user__in=[manager1, manager2]) self.assertEqual(permissions.count(), 2) for perm in permissions: self.assertIn(UserPortfolioRoleChoices.ORGANIZATION_MEMBER, perm.roles) -======= + def test_skip_existing_portfolios(self): """Tests the skip_existing_portfolios to ensure that it doesn't add suborgs, domain requests, and domain info.""" @@ -2466,4 +2493,3 @@ class TestRemovePortfolios(TestCase): # Check that the portfolio was deleted self.assertFalse(Portfolio.objects.filter(organization_name="Test with suborg").exists()) ->>>>>>> origin/main