Merge pull request #3134 from cisagov/ag/3110-create-all-federal-executive-portfolios

#3117 Allow create portfolio script to run for executive, legislative, and judical portfolios - [AG]
This commit is contained in:
zandercymatics 2024-12-05 10:56:41 -07:00 committed by GitHub
commit 6fb56fe082
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 352 additions and 134 deletions

View file

@ -893,22 +893,28 @@ Example: `cf ssh getgov-za`
[Follow these steps](#use-scp-to-transfer-data-to-sandboxes) to upload the federal_cio csv to a sandbox of your choice. [Follow these steps](#use-scp-to-transfer-data-to-sandboxes) to upload the federal_cio csv to a sandbox of your choice.
#### Step 5: Running the script #### Step 5: Running the script
```./manage.py create_federal_portfolio "{federal_agency_name}" --both``` To create a specific portfolio:
```./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --both```
Example (only requests): `./manage.py create_federal_portfolio "AMTRAK" --parse_requests` Example (only requests): `./manage.py create_federal_portfolio "AMTRAK" --parse_requests`
To create a portfolios for all federal agencies in a branch:
```./manage.py create_federal_portfolio --branch "{executive|legislative|judicial}" --both```
Example (only requests): `./manage.py create_federal_portfolio --branch "executive" --parse_requests`
### Running locally ### Running locally
#### Step 1: Running the script #### Step 1: Running the script
```docker-compose exec app ./manage.py create_federal_portfolio "{federal_agency_name}" --both``` ```docker-compose exec app ./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --both```
##### Parameters ##### Parameters
| | Parameter | Description | | | Parameter | Description |
|:-:|:-------------------------- |:-------------------------------------------------------------------------------------------| |:-:|:-------------------------- |:-------------------------------------------------------------------------------------------|
| 1 | **federal_agency_name** | Name of the FederalAgency record surrounded by quotes. For instance,"AMTRAK". | | 1 | **agency_name** | Name of the FederalAgency record surrounded by quotes. For instance,"AMTRAK". |
| 2 | **both** | If True, runs parse_requests and parse_domains. | | 2 | **branch** | Creates a portfolio for each federal agency in a branch: executive, legislative, judicial |
| 3 | **parse_requests** | If True, then the created portfolio is added to all related DomainRequests. | | 3 | **both** | If True, runs parse_requests and parse_domains. |
| 4 | **parse_domains** | If True, then the created portfolio is added to all related Domains. | | 4 | **parse_requests** | If True, then the created portfolio is added to all related DomainRequests. |
| 5 | **parse_domains** | If True, then the created portfolio is added to all related Domains. |
Note: Regarding parameters #2-#3, you cannot use `--both` while using these. You must specify either `--parse_requests` or `--parse_domains` seperately. While all of these parameters are optional in that you do not need to specify all of them, - Parameters #1-#2: Either `--agency_name` or `--branch` must be specified. Not both.
- Parameters #2-#3, you cannot use `--both` while using these. You must specify either `--parse_requests` or `--parse_domains` seperately. While all of these parameters are optional in that you do not need to specify all of them,
you must specify at least one to run this script. you must specify at least one to run this script.

View file

@ -13,16 +13,29 @@ logger = logging.getLogger(__name__)
class Command(BaseCommand): class Command(BaseCommand):
help = "Creates a federal portfolio given a FederalAgency name" help = "Creates a federal portfolio given a FederalAgency name"
def __init__(self, *args, **kwargs):
"""Defines fields to track what portfolios were updated, skipped, or just outright failed."""
super().__init__(*args, **kwargs)
self.updated_portfolios = set()
self.skipped_portfolios = set()
self.failed_portfolios = set()
def add_arguments(self, parser): def add_arguments(self, parser):
"""Add three arguments: """Add three arguments:
1. agency_name => the value of FederalAgency.agency 1. agency_name => the value of FederalAgency.agency
2. --parse_requests => if true, adds the given portfolio to each related DomainRequest 2. --parse_requests => if true, adds the given portfolio to each related DomainRequest
3. --parse_domains => if true, adds the given portfolio to each related DomainInformation 3. --parse_domains => if true, adds the given portfolio to each related DomainInformation
""" """
parser.add_argument( group = parser.add_mutually_exclusive_group(required=True)
"agency_name", group.add_argument(
"--agency_name",
help="The name of the FederalAgency to add", help="The name of the FederalAgency to add",
) )
group.add_argument(
"--branch",
choices=["executive", "legislative", "judicial"],
help="The federal branch to process. Creates a portfolio for each FederalAgency in this branch.",
)
parser.add_argument( parser.add_argument(
"--parse_requests", "--parse_requests",
action=argparse.BooleanOptionalAction, action=argparse.BooleanOptionalAction,
@ -39,7 +52,9 @@ class Command(BaseCommand):
help="Adds portfolio to both requests and domains", help="Adds portfolio to both requests and domains",
) )
def handle(self, agency_name, **options): def handle(self, **options):
agency_name = options.get("agency_name")
branch = options.get("branch")
parse_requests = options.get("parse_requests") parse_requests = options.get("parse_requests")
parse_domains = options.get("parse_domains") parse_domains = options.get("parse_domains")
both = options.get("both") both = options.get("both")
@ -51,84 +66,94 @@ class Command(BaseCommand):
if parse_requests or parse_domains: if parse_requests or parse_domains:
raise CommandError("You cannot pass --parse_requests or --parse_domains when passing --both.") raise CommandError("You cannot pass --parse_requests or --parse_domains when passing --both.")
federal_agency = FederalAgency.objects.filter(agency__iexact=agency_name).first() federal_agency_filter = {"agency__iexact": agency_name} if agency_name else {"federal_type": branch}
if not federal_agency: agencies = FederalAgency.objects.filter(**federal_agency_filter)
raise ValueError( if not agencies or agencies.count() < 1:
if agency_name:
raise CommandError(
f"Cannot find the federal agency '{agency_name}' in our database. " f"Cannot find the federal agency '{agency_name}' in our database. "
"The value you enter for `agency_name` must be " "The value you enter for `agency_name` must be "
"prepopulated in the FederalAgency table before proceeding." "prepopulated in the FederalAgency table before proceeding."
) )
else:
raise CommandError(f"Cannot find '{branch}' federal agencies in our database.")
portfolio = self.create_or_modify_portfolio(federal_agency) for federal_agency in agencies:
message = f"Processing federal agency '{federal_agency.agency}'..."
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
try:
# C901 'Command.handle' is too complex (12)
self.handle_populate_portfolio(federal_agency, parse_domains, parse_requests, both)
except Exception as exec:
self.failed_portfolios.add(federal_agency)
logger.error(exec)
message = f"Failed to create portfolio '{federal_agency.agency}'"
TerminalHelper.colorful_logger(logger.info, TerminalColors.FAIL, message)
TerminalHelper.log_script_run_summary(
self.updated_portfolios,
self.failed_portfolios,
self.skipped_portfolios,
debug=False,
skipped_header="----- SOME PORTFOLIOS WERE SKIPPED -----",
display_as_str=True,
)
def handle_populate_portfolio(self, federal_agency, parse_domains, parse_requests, both):
"""Attempts to create a portfolio. If successful, this function will
also create new suborganizations"""
portfolio, created = self.create_portfolio(federal_agency)
if created:
self.create_suborganizations(portfolio, federal_agency) self.create_suborganizations(portfolio, federal_agency)
if parse_domains or both:
self.handle_portfolio_domains(portfolio, federal_agency)
if parse_requests or both: if parse_requests or both:
self.handle_portfolio_requests(portfolio, federal_agency) self.handle_portfolio_requests(portfolio, federal_agency)
if parse_domains or both: def create_portfolio(self, federal_agency):
self.handle_portfolio_domains(portfolio, federal_agency) """Creates a portfolio if it doesn't presently exist.
Returns portfolio, created."""
# Get the org name / senior official
org_name = federal_agency.agency
so = federal_agency.so_federal_agency.first() if federal_agency.so_federal_agency.exists() else None
def create_or_modify_portfolio(self, federal_agency): # First just try to get an existing portfolio
"""Creates or modifies a portfolio record based on a federal agency.""" portfolio = Portfolio.objects.filter(organization_name=org_name).first()
portfolio_args = { if portfolio:
"federal_agency": federal_agency, self.skipped_portfolios.add(portfolio)
"organization_name": federal_agency.agency, TerminalHelper.colorful_logger(
"organization_type": DomainRequest.OrganizationChoices.FEDERAL, logger.info,
"creator": User.get_default_user(), TerminalColors.YELLOW,
"notes": "Auto-generated record", f"Portfolio with organization name '{org_name}' already exists. Skipping create.",
} )
return portfolio, False
if federal_agency.so_federal_agency.exists(): # Create new portfolio if it doesn't exist
portfolio_args["senior_official"] = federal_agency.so_federal_agency.first() portfolio = Portfolio.objects.create(
organization_name=org_name,
portfolio, created = Portfolio.objects.get_or_create( federal_agency=federal_agency,
organization_name=portfolio_args.get("organization_name"), defaults=portfolio_args organization_type=DomainRequest.OrganizationChoices.FEDERAL,
creator=User.get_default_user(),
notes="Auto-generated record",
senior_official=so,
) )
if created: self.updated_portfolios.add(portfolio)
message = f"Created portfolio '{portfolio}'" TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, f"Created portfolio '{portfolio}'")
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)
if portfolio_args.get("senior_official"): # Log if the senior official was added or not.
message = f"Added senior official '{portfolio_args['senior_official']}'" if portfolio.senior_official:
message = f"Added senior official '{portfolio.senior_official}'"
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message) TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)
else: else:
message = ( message = (
f"No senior official added to portfolio '{portfolio}'. " f"No senior official added to portfolio '{org_name}'. "
"None was returned for the reverse relation `FederalAgency.so_federal_agency.first()`" "None was returned for the reverse relation `FederalAgency.so_federal_agency.first()`"
) )
TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message) TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message)
else:
proceed = TerminalHelper.prompt_for_execution(
system_exit_on_terminate=False,
prompt_message=f"""
The given portfolio '{federal_agency.agency}' already exists in our DB.
If you cancel, the rest of the script will still execute but this record will not update.
""",
prompt_title="Do you wish to modify this record?",
)
if proceed:
# Don't override the creator and notes fields return portfolio, True
if portfolio.creator:
portfolio_args.pop("creator")
if portfolio.notes:
portfolio_args.pop("notes")
# Update everything else
for key, value in portfolio_args.items():
setattr(portfolio, key, value)
portfolio.save()
message = f"Modified portfolio '{portfolio}'"
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
if portfolio_args.get("senior_official"):
message = f"Added/modified senior official '{portfolio_args['senior_official']}'"
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
return portfolio
def create_suborganizations(self, portfolio: Portfolio, federal_agency: FederalAgency): def create_suborganizations(self, portfolio: Portfolio, federal_agency: FederalAgency):
"""Create Suborganizations tied to the given portfolio based on DomainInformation objects""" """Create Suborganizations tied to the given portfolio based on DomainInformation objects"""
@ -146,10 +171,11 @@ class Command(BaseCommand):
TerminalHelper.colorful_logger(logger.warning, TerminalColors.FAIL, message) TerminalHelper.colorful_logger(logger.warning, TerminalColors.FAIL, message)
return return
# Check if we need to update any existing suborgs first. This step is optional. # Check for existing suborgs on the current portfolio
existing_suborgs = Suborganization.objects.filter(name__in=org_names) existing_suborgs = Suborganization.objects.filter(name__in=org_names)
if existing_suborgs.exists(): if existing_suborgs.exists():
self._update_existing_suborganizations(portfolio, existing_suborgs) message = f"Some suborganizations already exist for portfolio '{portfolio}'."
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKBLUE, message)
# Create new suborgs, as long as they don't exist in the db already # Create new suborgs, as long as they don't exist in the db already
new_suborgs = [] new_suborgs = []
@ -175,29 +201,6 @@ class Command(BaseCommand):
else: else:
TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, "No suborganizations added") TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, "No suborganizations added")
def _update_existing_suborganizations(self, portfolio, orgs_to_update):
"""
Update existing suborganizations with new portfolio.
Prompts for user confirmation before proceeding.
"""
proceed = TerminalHelper.prompt_for_execution(
system_exit_on_terminate=False,
prompt_message=f"""Some suborganizations already exist in our DB.
If you cancel, the rest of the script will still execute but these records will not update.
==Proposed Changes==
The following suborgs will be updated: {[org.name for org in orgs_to_update]}
""",
prompt_title="Do you wish to modify existing suborganizations?",
)
if proceed:
for org in orgs_to_update:
org.portfolio = portfolio
Suborganization.objects.bulk_update(orgs_to_update, ["portfolio"])
message = f"Updated {len(orgs_to_update)} suborganizations."
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
def handle_portfolio_requests(self, portfolio: Portfolio, federal_agency: FederalAgency): def handle_portfolio_requests(self, portfolio: Portfolio, federal_agency: FederalAgency):
""" """
Associate portfolio with domain requests for a federal agency. Associate portfolio with domain requests for a federal agency.
@ -208,12 +211,17 @@ class Command(BaseCommand):
DomainRequest.DomainRequestStatus.INELIGIBLE, DomainRequest.DomainRequestStatus.INELIGIBLE,
DomainRequest.DomainRequestStatus.REJECTED, DomainRequest.DomainRequestStatus.REJECTED,
] ]
domain_requests = DomainRequest.objects.filter(federal_agency=federal_agency).exclude(status__in=invalid_states) domain_requests = DomainRequest.objects.filter(federal_agency=federal_agency, portfolio__isnull=True).exclude(
status__in=invalid_states
)
if not domain_requests.exists(): if not domain_requests.exists():
message = f""" message = f"""
Portfolios not added to domain requests: no valid records found. Portfolio '{portfolio}' not added to domain requests: no valid records found.
This means that a filter on DomainInformation for the federal_agency '{federal_agency}' returned no results. This means that a filter on DomainInformation for the federal_agency '{federal_agency}' returned no results.
Excluded statuses: STARTED, INELIGIBLE, REJECTED. Excluded statuses: STARTED, INELIGIBLE, REJECTED.
Filter info: DomainRequest.objects.filter(federal_agency=federal_agency, portfolio__isnull=True).exclude(
status__in=invalid_states
)
""" """
TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message) TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message)
return None return None
@ -224,6 +232,7 @@ class Command(BaseCommand):
domain_request.portfolio = portfolio domain_request.portfolio = portfolio
if domain_request.organization_name in suborgs: if domain_request.organization_name in suborgs:
domain_request.sub_organization = suborgs.get(domain_request.organization_name) domain_request.sub_organization = suborgs.get(domain_request.organization_name)
self.updated_portfolios.add(portfolio)
DomainRequest.objects.bulk_update(domain_requests, ["portfolio", "sub_organization"]) DomainRequest.objects.bulk_update(domain_requests, ["portfolio", "sub_organization"])
message = f"Added portfolio '{portfolio}' to {len(domain_requests)} domain requests." message = f"Added portfolio '{portfolio}' to {len(domain_requests)} domain requests."
@ -234,11 +243,12 @@ class Command(BaseCommand):
Associate portfolio with domains for a federal agency. Associate portfolio with domains for a federal agency.
Updates all relevant domain information records. Updates all relevant domain information records.
""" """
domain_infos = DomainInformation.objects.filter(federal_agency=federal_agency) domain_infos = DomainInformation.objects.filter(federal_agency=federal_agency, portfolio__isnull=True)
if not domain_infos.exists(): if not domain_infos.exists():
message = f""" message = f"""
Portfolios not added to domains: no valid records found. Portfolio '{portfolio}' not added to domains: no valid records found.
This means that a filter on DomainInformation for the federal_agency '{federal_agency}' returned no results. The filter on DomainInformation for the federal_agency '{federal_agency}' returned no results.
Filter info: DomainInformation.objects.filter(federal_agency=federal_agency, portfolio__isnull=True)
""" """
TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message) TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message)
return None return None
@ -251,5 +261,5 @@ class Command(BaseCommand):
domain_info.sub_organization = suborgs.get(domain_info.organization_name) domain_info.sub_organization = suborgs.get(domain_info.organization_name)
DomainInformation.objects.bulk_update(domain_infos, ["portfolio", "sub_organization"]) DomainInformation.objects.bulk_update(domain_infos, ["portfolio", "sub_organization"])
message = f"Added portfolio '{portfolio}' to {len(domain_infos)} domains" message = f"Added portfolio '{portfolio}' to {len(domain_infos)} domains."
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message) TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)

