Merge branch 'main' into za/3662-uat-testing-issues

This commit is contained in:
zandercymatics 2025-03-27 10:47:41 -06:00
commit beedd54e60
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
6 changed files with 549 additions and 551 deletions

View file

@ -894,32 +894,32 @@ Example: `cf ssh getgov-za`
#### Step 5: Running the script
To create a specific portfolio:
```./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --both```
```./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --parse_domains --parse_requests --parse_managers```
Example (only requests): `./manage.py create_federal_portfolio "AMTRAK" --parse_requests`
To create a portfolios for all federal agencies in a branch:
```./manage.py create_federal_portfolio --branch "{executive|legislative|judicial}" --both```
```./manage.py create_federal_portfolio --branch "{executive|legislative|judicial}" --parse_domains --parse_requests --parse_managers```
Example (only requests): `./manage.py create_federal_portfolio --branch "executive" --parse_requests`
### Running locally
#### Step 1: Running the script
```docker-compose exec app ./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --both```
```docker-compose exec app ./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --parse_domains```
##### Parameters
| | Parameter | Description |
|:-:|:---------------------------- |:-------------------------------------------------------------------------------------------|
| 1 | **agency_name** | Name of the FederalAgency record surrounded by quotes. For instance,"AMTRAK". |
| 2 | **branch** | Creates a portfolio for each federal agency in a branch: executive, legislative, judicial |
| 3 | **both** | If True, runs parse_requests and parse_domains. |
| 4 | **parse_requests** | If True, then the created portfolio is added to all related DomainRequests. |
| 5 | **parse_domains** | If True, then the created portfolio is added to all related Domains. |
| 6 | **add_managers** | If True, then the created portfolio will add all managers of the portfolio domains as members of the portfolio, including invited managers. |
| 7 | **skip_existing_portfolios** | If True, then the script will only create suborganizations, modify DomainRequest, and modify DomainInformation records only when creating a new portfolio. Use this flag when you do not want to modify existing records. |
| 3 | **parse_requests** | If True, then the created portfolio is added to all related DomainRequests. |
| 4 | **parse_domains** | If True, then the created portfolio is added to all related Domains. |
| 5 | **parse_members** | If True, then the created portfolio will add all managers of the portfolio domains as members of the portfolio, including invited managers. |
| 6 | **skip_existing_portfolios** | If True, then the script will only create suborganizations, modify DomainRequest, and modify DomainInformation records only when creating a new portfolio. Use this flag when you do not want to modify existing records. |
| 7 | **Debug** | Increases log verbosity |
- Parameters #1-#2: Either `--agency_name` or `--branch` must be specified. Not both.
- Parameters #2-#3, you cannot use `--both` while using these. You must specify either `--parse_requests` or `--parse_domains` seperately. While all of these parameters are optional in that you do not need to specify all of them,
you must specify at least one to run this script.
- Parameters #3-#5, while all of these parameters are optional in that you do not need to specify all of them,
you must specify at least one to run this script. You can also chain all of them together.
## Patch suborganizations

View file

@ -3,14 +3,14 @@
import argparse
import logging
from django.core.management import BaseCommand, CommandError
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.management.commands.utility.terminal_helper import ScriptDataHelper, TerminalColors, TerminalHelper
from registrar.models import DomainInformation, DomainRequest, FederalAgency, Suborganization, Portfolio, User
from registrar.models.domain import Domain
from registrar.models.domain_invitation import DomainInvitation
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user_domain_role import UserDomainRole
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.generic_helper import normalize_string
from registrar.models.utility.generic_helper import count_capitals, normalize_string
from django.db.models import F, Q
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
@ -22,16 +22,56 @@ logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Creates a federal portfolio given a FederalAgency name"
class ChangeTracker:
def __init__(self, model_class):
self.model_class = model_class
self.create = []
self.update = []
self.skip = []
self.fail = []
def print_script_run_summary(self, no_changes_message, **kwargs):
"""Helper function that runs TerminalHelper.log_script_run_summary on this object."""
if self.has_changes():
TerminalHelper.log_script_run_summary(self.create, self.update, self.skip, self.fail, **kwargs)
else:
logger.info(f"{TerminalColors.BOLD}{no_changes_message}{TerminalColors.ENDC}")
def has_changes(self) -> bool:
changes = [self.create, self.update, self.skip, self.fail]
return any([change for change in changes if change])
def bulk_create(self):
try:
res = ScriptDataHelper.bulk_create_fields(
self.model_class, self.create, return_created=True, quiet=True
)
self.create = res
return res
except Exception as err:
# In this case, just swap the fail and add lists
self.fail = self.create.copy()
self.create.clear()
raise err
def bulk_update(self, fields_to_update):
try:
ScriptDataHelper.bulk_update_fields(self.model_class, self.update, fields_to_update, quiet=True)
except Exception as err:
# In this case, just swap the fail and update lists
self.fail = self.update.copy()
self.update.clear()
raise err
def __init__(self, *args, **kwargs):
"""Defines fields to track what portfolios were updated, skipped, or just outright failed."""
super().__init__(*args, **kwargs)
self.updated_portfolios = set()
self.skipped_portfolios = set()
self.failed_portfolios = set()
self.added_managers = set()
self.added_invitations = set()
self.skipped_invitations = set()
self.failed_managers = set()
self.portfolio_changes = self.ChangeTracker(model_class=Portfolio)
self.suborganization_changes = self.ChangeTracker(model_class=Suborganization)
self.domain_info_changes = self.ChangeTracker(model_class=DomainInformation)
self.domain_request_changes = self.ChangeTracker(model_class=DomainRequest)
self.user_portfolio_perm_changes = self.ChangeTracker(model_class=UserPortfolioPermission)
self.portfolio_invitation_changes = self.ChangeTracker(model_class=PortfolioInvitation)
def add_arguments(self, parser):
"""Add command line arguments to create federal portfolios.
@ -44,14 +84,11 @@ class Command(BaseCommand):
Required (at least one):
--parse_requests: Add the created portfolio(s) to related DomainRequest records
--parse_domains: Add the created portfolio(s) to related DomainInformation records
Note: You can use both --parse_requests and --parse_domains together
Optional (mutually exclusive with parse options):
--both: Shorthand for using both --parse_requests and --parse_domains
Cannot be used with --parse_requests or --parse_domains
--parse_managers: Add all domain managers of the portfolio's domains to the organization.
Optional:
--add_managers: Add all domain managers of the portfolio's domains to the organization.
--skip_existing_portfolios: Does not perform substeps on a portfolio if it already exists.
--debug: Increases log verbosity
"""
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
@ -74,19 +111,19 @@ class Command(BaseCommand):
help="Adds portfolio to DomainInformation",
)
parser.add_argument(
"--both",
action=argparse.BooleanOptionalAction,
help="Adds portfolio to both requests and domains",
)
parser.add_argument(
"--add_managers",
"--parse_managers",
action=argparse.BooleanOptionalAction,
help="Add all domain managers of the portfolio's domains to the organization.",
)
parser.add_argument(
"--skip_existing_portfolios",
action=argparse.BooleanOptionalAction,
help="Only add suborganizations to newly created portfolios, skip existing ones.",
help="Only parses newly created portfolios, skippubg existing ones.",
)
parser.add_argument(
"--debug",
action=argparse.BooleanOptionalAction,
help="Shows additional log info.",
)
def handle(self, **options): # noqa: C901
@ -94,22 +131,20 @@ class Command(BaseCommand):
branch = options.get("branch")
parse_requests = options.get("parse_requests")
parse_domains = options.get("parse_domains")
both = options.get("both")
add_managers = options.get("add_managers")
parse_managers = options.get("parse_managers")
skip_existing_portfolios = options.get("skip_existing_portfolios")
debug = options.get("debug")
if not both:
if not (parse_requests or parse_domains or add_managers):
raise CommandError(
"You must specify at least one of --parse_requests, --parse_domains, or --add_managers."
)
else:
if parse_requests or parse_domains:
raise CommandError("You cannot pass --parse_requests or --parse_domains when passing --both.")
# Parse script params
if not (parse_requests or parse_domains or parse_managers):
raise CommandError(
"You must specify at least one of --parse_requests, --parse_domains, or --parse_managers."
)
# Get agencies
federal_agency_filter = {"agency__iexact": agency_name} if agency_name else {"federal_type": branch}
agencies = FederalAgency.objects.filter(**federal_agency_filter)
if not agencies or agencies.count() < 1:
agencies = FederalAgency.objects.filter(agency__isnull=False, **federal_agency_filter).distinct()
if not agencies.exists():
if agency_name:
raise CommandError(
f"Cannot find the federal agency '{agency_name}' in our database. "
@ -118,421 +153,207 @@ class Command(BaseCommand):
)
else:
raise CommandError(f"Cannot find '{branch}' federal agencies in our database.")
portfolios = []
for federal_agency in agencies:
message = f"Processing federal agency '{federal_agency.agency}'..."
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
try:
# C901 'Command.handle' is too complex (12)
portfolio = self.handle_populate_portfolio(
federal_agency, parse_domains, parse_requests, both, skip_existing_portfolios
)
portfolios.append(portfolio)
if add_managers:
self.add_managers_to_portfolio(portfolio)
except Exception as exec:
self.failed_portfolios.add(federal_agency)
logger.error(exec)
message = f"Failed to create portfolio '{federal_agency.agency}'"
TerminalHelper.colorful_logger(logger.info, TerminalColors.FAIL, message)
# POST PROCESS STEP: Add additional suborg info where applicable.
updated_suborg_count = self.post_process_all_suborganization_fields(agencies)
message = f"Added city and state_territory information to {updated_suborg_count} suborgs."
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
TerminalHelper.log_script_run_summary(
self.updated_portfolios,
self.failed_portfolios,
self.skipped_portfolios,
debug=False,
log_header="============= FINISHED HANDLE PORTFOLIO STEP ===============",
# Store all portfolios and agencies in a dict to avoid extra db calls
existing_portfolios = Portfolio.objects.filter(
organization_name__in=agencies.values_list("agency", flat=True), organization_name__isnull=False
)
existing_portfolios_dict = {normalize_string(p.organization_name): p for p in existing_portfolios}
agencies_dict = {normalize_string(agency.agency): agency for agency in agencies}
# NOTE: exceptions to portfolio and suborg are intentionally uncaught.
# parse domains, requests, and managers all rely on these fields to function.
# An error here means everything down the line is compromised.
# The individual parse steps, however, are independent from eachother.
# == Handle portfolios == #
# Loop through every agency we want to add and create a portfolio if the record is new.
for federal_agency in agencies_dict.values():
norm_agency_name = normalize_string(federal_agency.agency)
portfolio = existing_portfolios_dict.get(norm_agency_name, None)
if portfolio is None:
portfolio = Portfolio(
organization_name=federal_agency.agency,
federal_agency=federal_agency,
organization_type=DomainRequest.OrganizationChoices.FEDERAL,
creator=User.get_default_user(),
notes="Auto-generated record",
senior_official=federal_agency.so_federal_agency.first(),
)
self.portfolio_changes.create.append(portfolio)
logger.info(f"{TerminalColors.OKGREEN}Created portfolio '{portfolio}'.{TerminalColors.ENDC}")
elif skip_existing_portfolios:
message = f"Portfolio '{portfolio}' already exists. Skipped."
logger.info(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
self.portfolio_changes.skip.append(portfolio)
# Create portfolios
self.portfolio_changes.bulk_create()
# After create, get the list of all portfolios to use
portfolios_to_use = set(self.portfolio_changes.create)
if not skip_existing_portfolios:
portfolios_to_use.update(set(existing_portfolios))
portfolios_to_use_dict = {normalize_string(p.organization_name): p for p in portfolios_to_use}
# == Handle suborganizations == #
created_suborgs = self.create_suborganizations(portfolios_to_use_dict, agencies_dict)
if created_suborgs:
self.suborganization_changes.create.extend(created_suborgs.values())
self.suborganization_changes.bulk_create()
# == Handle domains and requests == #
for portfolio_org_name, portfolio in portfolios_to_use_dict.items():
federal_agency = agencies_dict.get(portfolio_org_name)
suborgs = portfolio.portfolio_suborganizations.in_bulk(field_name="name")
if parse_domains:
updated_domains = self.update_domains(portfolio, federal_agency, suborgs, debug)
self.domain_info_changes.update.extend(updated_domains)
if parse_requests:
updated_domain_requests = self.update_requests(portfolio, federal_agency, suborgs, debug)
self.domain_request_changes.update.extend(updated_domain_requests)
# Update DomainInformation
try:
self.domain_info_changes.bulk_update(["portfolio", "sub_organization"])
except Exception as err:
logger.error(f"{TerminalColors.FAIL}Could not bulk update domain infos.{TerminalColors.ENDC}")
logger.error(err, exc_info=True)
# Update DomainRequest
try:
self.domain_request_changes.bulk_update(
[
"portfolio",
"sub_organization",
"requested_suborganization",
"suborganization_city",
"suborganization_state_territory",
"federal_agency",
]
)
except Exception as err:
logger.error(f"{TerminalColors.FAIL}Could not bulk update domain requests.{TerminalColors.ENDC}")
logger.error(err, exc_info=True)
# == Handle managers (no bulk_create) == #
if parse_managers:
domain_infos = DomainInformation.objects.filter(portfolio__in=portfolios_to_use)
domains = Domain.objects.filter(domain_info__in=domain_infos)
# Create UserPortfolioPermission
self.create_user_portfolio_permissions(domains)
# Create PortfolioInvitation
self.create_portfolio_invitations(domains)
# == PRINT RUN SUMMARY == #
self.print_final_run_summary(parse_domains, parse_requests, parse_managers, debug)
def print_final_run_summary(self, parse_domains, parse_requests, parse_managers, debug):
self.portfolio_changes.print_script_run_summary(
no_changes_message="||============= No portfolios changed. =============||",
log_header="============= PORTFOLIOS =============",
skipped_header="----- SOME PORTFOLIOS WERENT CREATED (BUT OTHER RECORDS ARE STILL PROCESSED) -----",
display_as_str=True,
)
if add_managers:
TerminalHelper.log_script_run_summary(
self.added_managers,
self.failed_managers,
[], # can't skip managers, can only add or fail
log_header="----- MANAGERS ADDED -----",
debug=False,
display_as_str=True,
)
TerminalHelper.log_script_run_summary(
self.added_invitations,
[],
self.skipped_invitations,
log_header="----- INVITATIONS ADDED -----",
debug=False,
skipped_header="----- INVITATIONS SKIPPED (ALREADY EXISTED) -----",
display_as_str=True,
)
# POST PROCESSING STEP: Remove the federal agency if it matches the portfolio name.
# We only do this for started domain requests.
if parse_requests or both:
prompt_message = (
"This action will update domain requests even if they aren't on a portfolio."
"\nNOTE: This will modify domain requests, even if no portfolios were created."
"\nIn the event no portfolios *are* created, then this step will target "
"the existing portfolios with your given params."
"\nThis step is entirely optional, and is just for extra data cleanup."
)
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
prompt_message=prompt_message,
prompt_title=(
"POST PROCESS STEP: Do you want to clear federal agency on (related) started domain requests?"
),
verify_message="*** THIS STEP IS OPTIONAL ***",
)
self.post_process_started_domain_requests(agencies, portfolios)
def add_managers_to_portfolio(self, portfolio: Portfolio):
"""
Add all domain managers of the portfolio's domains to the organization.
This includes adding them to the correct group and creating portfolio invitations.
"""
logger.info(f"Adding managers for portfolio {portfolio}")
# Fetch all domains associated with the portfolio
domains = Domain.objects.filter(domain_info__portfolio=portfolio)
domain_managers: set[UserDomainRole] = set()
# Fetch all users with manager roles for the domains
# select_related means that a db query will not be occur when you do user_domain_role.user
# Its similar to a set or dict in that it costs slightly more upfront in exchange for perf later
user_domain_roles = UserDomainRole.objects.select_related("user").filter(
domain__in=domains, role=UserDomainRole.Roles.MANAGER
)
domain_managers.update(user_domain_roles)
invited_managers: set[str] = set()
# Get the emails of invited managers
domain_invitations = DomainInvitation.objects.filter(
domain__in=domains, status=DomainInvitation.DomainInvitationStatus.INVITED
).values_list("email", flat=True)
invited_managers.update(domain_invitations)
for user_domain_role in domain_managers:
try:
# manager is a user id
user = user_domain_role.user
_, created = UserPortfolioPermission.objects.get_or_create(
portfolio=portfolio,
user=user,
defaults={"roles": [UserPortfolioRoleChoices.ORGANIZATION_MEMBER]},
)
self.added_managers.add(user)
if created:
logger.info(f"Added manager '{user}' to portfolio '{portfolio}'")
else:
logger.info(f"Manager '{user}' already exists in portfolio '{portfolio}'")
except User.DoesNotExist:
self.failed_managers.add(user)
logger.debug(f"User '{user}' does not exist")
for email in invited_managers:
self.create_portfolio_invitation(portfolio, email)
def create_portfolio_invitation(self, portfolio: Portfolio, email: str):
"""
Create a portfolio invitation for the given email.
"""
_, created = PortfolioInvitation.objects.get_or_create(
portfolio=portfolio,
email=email,
defaults={
"status": PortfolioInvitation.PortfolioInvitationStatus.INVITED,
"roles": [UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
},
)
if created:
self.added_invitations.add(email)
logger.info(f"Created portfolio invitation for '{email}' to portfolio '{portfolio}'")
else:
self.skipped_invitations.add(email)
logger.info(f"Found existing portfolio invitation for '{email}' to portfolio '{portfolio}'")
def post_process_started_domain_requests(self, agencies, portfolios):
"""
Removes duplicate organization data by clearing federal_agency when it matches the portfolio name.
Only processes domain requests in STARTED status.
"""
message = "Removing duplicate portfolio and federal_agency values from domain requests..."
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
# For each request, clear the federal agency under these conditions:
# 1. A portfolio *already exists* with the same name as the federal agency.
# 2. Said portfolio (or portfolios) are only the ones specified at the start of the script.
# 3. The domain request is in status "started".
# Note: Both names are normalized so excess spaces are stripped and the string is lowercased.
domain_requests_to_update = DomainRequest.objects.filter(
federal_agency__in=agencies,
federal_agency__agency__isnull=False,
status=DomainRequest.DomainRequestStatus.STARTED,
organization_name__isnull=False,
)
if domain_requests_to_update.count() == 0:
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, "No domain requests to update.")
return
portfolio_set = {normalize_string(portfolio.organization_name) for portfolio in portfolios if portfolio}
# Update the request, assuming the given agency name matches the portfolio name
updated_requests = []
for req in domain_requests_to_update:
agency_name = normalize_string(req.federal_agency.agency)
if agency_name in portfolio_set:
req.federal_agency = None
updated_requests.append(req)
# Execute the update and Log the results
if TerminalHelper.prompt_for_execution(
system_exit_on_terminate=False,
prompt_message=(
f"{len(domain_requests_to_update)} domain requests will be updated. "
f"These records will be changed: {[str(req) for req in updated_requests]}"
detailed_prompt_title=(
"PORTFOLIOS: Do you wish to see the full list of failed, skipped and updated records?"
),
prompt_title="Do you wish to commit this update to the database?",
):
DomainRequest.objects.bulk_update(updated_requests, ["federal_agency"])
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKBLUE, "Action completed successfully.")
def handle_populate_portfolio(self, federal_agency, parse_domains, parse_requests, both, skip_existing_portfolios):
"""Attempts to create a portfolio. If successful, this function will
also create new suborganizations"""
portfolio, created = self.create_portfolio(federal_agency)
if skip_existing_portfolios and not created:
TerminalHelper.colorful_logger(
logger.warning,
TerminalColors.YELLOW,
"Skipping modifications to suborgs, domain requests, and "
"domains due to the --skip_existing_portfolios flag. Portfolio already exists.",
)
return portfolio
self.create_suborganizations(portfolio, federal_agency)
if parse_domains or both:
self.handle_portfolio_domains(portfolio, federal_agency)
if parse_requests or both:
self.handle_portfolio_requests(portfolio, federal_agency)
return portfolio
def create_portfolio(self, federal_agency):
"""Creates a portfolio if it doesn't presently exist.
Returns portfolio, created."""
# Get the org name / senior official
org_name = federal_agency.agency
so = federal_agency.so_federal_agency.first() if federal_agency.so_federal_agency.exists() else None
# First just try to get an existing portfolio
portfolio = Portfolio.objects.filter(organization_name=org_name).first()
if portfolio:
self.skipped_portfolios.add(portfolio)
TerminalHelper.colorful_logger(
logger.info,
TerminalColors.YELLOW,
f"Portfolio with organization name '{org_name}' already exists. Skipping create.",
)
return portfolio, False
# Create new portfolio if it doesn't exist
portfolio = Portfolio.objects.create(
organization_name=org_name,
federal_agency=federal_agency,
organization_type=DomainRequest.OrganizationChoices.FEDERAL,
creator=User.get_default_user(),
notes="Auto-generated record",
senior_official=so,
display_as_str=True,
debug=debug,
)
self.suborganization_changes.print_script_run_summary(
no_changes_message="||============= No suborganizations changed. =============||",
log_header="============= SUBORGANIZATIONS =============",
skipped_header="----- SUBORGANIZATIONS SKIPPED (SAME NAME AS PORTFOLIO NAME) -----",
detailed_prompt_title=(
"SUBORGANIZATIONS: Do you wish to see the full list of failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
)
self.updated_portfolios.add(portfolio)
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, f"Created portfolio '{portfolio}'")
# Log if the senior official was added or not.
if portfolio.senior_official:
message = f"Added senior official '{portfolio.senior_official}'"
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)
else:
message = (
f"No senior official added to portfolio '{org_name}'. "
"None was returned for the reverse relation `FederalAgency.so_federal_agency.first()`"
if parse_domains:
self.domain_info_changes.print_script_run_summary(
no_changes_message="||============= No domains changed. =============||",
log_header="============= DOMAINS =============",
detailed_prompt_title=(
"DOMAINS: Do you wish to see the full list of failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
)
TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message)
return portfolio, True
if parse_requests:
self.domain_request_changes.print_script_run_summary(
no_changes_message="||============= No domain requests changed. =============||",
log_header="============= DOMAIN REQUESTS =============",
detailed_prompt_title=(
"DOMAIN REQUESTS: Do you wish to see the full list of failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
)
def create_suborganizations(self, portfolio: Portfolio, federal_agency: FederalAgency):
if parse_managers:
self.user_portfolio_perm_changes.print_script_run_summary(
no_changes_message="||============= No managers changed. =============||",
log_header="============= MANAGERS =============",
skipped_header="----- MANAGERS SKIPPED (ALREADY EXISTED) -----",
detailed_prompt_title=(
"MANAGERS: Do you wish to see the full list of failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
)
self.portfolio_invitation_changes.print_script_run_summary(
no_changes_message="||============= No manager invitations changed. =============||",
log_header="============= MANAGER INVITATIONS =============",
skipped_header="----- INVITATIONS SKIPPED (ALREADY EXISTED) -----",
detailed_prompt_title=(
"MANAGER INVITATIONS: Do you wish to see the full list of failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
)
def create_suborganizations(self, portfolio_dict, agency_dict):
"""Create Suborganizations tied to the given portfolio based on DomainInformation objects"""
valid_agencies = DomainInformation.objects.filter(
federal_agency=federal_agency, organization_name__isnull=False
)
org_names = set(valid_agencies.values_list("organization_name", flat=True))
if not org_names:
message = (
"Could not add any suborganizations."
f"\nNo suborganizations were found for '{federal_agency}' when filtering on this name, "
"and excluding null organization_name records."
)
TerminalHelper.colorful_logger(logger.warning, TerminalColors.FAIL, message)
return
created_suborgs = {}
# Check for existing suborgs on the current portfolio
existing_suborgs = Suborganization.objects.filter(name__in=org_names, name__isnull=False)
if existing_suborgs.exists():
message = f"Some suborganizations already exist for portfolio '{portfolio}'."
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKBLUE, message)
portfolios = portfolio_dict.values()
agencies = agency_dict.values()
# Create new suborgs, as long as they don't exist in the db already
new_suborgs = []
for name in org_names - set(existing_suborgs.values_list("name", flat=True)):
if normalize_string(name) == normalize_string(portfolio.organization_name):
# You can use this to populate location information, when this occurs.
# However, this isn't needed for now so we can skip it.
message = (
f"Skipping suborganization create on record '{name}'. "
"The federal agency name is the same as the portfolio name."
)
TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, message)
else:
new_suborgs.append(Suborganization(name=name, portfolio=portfolio)) # type: ignore
if new_suborgs:
Suborganization.objects.bulk_create(new_suborgs)
TerminalHelper.colorful_logger(
logger.info, TerminalColors.OKGREEN, f"Added {len(new_suborgs)} suborganizations"
)
else:
TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, "No suborganizations added")
def handle_portfolio_requests(self, portfolio: Portfolio, federal_agency: FederalAgency):
"""
Associate portfolio with domain requests for a federal agency.
Updates all relevant domain request records.
"""
invalid_states = [
DomainRequest.DomainRequestStatus.STARTED,
DomainRequest.DomainRequestStatus.INELIGIBLE,
DomainRequest.DomainRequestStatus.REJECTED,
]
domain_requests = DomainRequest.objects.filter(federal_agency=federal_agency).exclude(status__in=invalid_states)
if not domain_requests.exists():
message = f"""
Portfolio '{portfolio}' not added to domain requests: no valid records found.
This means that a filter on DomainInformation for the federal_agency '{federal_agency}' returned no results.
Excluded statuses: STARTED, INELIGIBLE, REJECTED.
Filter info: DomainRequest.objects.filter(federal_agency=federal_agency).exclude(
status__in=invalid_states
)
"""
TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message)
return None
# Get all suborg information and store it in a dict to avoid doing a db call
suborgs = Suborganization.objects.filter(portfolio=portfolio).in_bulk(field_name="name")
for domain_request in domain_requests:
# Set the portfolio
domain_request.portfolio = portfolio
# Set suborg info
domain_request.sub_organization = suborgs.get(domain_request.organization_name, None)
if domain_request.sub_organization is None:
domain_request.requested_suborganization = normalize_string(
domain_request.organization_name, lowercase=False
)
domain_request.suborganization_city = normalize_string(domain_request.city, lowercase=False)
domain_request.suborganization_state_territory = domain_request.state_territory
self.updated_portfolios.add(portfolio)
DomainRequest.objects.bulk_update(
domain_requests,
[
"portfolio",
"sub_organization",
"requested_suborganization",
"suborganization_city",
"suborganization_state_territory",
],
)
message = f"Added portfolio '{portfolio}' to {len(domain_requests)} domain requests."
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)
def handle_portfolio_domains(self, portfolio: Portfolio, federal_agency: FederalAgency):
"""
Associate portfolio with domains for a federal agency.
Updates all relevant domain information records.
Returns a queryset of DomainInformation objects, or None if nothing changed.
"""
domain_infos = DomainInformation.objects.filter(federal_agency=federal_agency)
if not domain_infos.exists():
message = f"""
Portfolio '{portfolio}' not added to domains: no valid records found.
The filter on DomainInformation for the federal_agency '{federal_agency}' returned no results.
Filter info: DomainInformation.objects.filter(federal_agency=federal_agency)
"""
TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message)
return None
# Get all suborg information and store it in a dict to avoid doing a db call
suborgs = Suborganization.objects.filter(portfolio=portfolio).in_bulk(field_name="name")
for domain_info in domain_infos:
domain_info.portfolio = portfolio
domain_info.sub_organization = suborgs.get(domain_info.organization_name, None)
DomainInformation.objects.bulk_update(domain_infos, ["portfolio", "sub_organization"])
message = f"Added portfolio '{portfolio}' to {len(domain_infos)} domains."
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)
def post_process_all_suborganization_fields(self, agencies):
"""Batch updates suborganization locations from domain and request data.
Args:
agencies: List of FederalAgency objects to process
Returns:
int: Number of suborganizations updated
Priority for location data:
1. Domain information
2. Domain request suborganization fields
3. Domain request standard fields
"""
# Common filter between domaininformation / domain request.
# Filter by only the agencies we've updated thus far.
# Then, only process records without null portfolio, org name, or suborg name.
base_filter = Q(
federal_agency__in=agencies,
portfolio__isnull=False,
organization_name__isnull=False,
sub_organization__isnull=False,
) & ~Q(organization_name__iexact=F("portfolio__organization_name"))
# First: Remove null city / state_territory values on domain info / domain requests.
# We want to add city data if there is data to add to begin with!
domains = DomainInformation.objects.filter(
base_filter,
Q(city__isnull=False, state_territory__isnull=False),
# Org name must not be null, and must not be the portfolio name
Q(
organization_name__isnull=False,
)
& ~Q(organization_name__iexact=F("portfolio__organization_name")),
# Only get relevant data to the agency/portfolio we are targeting
Q(federal_agency__in=agencies) | Q(portfolio__in=portfolios),
)
requests = DomainRequest.objects.filter(
base_filter,
(
Q(city__isnull=False, state_territory__isnull=False)
| Q(suborganization_city__isnull=False, suborganization_state_territory__isnull=False)
),
# Org name must not be null, and must not be the portfolio name
Q(
organization_name__isnull=False,
)
& ~Q(organization_name__iexact=F("portfolio__organization_name")),
# Only get relevant data to the agency/portfolio we are targeting
Q(federal_agency__in=agencies) | Q(portfolio__in=portfolios),
)
# First: get all existing suborgs
# NOTE: .all() is a heavy query, but unavoidable as we need to check for duplicate names.
# This is not quite as heavy as just using a for loop and .get_or_create, but worth noting.
# Change this if you can find a way to avoid doing this.
# This won't scale great for 10k+ records.
existing_suborgs = Suborganization.objects.all()
suborg_dict = {normalize_string(org.name): org for org in existing_suborgs}
# Second: Group domains and requests by normalized organization name.
# This means that later down the line we have to account for "duplicate" org names.
domains_dict = {}
requests_dict = {}
for domain in domains:
@ -543,40 +364,60 @@ class Command(BaseCommand):
normalized_name = normalize_string(request.organization_name)
requests_dict.setdefault(normalized_name, []).append(request)
# Third: Get suborganizations to update
suborgs_to_edit = Suborganization.objects.filter(
Q(id__in=domains.values_list("sub_organization", flat=True))
| Q(id__in=requests.values_list("sub_organization", flat=True))
)
# Third: Parse through each group of domains that have the same organization names,
# then create *one* suborg record from it.
# Normalize all suborg names so we don't add duplicate data unintentionally.
for portfolio_name, portfolio in portfolio_dict.items():
# For a given agency, find all domains that list suborg info for it.
for norm_org_name, domains in domains_dict.items():
# Don't add the record if the suborg name would equal the portfolio name
if norm_org_name == portfolio_name:
continue
# Fourth: Process each suborg to add city / state territory info
for suborg in suborgs_to_edit:
self.post_process_suborganization_fields(suborg, domains_dict, requests_dict)
new_suborg_name = None
if len(domains) == 1:
new_suborg_name = normalize_string(domains[0].organization_name, lowercase=False)
elif len(domains) > 1:
# Pick the best record for a suborg name (fewest spaces, most leading capitals)
best_record = max(
domains,
key=lambda rank: (
-domain.organization_name.count(" "),
count_capitals(domain.organization_name, leading_only=True),
),
)
new_suborg_name = normalize_string(best_record.organization_name, lowercase=False)
# Fifth: Perform a bulk update
return Suborganization.objects.bulk_update(suborgs_to_edit, ["city", "state_territory"])
# If the suborg already exists, don't add it again.
if norm_org_name not in suborg_dict and norm_org_name not in created_suborgs:
requests = requests_dict.get(norm_org_name)
suborg = Suborganization(name=new_suborg_name, portfolio=portfolio)
self.set_suborganization_location(suborg, domains, requests)
created_suborgs[norm_org_name] = suborg
return created_suborgs
def post_process_suborganization_fields(self, suborg, domains_dict, requests_dict):
def set_suborganization_location(self, suborg, domains, requests):
"""Updates a single suborganization's location data if valid.
Args:
suborg: Suborganization to update
domains_dict: Dict of domain info records grouped by org name
requests_dict: Dict of domain requests grouped by org name
domains: omain info records grouped by org name
requests: domain requests grouped by org name
Priority matches parent method. Updates are skipped if location data conflicts
Updates are skipped if location data conflicts
between multiple records of the same type.
"""
normalized_suborg_name = normalize_string(suborg.name)
domains = domains_dict.get(normalized_suborg_name, [])
requests = requests_dict.get(normalized_suborg_name, [])
# Try to get matching domain info
domain = None
if domains:
reference = domains[0]
use_location_for_domain = all(
d.city == reference.city and d.state_territory == reference.state_territory for d in domains
d.city
and d.state_territory
and d.city == reference.city
and d.state_territory == reference.state_territory
for d in domains
)
if use_location_for_domain:
domain = reference
@ -608,7 +449,7 @@ class Command(BaseCommand):
if not domain and not request:
message = f"Skipping adding city / state_territory information to suborg: {suborg}. Bad data."
TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, message)
logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
return
# PRIORITY:
@ -625,8 +466,106 @@ class Command(BaseCommand):
suborg.city = normalize_string(request.city, lowercase=False)
suborg.state_territory = request.state_territory
message = (
f"Added city/state_territory to suborg: {suborg}. "
f"city - {suborg.city}, state - {suborg.state_territory}"
def update_domains(self, portfolio, federal_agency, suborgs, debug):
"""
Associate portfolio with domains for a federal agency.
Updates all relevant domain information records.
Returns a queryset of DomainInformation objects, or None if nothing changed.
"""
updated_domains = set()
domain_infos = federal_agency.domaininformation_set.all()
for domain_info in domain_infos:
org_name = normalize_string(domain_info.organization_name, lowercase=False)
domain_info.portfolio = portfolio
domain_info.sub_organization = suborgs.get(org_name, None)
updated_domains.add(domain_info)
if not updated_domains and debug:
message = f"Portfolio '{portfolio}' not added to domains: nothing to add found."
logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
return updated_domains
def update_requests(
self,
portfolio,
federal_agency,
suborgs,
debug,
):
"""
Associate portfolio with domain requests for a federal agency.
Updates all relevant domain request records.
"""
updated_domain_requests = set()
invalid_states = [
DomainRequest.DomainRequestStatus.INELIGIBLE,
DomainRequest.DomainRequestStatus.REJECTED,
]
domain_requests = federal_agency.domainrequest_set.exclude(status__in=invalid_states)
# Add portfolio, sub_org, requested_suborg, suborg_city, and suborg_state_territory.
# For started domain requests, set the federal agency to None if not on a portfolio.
for domain_request in domain_requests:
if domain_request.status != DomainRequest.DomainRequestStatus.STARTED:
org_name = normalize_string(domain_request.organization_name, lowercase=False)
domain_request.portfolio = portfolio
domain_request.sub_organization = suborgs.get(org_name, None)
if domain_request.sub_organization is None:
domain_request.requested_suborganization = normalize_string(
domain_request.organization_name, lowercase=False
)
domain_request.suborganization_city = normalize_string(domain_request.city, lowercase=False)
domain_request.suborganization_state_territory = domain_request.state_territory
else:
# Clear the federal agency for started domain requests
agency_name = normalize_string(domain_request.federal_agency.agency)
portfolio_name = normalize_string(portfolio.organization_name)
if agency_name == portfolio_name:
domain_request.federal_agency = None
logger.info(f"Set federal agency on started domain request '{domain_request}' to None.")
updated_domain_requests.add(domain_request)
if not updated_domain_requests and debug:
message = f"Portfolio '{portfolio}' not added to domain requests: nothing to add found."
logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
return updated_domain_requests
def create_user_portfolio_permissions(self, domains):
user_domain_roles = UserDomainRole.objects.select_related(
"user", "domain", "domain__domain_info", "domain__domain_info__portfolio"
).filter(domain__in=domains, domain__domain_info__portfolio__isnull=False, role=UserDomainRole.Roles.MANAGER)
for user_domain_role in user_domain_roles:
user = user_domain_role.user
permission, created = UserPortfolioPermission.objects.get_or_create(
portfolio=user_domain_role.domain.domain_info.portfolio,
user=user,
defaults={"roles": [UserPortfolioRoleChoices.ORGANIZATION_MEMBER]},
)
if created:
self.user_portfolio_perm_changes.create.append(permission)
else:
self.user_portfolio_perm_changes.skip.append(permission)
def create_portfolio_invitations(self, domains):
domain_invitations = DomainInvitation.objects.select_related(
"domain", "domain__domain_info", "domain__domain_info__portfolio"
).filter(
domain__in=domains,
domain__domain_info__portfolio__isnull=False,
status=DomainInvitation.DomainInvitationStatus.INVITED,
)
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
for domain_invitation in domain_invitations:
email = normalize_string(domain_invitation.email)
invitation, created = PortfolioInvitation.objects.get_or_create(
portfolio=domain_invitation.domain.domain_info.portfolio,
email=email,
status=PortfolioInvitation.PortfolioInvitationStatus.INVITED,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
if created:
self.portfolio_invitation_changes.create.append(invitation)
else:
self.portfolio_invitation_changes.skip.append(invitation)

View file

@ -51,7 +51,7 @@ class Command(BaseCommand):
ScriptDataHelper.bulk_update_fields(Domain, self.to_update, ["first_ready"])
# Log what happened
TerminalHelper.log_script_run_summary(self.to_update, self.failed_to_update, self.skipped, debug)
TerminalHelper.log_script_run_summary(self.to_update, self.failed_to_update, self.skipped, [], debug=debug)
def update_first_ready_for_domain(self, domain: Domain, debug: bool):
"""Grabs the created_at field and associates it with the first_ready column.

View file

@ -144,7 +144,12 @@ class Command(BaseCommand):
# Log what happened
log_header = "============= FINISHED UPDATE FOR DOMAINREQUEST ==============="
TerminalHelper.log_script_run_summary(
self.request_to_update, self.request_failed_to_update, self.request_skipped, True, log_header
self.request_to_update,
self.request_failed_to_update,
self.request_skipped,
[],
debug=True,
log_header=log_header,
)
update_skipped_count = len(self.request_to_update)
@ -195,7 +200,7 @@ class Command(BaseCommand):
# Log what happened
log_header = "============= FINISHED UPDATE FOR DOMAININFORMATION ==============="
TerminalHelper.log_script_run_summary(
self.di_to_update, self.di_failed_to_update, self.di_skipped, True, log_header
self.di_to_update, self.di_failed_to_update, self.di_skipped, [], debug=True, log_header=log_header
)
update_skipped_count = len(self.di_skipped)

View file

@ -32,7 +32,7 @@ class ScriptDataHelper:
"""Helper method with utilities to speed up development of scripts that do DB operations"""
@staticmethod
def bulk_update_fields(model_class, update_list, fields_to_update, batch_size=1000):
def bulk_update_fields(model_class, update_list, fields_to_update, batch_size=1000, quiet=False):
"""
This function performs a bulk update operation on a specified Django model class in batches.
It uses Django's Paginator to handle large datasets in a memory-efficient manner.
@ -51,9 +51,12 @@ class ScriptDataHelper:
fields_to_update: Specifies which fields to update.
Usage:
bulk_update_fields(Domain, page.object_list, ["first_ready"])
ScriptDataHelper.bulk_update_fields(Domain, page.object_list, ["first_ready"])
Returns: A queryset of the updated objets
"""
logger.info(f"{TerminalColors.YELLOW} Bulk updating fields... {TerminalColors.ENDC}")
if not quiet:
logger.info(f"{TerminalColors.YELLOW} Bulk updating fields... {TerminalColors.ENDC}")
# Create a Paginator object. Bulk_update on the full dataset
# is too memory intensive for our current app config, so we can chunk this data instead.
paginator = Paginator(update_list, batch_size)
@ -61,6 +64,41 @@ class ScriptDataHelper:
page = paginator.page(page_num)
model_class.objects.bulk_update(page.object_list, fields_to_update)
@staticmethod
def bulk_create_fields(model_class, update_list, batch_size=1000, return_created=False, quiet=False):
"""
This function performs a bulk create operation on a specified Django model class in batches.
It uses Django's Paginator to handle large datasets in a memory-efficient manner.
Parameters:
model_class: The Django model class that you want to perform the bulk update on.
This should be the actual class, not a string of the class name.
update_list: A list of model instances that you want to update. Each instance in the list
should already have the updated values set on the instance.
batch_size: The maximum number of model instances to update in a single database query.
Defaults to 1000. If you're dealing with models that have a large number of fields,
or large field values, you may need to decrease this value to prevent out-of-memory errors.
Usage:
ScriptDataHelper.bulk_add_fields(Domain, page.object_list)
Returns: A queryset of the added objects
"""
if not quiet:
logger.info(f"{TerminalColors.YELLOW} Bulk adding fields... {TerminalColors.ENDC}")
created_objs = []
paginator = Paginator(update_list, batch_size)
for page_num in paginator.page_range:
page = paginator.page(page_num)
all_created = model_class.objects.bulk_create(page.object_list)
if return_created:
created_objs.extend([created.id for created in all_created])
if return_created:
return model_class.objects.filter(id__in=created_objs)
return None
class PopulateScriptTemplate(ABC):
"""
@ -132,9 +170,7 @@ class PopulateScriptTemplate(ABC):
)
if verbose:
proposed_changes = f"""{proposed_changes}
These records will be updated: {list(records.all())}
"""
proposed_changes = f"{proposed_changes}\n" f"These records will be updated: {list(records.all())}"
# Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution(
@ -170,6 +206,7 @@ class PopulateScriptTemplate(ABC):
to_update,
failed_to_update,
to_skip,
[],
debug=debug,
log_header=self.run_summary_header,
display_as_str=True,
@ -203,81 +240,96 @@ class PopulateScriptTemplate(ABC):
class TerminalHelper:
@staticmethod
def log_script_run_summary(
to_update, failed_to_update, skipped, debug: bool, log_header=None, skipped_header=None, display_as_str=False
create,
update,
skip,
fail,
debug: bool,
log_header="============= FINISHED =============",
skipped_header="----- SOME DATA WAS INVALID (NEEDS MANUAL PATCHING) -----",
failed_header="----- UPDATE FAILED -----",
display_as_str=False,
detailed_prompt_title="Do you wish to see the full list of failed, skipped and updated records?",
):
"""Prints success, failed, and skipped counts, as well as
all affected objects."""
update_success_count = len(to_update)
update_failed_count = len(failed_to_update)
update_skipped_count = len(skipped)
"""Generates a formatted summary of script execution results with colored output.
if log_header is None:
log_header = "============= FINISHED ==============="
Displays counts and details of successful, failed, and skipped operations.
In debug mode or when prompted, shows full record details.
Uses color coding: green for success, yellow for skipped, red for failures.
if skipped_header is None:
skipped_header = "----- SOME DATA WAS INVALID (NEEDS MANUAL PATCHING) -----"
Args:
to_update: Records that were successfully updated
failed_to_update: Records that failed to update
skipped: Records that were intentionally skipped
to_add: Records that were newly added
debug: If True, shows detailed record information
log_header: Custom header for the summary (default: "FINISHED")
skipped_header: Custom header for skipped records section
failed_header: Custom header for failed records section
display_as_str: If True, converts records to strings for display
Output Format (if count > 0 for each category):
[log_header]
Created W entries
Updated X entries
[skipped_header]
Skipped updating Y entries
[failed_header]
Failed to update Z entries
Debug output (if enabled):
- Directly prints each list for each category (add, update, etc)
- Converts each item to string if display_as_str is True
"""
counts = {
"created": len(create),
"updated": len(update),
"skipped": len(skip),
"failed": len(fail),
}
# Give the user the option to see failed / skipped records if any exist.
display_detailed_logs = False
if not debug and update_failed_count > 0 or update_skipped_count > 0:
if not debug and counts["failed"] > 0 or counts["skipped"] > 0:
display_detailed_logs = TerminalHelper.prompt_for_execution(
system_exit_on_terminate=False,
prompt_message=f"You will see {update_failed_count} failed and {update_skipped_count} skipped records.",
prompt_message=f'You will see {counts["failed"]} failed and {counts["skipped"]} skipped records.',
verify_message="** Some records were skipped, or some failed to update. **",
prompt_title="Do you wish to see the full list of failed, skipped and updated records?",
prompt_title=detailed_prompt_title,
)
# Prepare debug messages
if debug or display_detailed_logs:
updated_display = [str(u) for u in to_update] if display_as_str else to_update
skipped_display = [str(s) for s in skipped] if display_as_str else skipped
failed_display = [str(f) for f in failed_to_update] if display_as_str else failed_to_update
debug_messages = {
"success": (f"{TerminalColors.OKCYAN}Updated: {updated_display}{TerminalColors.ENDC}\n"),
"skipped": (f"{TerminalColors.YELLOW}Skipped: {skipped_display}{TerminalColors.ENDC}\n"),
"failed": (f"{TerminalColors.FAIL}Failed: {failed_display}{TerminalColors.ENDC}\n"),
}
non_zero_counts = {category: count for category, count in counts.items() if count > 0}
messages = []
for category, count in non_zero_counts.items():
match category:
case "created":
label, values, debug_color = "Created", create, TerminalColors.OKBLUE
case "updated":
label, values, debug_color = "Updated", update, TerminalColors.OKCYAN
case "skipped":
label, values, debug_color = "Skipped updating", skip, TerminalColors.YELLOW
messages.append(skipped_header)
case "failed":
label, values, debug_color = "Failed to update", fail, TerminalColors.FAIL
messages.append(failed_header)
messages.append(f"{label} {count} entries")
# Print out a list of everything that was changed, if we have any changes to log.
# Otherwise, don't print anything.
TerminalHelper.print_conditional(
True,
f"{debug_messages.get('success') if update_success_count > 0 else ''}"
f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}"
f"{debug_messages.get('failed') if update_failed_count > 0 else ''}",
)
# Print debug messages (prints the internal add, update, skip, fail lists)
if debug or display_detailed_logs:
display_values = [str(v) for v in values] if display_as_str else values
debug_message = f"{label}: {display_values}"
logger.info(f"{debug_color}{debug_message}{TerminalColors.ENDC}")
if update_failed_count == 0 and update_skipped_count == 0:
logger.info(
f"""{TerminalColors.OKGREEN}
{log_header}
Updated {update_success_count} entries
{TerminalColors.ENDC}
"""
)
elif update_failed_count == 0:
logger.warning(
f"""{TerminalColors.YELLOW}
{log_header}
Updated {update_success_count} entries
{skipped_header}
Skipped updating {update_skipped_count} entries
{TerminalColors.ENDC}
"""
)
final_message = f"\n{log_header}\n" + "\n".join(messages)
if counts["failed"] > 0:
logger.error(f"{TerminalColors.FAIL}{final_message}{TerminalColors.ENDC}")
elif counts["skipped"] > 0:
logger.warning(f"{TerminalColors.YELLOW}{final_message}{TerminalColors.ENDC}")
else:
logger.error(
f"""{TerminalColors.FAIL}
{log_header}
Updated {update_success_count} entries
----- UPDATE FAILED -----
Failed to update {update_failed_count} entries,
Skipped updating {update_skipped_count} entries
{TerminalColors.ENDC}
"""
)
logger.info(f"{TerminalColors.OKGREEN}{final_message}{TerminalColors.ENDC}")
@staticmethod
def query_yes_no(question: str, default="yes"):
@ -415,11 +467,11 @@ class TerminalHelper:
# and ask if they wish to proceed
proceed_execution = TerminalHelper.query_yes_no_exit(
f"\n{TerminalColors.OKCYAN}"
"====================================================="
f"\n{prompt_title}\n"
"====================================================="
f"\n{verify_message}\n"
f"\n{prompt_message}\n"
"=====================================================\n"
f"{prompt_title}\n"
"=====================================================\n"
f"{verify_message}\n"
f"{prompt_message}\n"
f"{TerminalColors.FAIL}"
f"Proceed? (Y = proceed, N = {action_description_for_selecting_no})"
f"{TerminalColors.ENDC}"

View file

@ -1474,6 +1474,7 @@ class TestCreateFederalPortfolio(TestCase):
generic_org_type=DomainRequest.OrganizationChoices.CITY,
federal_agency=self.federal_agency,
user=self.user,
organization_name="Testorg",
)
self.domain_request.approve()
self.domain_info = DomainInformation.objects.filter(domain_request=self.domain_request).get()
@ -1530,13 +1531,10 @@ class TestCreateFederalPortfolio(TestCase):
@less_console_noise_decorator
def test_post_process_started_domain_requests_existing_portfolio(self):
"""Ensures that federal agency is cleared when agency name matches portfolio name.
As the name implies, this implicitly tests the "post_process_started_domain_requests" function.
"""
"""Ensures that federal agency is cleared when agency name matches portfolio name."""
federal_agency_2 = FederalAgency.objects.create(agency="Sugarcane", federal_type=BranchChoices.EXECUTIVE)
# Test records with portfolios and no org names
# Create a portfolio. This script skips over "started"
portfolio = Portfolio.objects.create(organization_name="Sugarcane", creator=self.user)
# Create a domain request with matching org name
matching_request = completed_domain_request(
@ -1823,12 +1821,12 @@ class TestCreateFederalPortfolio(TestCase):
# We expect a error to be thrown when we dont pass parse requests or domains
with self.assertRaisesRegex(
CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --add_managers."
CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --parse_managers."
):
self.run_create_federal_portfolio(branch="executive")
with self.assertRaisesRegex(
CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --add_managers."
CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --parse_managers."
):
self.run_create_federal_portfolio(agency_name="test")
@ -1878,7 +1876,9 @@ class TestCreateFederalPortfolio(TestCase):
UserDomainRole.objects.create(user=manager2, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
# Run the management command
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_domains=True, add_managers=True)
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency, parse_domains=True, parse_managers=True
)
# Check that the portfolio was created
self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency)
@ -1901,7 +1901,9 @@ class TestCreateFederalPortfolio(TestCase):
)
# Run the management command
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_domains=True, add_managers=True)
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency, parse_domains=True, parse_managers=True
)
# Check that the portfolio was created
self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency)
@ -1918,7 +1920,7 @@ class TestCreateFederalPortfolio(TestCase):
# Verify that no duplicate invitations are created
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True
agency_name=self.federal_agency.agency, parse_requests=True, parse_managers=True
)
invitations = PortfolioInvitation.objects.filter(email="manager1@example.com", portfolio=self.portfolio)
self.assertEqual(
@ -1946,7 +1948,7 @@ class TestCreateFederalPortfolio(TestCase):
# Run the management command
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True
agency_name=self.federal_agency.agency, parse_requests=True, parse_managers=True
)
# Ensure that the manager is not duplicated
@ -1994,13 +1996,13 @@ class TestCreateFederalPortfolio(TestCase):
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency,
parse_requests=True,
add_managers=True,
parse_managers=True,
skip_existing_portfolios=True,
)
# Check that managers were added to the portfolio
# Check that managers weren't added to the portfolio
permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user__in=[manager1, manager2])
self.assertEqual(permissions.count(), 2)
self.assertEqual(permissions.count(), 0)
for perm in permissions:
self.assertIn(UserPortfolioRoleChoices.ORGANIZATION_MEMBER, perm.roles)