View file

@ -192,7 +192,7 @@ class PopulateScriptTemplate(ABC):
class TerminalHelper: class TerminalHelper:
@staticmethod @staticmethod
def log_script_run_summary( def log_script_run_summary(
to_update, failed_to_update, skipped, debug: bool, log_header=None, display_as_str=False to_update, failed_to_update, skipped, debug: bool, log_header=None, skipped_header=None, display_as_str=False
): ):
"""Prints success, failed, and skipped counts, as well as """Prints success, failed, and skipped counts, as well as
all affected objects.""" all affected objects."""
@ -203,8 +203,21 @@ class TerminalHelper:
if log_header is None: if log_header is None:
log_header = "============= FINISHED ===============" log_header = "============= FINISHED ==============="
if skipped_header is None:
skipped_header = "----- SOME DATA WAS INVALID (NEEDS MANUAL PATCHING) -----"
# Give the user the option to see failed / skipped records if any exist.
display_detailed_logs = False
if not debug and update_failed_count > 0 or update_skipped_count > 0:
display_detailed_logs = TerminalHelper.prompt_for_execution(
system_exit_on_terminate=False,
prompt_message=f"You will see {update_failed_count} failed and {update_skipped_count} skipped records.",
verify_message="** Some records were skipped, or some failed to update. **",
prompt_title="Do you wish to see the full list of failed, skipped and updated records?",
)
# Prepare debug messages # Prepare debug messages
if debug: if debug or display_detailed_logs:
updated_display = [str(u) for u in to_update] if display_as_str else to_update updated_display = [str(u) for u in to_update] if display_as_str else to_update
skipped_display = [str(s) for s in skipped] if display_as_str else skipped skipped_display = [str(s) for s in skipped] if display_as_str else skipped
failed_display = [str(f) for f in failed_to_update] if display_as_str else failed_to_update failed_display = [str(f) for f in failed_to_update] if display_as_str else failed_to_update
@ -217,7 +230,7 @@ class TerminalHelper:
# Print out a list of everything that was changed, if we have any changes to log. # Print out a list of everything that was changed, if we have any changes to log.
# Otherwise, don't print anything. # Otherwise, don't print anything.
TerminalHelper.print_conditional( TerminalHelper.print_conditional(
debug, True,
f"{debug_messages.get('success') if update_success_count > 0 else ''}" f"{debug_messages.get('success') if update_success_count > 0 else ''}"
f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}" f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}"
f"{debug_messages.get('failed') if update_failed_count > 0 else ''}", f"{debug_messages.get('failed') if update_failed_count > 0 else ''}",
@ -236,7 +249,7 @@ class TerminalHelper:
f"""{TerminalColors.YELLOW} f"""{TerminalColors.YELLOW}
{log_header} {log_header}
Updated {update_success_count} entries Updated {update_success_count} entries
----- SOME DATA WAS INVALID (NEEDS MANUAL PATCHING) ----- {skipped_header}
Skipped updating {update_skipped_count} entries Skipped updating {update_skipped_count} entries
{TerminalColors.ENDC} {TerminalColors.ENDC}
""" """
@ -368,7 +381,9 @@ class TerminalHelper:
logger.info(print_statement) logger.info(print_statement)
@staticmethod @staticmethod
def prompt_for_execution(system_exit_on_terminate: bool, prompt_message: str, prompt_title: str) -> bool: def prompt_for_execution(
system_exit_on_terminate: bool, prompt_message: str, prompt_title: str, verify_message=None
) -> bool:
"""Create to reduce code complexity. """Create to reduce code complexity.
Prompts the user to inspect the given string Prompts the user to inspect the given string
and asks if they wish to proceed. and asks if they wish to proceed.
@ -380,6 +395,9 @@ class TerminalHelper:
if system_exit_on_terminate: if system_exit_on_terminate:
action_description_for_selecting_no = "exit" action_description_for_selecting_no = "exit"
if verify_message is None:
verify_message = "*** IMPORTANT: VERIFY THE FOLLOWING LOOKS CORRECT ***"
# Allow the user to inspect the command string # Allow the user to inspect the command string
# and ask if they wish to proceed # and ask if they wish to proceed
proceed_execution = TerminalHelper.query_yes_no_exit( proceed_execution = TerminalHelper.query_yes_no_exit(
@ -387,7 +405,7 @@ class TerminalHelper:
===================================================== =====================================================
{prompt_title} {prompt_title}
===================================================== =====================================================
*** IMPORTANT: VERIFY THE FOLLOWING LOOKS CORRECT *** {verify_message}
{prompt_message} {prompt_message}
{TerminalColors.FAIL} {TerminalColors.FAIL}

View file

@ -1421,10 +1421,41 @@ class TestCreateFederalPortfolio(TestCase):
def setUp(self): def setUp(self):
self.mock_client = MockSESClient() self.mock_client = MockSESClient()
self.user = User.objects.create(username="testuser") self.user = User.objects.create(username="testuser")
# Create an agency wih no federal type (can only be created via specifiying it manually)
self.federal_agency = FederalAgency.objects.create(agency="Test Federal Agency") self.federal_agency = FederalAgency.objects.create(agency="Test Federal Agency")
# And create some with federal_type ones with creative names
self.executive_agency_1 = FederalAgency.objects.create(
agency="Executive Agency 1", federal_type=BranchChoices.EXECUTIVE
)
self.executive_agency_2 = FederalAgency.objects.create(
agency="Executive Agency 2", federal_type=BranchChoices.EXECUTIVE
)
self.executive_agency_3 = FederalAgency.objects.create(
agency="Executive Agency 3", federal_type=BranchChoices.EXECUTIVE
)
self.legislative_agency_1 = FederalAgency.objects.create(
agency="Legislative Agency 1", federal_type=BranchChoices.LEGISLATIVE
)
self.legislative_agency_2 = FederalAgency.objects.create(
agency="Legislative Agency 2", federal_type=BranchChoices.LEGISLATIVE
)
self.judicial_agency_1 = FederalAgency.objects.create(
agency="Judicial Agency 1", federal_type=BranchChoices.JUDICIAL
)
self.judicial_agency_2 = FederalAgency.objects.create(
agency="Judicial Agency 2", federal_type=BranchChoices.JUDICIAL
)
self.senior_official = SeniorOfficial.objects.create( self.senior_official = SeniorOfficial.objects.create(
first_name="first", last_name="last", email="testuser@igorville.gov", federal_agency=self.federal_agency first_name="first", last_name="last", email="testuser@igorville.gov", federal_agency=self.federal_agency
) )
self.executive_so_1 = SeniorOfficial.objects.create(
first_name="first", last_name="last", email="apple@igorville.gov", federal_agency=self.executive_agency_1
)
self.executive_so_2 = SeniorOfficial.objects.create(
first_name="first", last_name="last", email="mango@igorville.gov", federal_agency=self.executive_agency_2
)
with boto3_mocking.clients.handler_for("sesv2", self.mock_client): with boto3_mocking.clients.handler_for("sesv2", self.mock_client):
self.domain_request = completed_domain_request( self.domain_request = completed_domain_request(
status=DomainRequest.DomainRequestStatus.IN_REVIEW, status=DomainRequest.DomainRequestStatus.IN_REVIEW,
@ -1436,7 +1467,7 @@ class TestCreateFederalPortfolio(TestCase):
self.domain_info = DomainInformation.objects.filter(domain_request=self.domain_request).get() self.domain_info = DomainInformation.objects.filter(domain_request=self.domain_request).get()
self.domain_request_2 = completed_domain_request( self.domain_request_2 = completed_domain_request(
name="sock@igorville.org", name="icecreamforigorville.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW, status=DomainRequest.DomainRequestStatus.IN_REVIEW,
generic_org_type=DomainRequest.OrganizationChoices.CITY, generic_org_type=DomainRequest.OrganizationChoices.CITY,
federal_agency=self.federal_agency, federal_agency=self.federal_agency,
@ -1446,6 +1477,28 @@ class TestCreateFederalPortfolio(TestCase):
self.domain_request_2.approve() self.domain_request_2.approve()
self.domain_info_2 = DomainInformation.objects.filter(domain_request=self.domain_request_2).get() self.domain_info_2 = DomainInformation.objects.filter(domain_request=self.domain_request_2).get()
self.domain_request_3 = completed_domain_request(
name="exec_1.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
generic_org_type=DomainRequest.OrganizationChoices.FEDERAL,
federal_agency=self.executive_agency_1,
user=self.user,
organization_name="Executive Agency 1",
)
self.domain_request_3.approve()
self.domain_info_3 = self.domain_request_3.DomainRequest_info
self.domain_request_4 = completed_domain_request(
name="exec_2.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
generic_org_type=DomainRequest.OrganizationChoices.FEDERAL,
federal_agency=self.executive_agency_2,
user=self.user,
organization_name="Executive Agency 2",
)
self.domain_request_4.approve()
self.domain_info_4 = self.domain_request_4.DomainRequest_info
def tearDown(self): def tearDown(self):
DomainInformation.objects.all().delete() DomainInformation.objects.all().delete()
DomainRequest.objects.all().delete() DomainRequest.objects.all().delete()
@ -1456,18 +1509,16 @@ class TestCreateFederalPortfolio(TestCase):
User.objects.all().delete() User.objects.all().delete()
@less_console_noise_decorator @less_console_noise_decorator
def run_create_federal_portfolio(self, agency_name, parse_requests=False, parse_domains=False): def run_create_federal_portfolio(self, **kwargs):
with patch( with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", "registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit",
return_value=True, return_value=True,
): ):
call_command( call_command("create_federal_portfolio", **kwargs)
"create_federal_portfolio", agency_name, parse_requests=parse_requests, parse_domains=parse_domains
)
def test_create_or_modify_portfolio(self): def test_create_single_portfolio(self):
"""Test portfolio creation and modification with suborg and senior official.""" """Test portfolio creation with suborg and senior official."""
self.run_create_federal_portfolio("Test Federal Agency", parse_requests=True) self.run_create_federal_portfolio(agency_name="Test Federal Agency", parse_requests=True)
portfolio = Portfolio.objects.get(federal_agency=self.federal_agency) portfolio = Portfolio.objects.get(federal_agency=self.federal_agency)
self.assertEqual(portfolio.organization_name, self.federal_agency.agency) self.assertEqual(portfolio.organization_name, self.federal_agency.agency)
@ -1483,9 +1534,125 @@ class TestCreateFederalPortfolio(TestCase):
# Test the senior official # Test the senior official
self.assertEqual(portfolio.senior_official, self.senior_official) self.assertEqual(portfolio.senior_official, self.senior_official)
def test_create_multiple_portfolios_for_branch_judicial(self):
"""Tests creating all portfolios under a given branch"""
federal_choice = DomainRequest.OrganizationChoices.FEDERAL
expected_portfolio_names = {
self.judicial_agency_1.agency,
self.judicial_agency_2.agency,
}
self.run_create_federal_portfolio(branch="judicial", parse_requests=True, parse_domains=True)
# Ensure that all the portfolios we expect to get created were created
portfolios = Portfolio.objects.all()
self.assertEqual(portfolios.count(), 2)
# Test that all created portfolios have the correct values
org_names, org_types, creators, notes = [], [], [], []
for portfolio in portfolios:
org_names.append(portfolio.organization_name)
org_types.append(portfolio.organization_type)
creators.append(portfolio.creator)
notes.append(portfolio.notes)
# Test organization_name, organization_type, creator, and notes (in that order)
self.assertTrue(all([org_name in expected_portfolio_names for org_name in org_names]))
self.assertTrue(all([org_type == federal_choice for org_type in org_types]))
self.assertTrue(all([creator == User.get_default_user() for creator in creators]))
self.assertTrue(all([note == "Auto-generated record" for note in notes]))
def test_create_multiple_portfolios_for_branch_legislative(self):
"""Tests creating all portfolios under a given branch"""
federal_choice = DomainRequest.OrganizationChoices.FEDERAL
expected_portfolio_names = {
self.legislative_agency_1.agency,
self.legislative_agency_2.agency,
}
self.run_create_federal_portfolio(branch="legislative", parse_requests=True, parse_domains=True)
# Ensure that all the portfolios we expect to get created were created
portfolios = Portfolio.objects.all()
self.assertEqual(portfolios.count(), 2)
# Test that all created portfolios have the correct values
org_names, org_types, creators, notes = [], [], [], []
for portfolio in portfolios:
org_names.append(portfolio.organization_name)
org_types.append(portfolio.organization_type)
creators.append(portfolio.creator)
notes.append(portfolio.notes)
# Test organization_name, organization_type, creator, and notes (in that order)
self.assertTrue(all([org_name in expected_portfolio_names for org_name in org_names]))
self.assertTrue(all([org_type == federal_choice for org_type in org_types]))
self.assertTrue(all([creator == User.get_default_user() for creator in creators]))
self.assertTrue(all([note == "Auto-generated record" for note in notes]))
def test_create_multiple_portfolios_for_branch_executive(self):
"""Tests creating all portfolios under a given branch"""
federal_choice = DomainRequest.OrganizationChoices.FEDERAL
# == Test creating executive portfolios == #
expected_portfolio_names = {
self.executive_agency_1.agency,
self.executive_agency_2.agency,
self.executive_agency_3.agency,
}
self.run_create_federal_portfolio(branch="executive", parse_requests=True, parse_domains=True)
# Ensure that all the portfolios we expect to get created were created
portfolios = Portfolio.objects.all()
self.assertEqual(portfolios.count(), 3)
# Test that all created portfolios have the correct values
org_names, org_types, creators, notes, senior_officials = [], [], [], [], []
for portfolio in portfolios:
org_names.append(portfolio.organization_name)
org_types.append(portfolio.organization_type)
creators.append(portfolio.creator)
notes.append(portfolio.notes)
senior_officials.append(portfolio.senior_official)
# Test organization_name, organization_type, creator, and notes (in that order)
self.assertTrue(all([org_name in expected_portfolio_names for org_name in org_names]))
self.assertTrue(all([org_type == federal_choice for org_type in org_types]))
self.assertTrue(all([creator == User.get_default_user() for creator in creators]))
self.assertTrue(all([note == "Auto-generated record" for note in notes]))
# Test senior officials were assigned correctly
expected_senior_officials = {
self.executive_so_1,
self.executive_so_2,
# We expect one record to skip
None,
}
self.assertTrue(all([senior_official in expected_senior_officials for senior_official in senior_officials]))
# Test that domain requests / domains were assigned correctly
self.domain_request_3.refresh_from_db()
self.domain_request_4.refresh_from_db()
self.domain_info_3.refresh_from_db()
self.domain_info_4.refresh_from_db()
expected_requests = DomainRequest.objects.filter(
portfolio__id__in=[
# Implicity tests for existence
self.domain_request_3.portfolio.id,
self.domain_request_4.portfolio.id,
]
)
expected_domain_infos = DomainInformation.objects.filter(
portfolio__id__in=[
# Implicity tests for existence
self.domain_info_3.portfolio.id,
self.domain_info_4.portfolio.id,
]
)
self.assertEqual(expected_requests.count(), 2)
self.assertEqual(expected_domain_infos.count(), 2)
def test_handle_portfolio_requests(self): def test_handle_portfolio_requests(self):
"""Verify portfolio association with domain requests.""" """Verify portfolio association with domain requests."""
self.run_create_federal_portfolio("Test Federal Agency", parse_requests=True) self.run_create_federal_portfolio(agency_name="Test Federal Agency", parse_requests=True)
self.domain_request.refresh_from_db() self.domain_request.refresh_from_db()
self.assertIsNotNone(self.domain_request.portfolio) self.assertIsNotNone(self.domain_request.portfolio)
@ -1494,7 +1661,7 @@ class TestCreateFederalPortfolio(TestCase):
def test_handle_portfolio_domains(self): def test_handle_portfolio_domains(self):
"""Check portfolio association with domain information.""" """Check portfolio association with domain information."""
self.run_create_federal_portfolio("Test Federal Agency", parse_domains=True) self.run_create_federal_portfolio(agency_name="Test Federal Agency", parse_domains=True)
self.domain_info.refresh_from_db() self.domain_info.refresh_from_db()
self.assertIsNotNone(self.domain_info.portfolio) self.assertIsNotNone(self.domain_info.portfolio)
@ -1503,7 +1670,7 @@ class TestCreateFederalPortfolio(TestCase):
def test_handle_parse_both(self): def test_handle_parse_both(self):
"""Ensure correct parsing of both requests and domains.""" """Ensure correct parsing of both requests and domains."""
self.run_create_federal_portfolio("Test Federal Agency", parse_requests=True, parse_domains=True) self.run_create_federal_portfolio(agency_name="Test Federal Agency", parse_requests=True, parse_domains=True)
self.domain_request.refresh_from_db() self.domain_request.refresh_from_db()
self.domain_info.refresh_from_db() self.domain_info.refresh_from_db()
@ -1511,12 +1678,26 @@ class TestCreateFederalPortfolio(TestCase):
self.assertIsNotNone(self.domain_info.portfolio) self.assertIsNotNone(self.domain_info.portfolio)
self.assertEqual(self.domain_request.portfolio, self.domain_info.portfolio) self.assertEqual(self.domain_request.portfolio, self.domain_info.portfolio)
def test_command_error_no_parse_options(self): def test_command_error_parse_options(self):
"""Verify error when no parse options are provided.""" """Verify error when bad parse options are provided."""
# The command should enforce either --branch or --agency_name
with self.assertRaisesRegex(CommandError, "Error: one of the arguments --agency_name --branch is required"):
self.run_create_federal_portfolio()
# We should forbid both at the same time
with self.assertRaisesRegex(CommandError, "Error: argument --branch: not allowed with argument --agency_name"):
self.run_create_federal_portfolio(agency_name="test", branch="executive")
# We expect a error to be thrown when we dont pass parse requests or domains
with self.assertRaisesRegex( with self.assertRaisesRegex(
CommandError, "You must specify at least one of --parse_requests or --parse_domains." CommandError, "You must specify at least one of --parse_requests or --parse_domains."
): ):
self.run_create_federal_portfolio("Test Federal Agency") self.run_create_federal_portfolio(branch="executive")
with self.assertRaisesRegex(
CommandError, "You must specify at least one of --parse_requests or --parse_domains."
):
self.run_create_federal_portfolio(agency_name="test")
def test_command_error_agency_not_found(self): def test_command_error_agency_not_found(self):
"""Check error handling for non-existent agency.""" """Check error handling for non-existent agency."""
@ -1524,11 +1705,11 @@ class TestCreateFederalPortfolio(TestCase):
"Cannot find the federal agency 'Non-existent Agency' in our database. " "Cannot find the federal agency 'Non-existent Agency' in our database. "
"The value you enter for `agency_name` must be prepopulated in the FederalAgency table before proceeding." "The value you enter for `agency_name` must be prepopulated in the FederalAgency table before proceeding."
) )
with self.assertRaisesRegex(ValueError, expected_message): with self.assertRaisesRegex(CommandError, expected_message):
self.run_create_federal_portfolio("Non-existent Agency", parse_requests=True) self.run_create_federal_portfolio(agency_name="Non-existent Agency", parse_requests=True)
def test_update_existing_portfolio(self): def test_does_not_update_existing_portfolio(self):
"""Test updating an existing portfolio.""" """Tests that an existing portfolio is not updated"""
# Create an existing portfolio # Create an existing portfolio
existing_portfolio = Portfolio.objects.create( existing_portfolio = Portfolio.objects.create(
federal_agency=self.federal_agency, federal_agency=self.federal_agency,
@ -1538,12 +1719,15 @@ class TestCreateFederalPortfolio(TestCase):
notes="Old notes", notes="Old notes",
) )
self.run_create_federal_portfolio("Test Federal Agency", parse_requests=True) self.run_create_federal_portfolio(agency_name="Test Federal Agency", parse_requests=True)
existing_portfolio.refresh_from_db() existing_portfolio.refresh_from_db()
self.assertEqual(existing_portfolio.organization_name, self.federal_agency.agency) # SANITY CHECK: if the portfolio updates, it will change to FEDERAL.
self.assertEqual(existing_portfolio.organization_type, DomainRequest.OrganizationChoices.FEDERAL) # if this case fails, it means we are overriding data (and not simply just other weirdness)
self.assertNotEqual(existing_portfolio.organization_type, DomainRequest.OrganizationChoices.FEDERAL)
# Notes and creator should be untouched # Notes and creator should be untouched
self.assertEqual(existing_portfolio.organization_type, DomainRequest.OrganizationChoices.CITY)
self.assertEqual(existing_portfolio.organization_name, self.federal_agency.agency)
self.assertEqual(existing_portfolio.notes, "Old notes") self.assertEqual(existing_portfolio.notes, "Old notes")
self.assertEqual(existing_portfolio.creator, self.user) self.assertEqual(existing_portfolio.creator, self.user)