Merge branch 'main' of https://github.com/cisagov/manage.get.gov into cb/3212-subissues

This commit is contained in:
Erin Song 2025-04-01 23:35:10 -07:00
commit ca5dce2541
No known key found for this signature in database
44 changed files with 1747 additions and 639 deletions

View file

@ -894,32 +894,32 @@ Example: `cf ssh getgov-za`
#### Step 5: Running the script
To create a specific portfolio:
```./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --both```
```./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --parse_domains --parse_requests --parse_managers```
Example (only requests): `./manage.py create_federal_portfolio "AMTRAK" --parse_requests`
To create a portfolios for all federal agencies in a branch:
```./manage.py create_federal_portfolio --branch "{executive|legislative|judicial}" --both```
```./manage.py create_federal_portfolio --branch "{executive|legislative|judicial}" --parse_domains --parse_requests --parse_managers```
Example (only requests): `./manage.py create_federal_portfolio --branch "executive" --parse_requests`
### Running locally
#### Step 1: Running the script
```docker-compose exec app ./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --both```
```docker-compose exec app ./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --parse_domains```
##### Parameters
| | Parameter | Description |
|:-:|:---------------------------- |:-------------------------------------------------------------------------------------------|
| 1 | **agency_name** | Name of the FederalAgency record surrounded by quotes. For instance,"AMTRAK". |
| 2 | **branch** | Creates a portfolio for each federal agency in a branch: executive, legislative, judicial |
| 3 | **both** | If True, runs parse_requests and parse_domains. |
| 4 | **parse_requests** | If True, then the created portfolio is added to all related DomainRequests. |
| 5 | **parse_domains** | If True, then the created portfolio is added to all related Domains. |
| 6 | **add_managers** | If True, then the created portfolio will add all managers of the portfolio domains as members of the portfolio, including invited managers. |
| 7 | **skip_existing_portfolios** | If True, then the script will only create suborganizations, modify DomainRequest, and modify DomainInformation records only when creating a new portfolio. Use this flag when you do not want to modify existing records. |
| 3 | **parse_requests** | If True, then the created portfolio is added to all related DomainRequests. |
| 4 | **parse_domains** | If True, then the created portfolio is added to all related Domains. |
| 5 | **parse_members** | If True, then the created portfolio will add all managers of the portfolio domains as members of the portfolio, including invited managers. |
| 6 | **skip_existing_portfolios** | If True, then the script will only create suborganizations, modify DomainRequest, and modify DomainInformation records only when creating a new portfolio. Use this flag when you do not want to modify existing records. |
| 7 | **Debug** | Increases log verbosity |
- Parameters #1-#2: Either `--agency_name` or `--branch` must be specified. Not both.
- Parameters #2-#3, you cannot use `--both` while using these. You must specify either `--parse_requests` or `--parse_domains` seperately. While all of these parameters are optional in that you do not need to specify all of them,
you must specify at least one to run this script.
- Parameters #3-#5, while all of these parameters are optional in that you do not need to specify all of them,
you must specify at least one to run this script. You can also chain all of them together.
## Patch suborganizations

View file

@ -151,7 +151,7 @@ class EPPLibWrapper:
raise RegistryError(message) from err
else:
if response.code >= 2000:
raise RegistryError(response.msg, code=response.code)
raise RegistryError(response.msg, code=response.code, response=response)
else:
return response
@ -174,6 +174,8 @@ class EPPLibWrapper:
try:
return self._send(command)
except RegistryError as err:
if err.response:
logger.info(f"cltrid is {err.response.cl_tr_id} svtrid is {err.response.sv_tr_id}")
if (
err.is_transport_error()
or err.is_connection_error()

View file

@ -1,4 +1,4 @@
from enum import IntEnum
from enum import IntEnum, Enum
class ErrorCode(IntEnum):
@ -52,6 +52,10 @@ class ErrorCode(IntEnum):
SESSION_LIMIT_EXCEEDED_SERVER_CLOSING_CONNECTION = 2502
class RegistryErrorMessage(Enum):
REGISTRAR_NOT_LOGGED_IN = "Registrar is not logged in."
class RegistryError(Exception):
"""
Overview of registry response codes from RFC 5730. See RFC 5730 for full text.
@ -62,14 +66,21 @@ class RegistryError(Exception):
- 2501 - 2502 Something malicious or abusive may have occurred
"""
def __init__(self, *args, code=None, note="", **kwargs):
def __init__(self, *args, code=None, note="", response=None, **kwargs):
super().__init__(*args, **kwargs)
self.code = code
self.response = response
# note is a string that can be used to provide additional context
self.note = note
def should_retry(self):
return self.code == ErrorCode.COMMAND_FAILED
# COMMAND_USE_ERROR is returning with message, Registrar is not logged in,
# which can be recovered from with a retry
return self.code == ErrorCode.COMMAND_FAILED or (
self.code == ErrorCode.COMMAND_USE_ERROR
and self.response
and getattr(self.response, "msg", None) == RegistryErrorMessage.REGISTRAR_NOT_LOGGED_IN.value
)
def is_transport_error(self):
return self.code == ErrorCode.TRANSPORT_ERROR

View file

@ -264,6 +264,58 @@ class TestClient(TestCase):
# send() is called 5 times: send(login), send(command) fail, send(logout), send(login), send(command)
self.assertEquals(mock_send.call_count, 5)
@less_console_noise_decorator
@patch("epplibwrapper.client.Client")
@patch("epplibwrapper.client.logger")
def test_send_command_2002_failure_prompts_successful_retry(self, mock_logger, mock_client):
"""Test when the send("InfoDomainCommand) call fails with a 2002, prompting a retry
and the subsequent send("InfoDomainCommand) call succeeds
Flow:
Initialization succeeds
Send command fails (with 2002 code) prompting retry
Client closes and re-initializes, and command succeeds"""
# Mock the Client instance and its methods
# connect() and close() should succeed throughout
mock_connect = MagicMock()
mock_close = MagicMock()
# create success and failure result messages
send_command_success_result = self.fake_result(1000, "Command completed successfully")
send_command_failure_result = self.fake_result(2002, "Registrar is not logged in.")
# side_effect for send call, initial send(login) succeeds during initialization, next send(command)
# fails, subsequent sends (logout, login, command) all succeed
send_call_count = 0
# Create a mock command
mock_command = MagicMock()
mock_command.__class__.__name__ = "InfoDomainCommand"
def side_effect(*args, **kwargs):
nonlocal send_call_count
send_call_count += 1
if send_call_count == 2:
return send_command_failure_result
else:
return send_command_success_result
mock_send = MagicMock(side_effect=side_effect)
mock_client.return_value.connect = mock_connect
mock_client.return_value.close = mock_close
mock_client.return_value.send = mock_send
# Create EPPLibWrapper instance and initialize client
wrapper = EPPLibWrapper()
wrapper.send(mock_command, cleaned=True)
# connect() is called twice, once during initialization of app, once during retry
self.assertEquals(mock_connect.call_count, 2)
# close() is called once, during retry
mock_close.assert_called_once()
# send() is called 5 times: send(login), send(command) fail, send(logout), send(login), send(command)
self.assertEquals(mock_send.call_count, 5)
# Assertion proper logging; note that the
mock_logger.info.assert_any_call(
"InfoDomainCommand failed and will be retried Error: Registrar is not logged in."
)
mock_logger.info.assert_any_call("cltrid is cl_tr_id svtrid is sv_tr_id")
@less_console_noise_decorator
def fake_failure_send_concurrent_threads(self, command=None, cleaned=None):
"""

View file

@ -72,6 +72,7 @@ from django.contrib.admin.widgets import FilteredSelectMultiple
from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _
logger = logging.getLogger(__name__)
@ -1261,6 +1262,13 @@ class HostIpAdmin(AuditedAdmin, ImportExportRegistrarModelAdmin):
resource_classes = [HostIpResource]
model = models.HostIP
search_fields = ["host__name", "address"]
search_help_text = "Search by host name or address."
list_display = (
"host",
"address",
)
class ContactResource(resources.ModelResource):
"""defines how each field in the referenced model should be mapped to the corresponding fields in the
@ -3211,9 +3219,9 @@ class DomainRequestAdmin(ListHeaderAdmin, ImportExportRegistrarModelAdmin):
Returns a tuple: (obj: DomainRequest, should_proceed: bool)
"""
should_proceed = True
error_message = None
domain_name = original_obj.requested_domain.name
# Get the method that should be run given the status
selected_method = self.get_status_method_mapping(obj)
@ -3235,6 +3243,17 @@ class DomainRequestAdmin(ListHeaderAdmin, ImportExportRegistrarModelAdmin):
# duplicated in the model and the error is raised from the model.
# This avoids an ugly Django error screen.
error_message = "This action is not permitted. The domain is already active."
elif (
original_obj.status != models.DomainRequest.DomainRequestStatus.APPROVED
and obj.status == models.DomainRequest.DomainRequestStatus.APPROVED
and original_obj.requested_domain is not None
and Domain.is_pending_delete(domain_name)
):
# 1. If the domain request is not approved in previous state (original status)
# 2. If the new status that's supposed to be triggered IS approved
# 3. That it's a valid domain
# 4. AND that the domain is currently in pendingDelete state
error_message = FSMDomainRequestError.get_error_message(FSMErrorCodes.DOMAIN_IS_PENDING_DELETE)
elif (
original_obj.status != models.DomainRequest.DomainRequestStatus.APPROVED
and obj.status == models.DomainRequest.DomainRequestStatus.APPROVED
@ -4609,6 +4628,10 @@ class PublicContactAdmin(ListHeaderAdmin, ImportExportRegistrarModelAdmin):
change_form_template = "django/admin/email_clipboard_change_form.html"
autocomplete_fields = ["domain"]
list_display = ("registry_id", "contact_type", "domain", "name")
search_fields = ["registry_id", "domain__name", "name"]
search_help_text = "Search by registry id, domain, or name."
list_filter = ("contact_type",)
def changeform_view(self, request, object_id=None, form_url="", extra_context=None):
if extra_context is None:

View file

@ -11,7 +11,7 @@ import { initDomainRequestsTable } from './table-domain-requests.js';
import { initMembersTable } from './table-members.js';
import { initMemberDomainsTable } from './table-member-domains.js';
import { initEditMemberDomainsTable } from './table-edit-member-domains.js';
import { initPortfolioNewMemberPageToggle, initAddNewMemberPageListeners, initPortfolioMemberPageRadio } from './portfolio-member-page.js';
import { initPortfolioNewMemberPageToggle, initAddNewMemberPageListeners, initPortfolioMemberPage } from './portfolio-member-page.js';
import { initDomainRequestForm } from './domain-request-form.js';
import { initDomainManagersPage } from './domain-managers.js';
import { initDomainDNSSEC } from './domain-dnssec.js';
@ -56,8 +56,10 @@ initDomainDNSSEC();
initFormErrorHandling();
// Init the portfolio member page
initPortfolioMemberPage();
// Init the portfolio new member page
initPortfolioMemberPageRadio();
initPortfolioNewMemberPageToggle();
initAddNewMemberPageListeners();

View file

@ -193,10 +193,14 @@ export function initAddNewMemberPageListeners() {
}
// Initalize the radio for the member pages
export function initPortfolioMemberPageRadio() {
export function initPortfolioMemberPage() {
document.addEventListener("DOMContentLoaded", () => {
let memberForm = document.getElementById("member_form");
let newMemberForm = document.getElementById("add_member_form")
let newMemberForm = document.getElementById("add_member_form");
let editSelfWarningModal = document.getElementById("toggle-member-permissions-edit-self");
let editSelfWarningModalConfirm = document.getElementById("member-permissions-edit-self");
// Init the radio
if (memberForm || newMemberForm) {
hookupRadioTogglerListener(
'role',
@ -206,5 +210,36 @@ export function initPortfolioMemberPageRadio() {
}
);
}
// Init the "edit self" warning modal, which triggers when the user is trying to edit themselves.
// The dom will include these elements when this occurs.
// NOTE: This logic does not trigger when the user is the ONLY admin in the portfolio.
// This is because info alerts are used rather than modals in this case.
if (memberForm && editSelfWarningModal) {
// Only show the warning modal when the user is changing their ROLE.
var canSubmit = document.querySelector(`input[name="role"]:checked`)?.value != "organization_member";
let radioButtons = document.querySelectorAll(`input[name="role"]`);
radioButtons.forEach(function (radioButton) {
radioButton.addEventListener("change", function() {
let selectedValue = radioButton.checked ? radioButton.value : null;
canSubmit = selectedValue != "organization_member";
});
});
// Prevent form submission assuming org member is selected for role, and open the modal.
memberForm.addEventListener("submit", function(e) {
if (!canSubmit) {
e.preventDefault();
editSelfWarningModal.click();
}
});
// Hook the confirm button on the modal to form submission.
editSelfWarningModalConfirm.addEventListener("click", function() {
canSubmit = true;
memberForm.submit();
});
}
});
}

View file

@ -203,6 +203,8 @@ MIDDLEWARE = [
"registrar.registrar_middleware.CheckPortfolioMiddleware",
# Restrict access using Opt-Out approach
"registrar.registrar_middleware.RestrictAccessMiddleware",
# Our own router logs that included user info to speed up log tracing time on stable
"registrar.registrar_middleware.RequestLoggingMiddleware",
]
# application object used by Django's built-in servers (e.g. `runserver`)

View file

@ -171,6 +171,7 @@ urlpatterns = [
path(
"admin/logout/",
RedirectView.as_view(pattern_name="logout", permanent=False),
name="logout",
),
path(
"admin/analytics/export_data_type/",

View file

@ -411,6 +411,27 @@ class PortfolioMemberForm(BasePortfolioMemberForm):
model = UserPortfolioPermission
fields = ["roles", "additional_permissions"]
def clean(self):
"""
Override of clean to ensure that the user isn't removing themselves
if they're the only portfolio admin
"""
super().clean()
role = self.cleaned_data.get("role")
if self.instance and hasattr(self.instance, "user") and hasattr(self.instance, "portfolio"):
if role and self.instance.user.is_only_admin_of_portfolio(self.instance.portfolio):
# This is how you associate a validation error to a particular field.
# The alternative is to do this in clean_role, but execution order matters.
raise forms.ValidationError(
{
"role": forms.ValidationError(
"You can't change your member access because you're "
"the only admin for this organization. "
"To change your access, you'll need to add another admin."
)
}
)
class PortfolioInvitedMemberForm(BasePortfolioMemberForm):
"""

View file

@ -3,14 +3,14 @@
import argparse
import logging
from django.core.management import BaseCommand, CommandError
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.management.commands.utility.terminal_helper import ScriptDataHelper, TerminalColors, TerminalHelper
from registrar.models import DomainInformation, DomainRequest, FederalAgency, Suborganization, Portfolio, User
from registrar.models.domain import Domain
from registrar.models.domain_invitation import DomainInvitation
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user_domain_role import UserDomainRole
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.generic_helper import normalize_string
from registrar.models.utility.generic_helper import count_capitals, normalize_string
from django.db.models import F, Q
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
@ -22,16 +22,56 @@ logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Creates a federal portfolio given a FederalAgency name"
class ChangeTracker:
def __init__(self, model_class):
self.model_class = model_class
self.create = []
self.update = []
self.skip = []
self.fail = []
def print_script_run_summary(self, no_changes_message, **kwargs):
"""Helper function that runs TerminalHelper.log_script_run_summary on this object."""
if self.has_changes():
TerminalHelper.log_script_run_summary(self.create, self.update, self.skip, self.fail, **kwargs)
else:
logger.info(f"{TerminalColors.BOLD}{no_changes_message}{TerminalColors.ENDC}")
def has_changes(self) -> bool:
changes = [self.create, self.update, self.skip, self.fail]
return any([change for change in changes if change])
def bulk_create(self):
try:
res = ScriptDataHelper.bulk_create_fields(
self.model_class, self.create, return_created=True, quiet=True
)
self.create = res
return res
except Exception as err:
# In this case, just swap the fail and add lists
self.fail = self.create.copy()
self.create.clear()
raise err
def bulk_update(self, fields_to_update):
try:
ScriptDataHelper.bulk_update_fields(self.model_class, self.update, fields_to_update, quiet=True)
except Exception as err:
# In this case, just swap the fail and update lists
self.fail = self.update.copy()
self.update.clear()
raise err
def __init__(self, *args, **kwargs):
"""Defines fields to track what portfolios were updated, skipped, or just outright failed."""
super().__init__(*args, **kwargs)
self.updated_portfolios = set()
self.skipped_portfolios = set()
self.failed_portfolios = set()
self.added_managers = set()
self.added_invitations = set()
self.skipped_invitations = set()
self.failed_managers = set()
self.portfolio_changes = self.ChangeTracker(model_class=Portfolio)
self.suborganization_changes = self.ChangeTracker(model_class=Suborganization)
self.domain_info_changes = self.ChangeTracker(model_class=DomainInformation)
self.domain_request_changes = self.ChangeTracker(model_class=DomainRequest)
self.user_portfolio_perm_changes = self.ChangeTracker(model_class=UserPortfolioPermission)
self.portfolio_invitation_changes = self.ChangeTracker(model_class=PortfolioInvitation)
def add_arguments(self, parser):
"""Add command line arguments to create federal portfolios.
@ -44,14 +84,11 @@ class Command(BaseCommand):
Required (at least one):
--parse_requests: Add the created portfolio(s) to related DomainRequest records
--parse_domains: Add the created portfolio(s) to related DomainInformation records
Note: You can use both --parse_requests and --parse_domains together
Optional (mutually exclusive with parse options):
--both: Shorthand for using both --parse_requests and --parse_domains
Cannot be used with --parse_requests or --parse_domains
--parse_managers: Add all domain managers of the portfolio's domains to the organization.
Optional:
--add_managers: Add all domain managers of the portfolio's domains to the organization.
--skip_existing_portfolios: Does not perform substeps on a portfolio if it already exists.
--debug: Increases log verbosity
"""
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
@ -74,19 +111,19 @@ class Command(BaseCommand):
help="Adds portfolio to DomainInformation",
)
parser.add_argument(
"--both",
action=argparse.BooleanOptionalAction,
help="Adds portfolio to both requests and domains",
)
parser.add_argument(
"--add_managers",
"--parse_managers",
action=argparse.BooleanOptionalAction,
help="Add all domain managers of the portfolio's domains to the organization.",
)
parser.add_argument(
"--skip_existing_portfolios",
action=argparse.BooleanOptionalAction,
help="Only add suborganizations to newly created portfolios, skip existing ones.",
help="Only parses newly created portfolios, skippubg existing ones.",
)
parser.add_argument(
"--debug",
action=argparse.BooleanOptionalAction,
help="Shows additional log info.",
)
def handle(self, **options): # noqa: C901
@ -94,22 +131,20 @@ class Command(BaseCommand):
branch = options.get("branch")
parse_requests = options.get("parse_requests")
parse_domains = options.get("parse_domains")
both = options.get("both")
add_managers = options.get("add_managers")
parse_managers = options.get("parse_managers")
skip_existing_portfolios = options.get("skip_existing_portfolios")
debug = options.get("debug")
if not both:
if not (parse_requests or parse_domains or add_managers):
raise CommandError(
"You must specify at least one of --parse_requests, --parse_domains, or --add_managers."
)
else:
if parse_requests or parse_domains:
raise CommandError("You cannot pass --parse_requests or --parse_domains when passing --both.")
# Parse script params
if not (parse_requests or parse_domains or parse_managers):
raise CommandError(
"You must specify at least one of --parse_requests, --parse_domains, or --parse_managers."
)
# Get agencies
federal_agency_filter = {"agency__iexact": agency_name} if agency_name else {"federal_type": branch}
agencies = FederalAgency.objects.filter(**federal_agency_filter)
if not agencies or agencies.count() < 1:
agencies = FederalAgency.objects.filter(agency__isnull=False, **federal_agency_filter).distinct()
if not agencies.exists():
if agency_name:
raise CommandError(
f"Cannot find the federal agency '{agency_name}' in our database. "
@ -118,421 +153,207 @@ class Command(BaseCommand):
)
else:
raise CommandError(f"Cannot find '{branch}' federal agencies in our database.")
portfolios = []
for federal_agency in agencies:
message = f"Processing federal agency '{federal_agency.agency}'..."
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
try:
# C901 'Command.handle' is too complex (12)
portfolio = self.handle_populate_portfolio(
federal_agency, parse_domains, parse_requests, both, skip_existing_portfolios
)
portfolios.append(portfolio)
if add_managers:
self.add_managers_to_portfolio(portfolio)
except Exception as exec:
self.failed_portfolios.add(federal_agency)
logger.error(exec)
message = f"Failed to create portfolio '{federal_agency.agency}'"
TerminalHelper.colorful_logger(logger.info, TerminalColors.FAIL, message)
# POST PROCESS STEP: Add additional suborg info where applicable.
updated_suborg_count = self.post_process_all_suborganization_fields(agencies)
message = f"Added city and state_territory information to {updated_suborg_count} suborgs."
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
TerminalHelper.log_script_run_summary(
self.updated_portfolios,
self.failed_portfolios,
self.skipped_portfolios,
debug=False,
log_header="============= FINISHED HANDLE PORTFOLIO STEP ===============",
# Store all portfolios and agencies in a dict to avoid extra db calls
existing_portfolios = Portfolio.objects.filter(
organization_name__in=agencies.values_list("agency", flat=True), organization_name__isnull=False
)
existing_portfolios_dict = {normalize_string(p.organization_name): p for p in existing_portfolios}
agencies_dict = {normalize_string(agency.agency): agency for agency in agencies}
# NOTE: exceptions to portfolio and suborg are intentionally uncaught.
# parse domains, requests, and managers all rely on these fields to function.
# An error here means everything down the line is compromised.
# The individual parse steps, however, are independent from eachother.
# == Handle portfolios == #
# Loop through every agency we want to add and create a portfolio if the record is new.
for federal_agency in agencies_dict.values():
norm_agency_name = normalize_string(federal_agency.agency)
portfolio = existing_portfolios_dict.get(norm_agency_name, None)
if portfolio is None:
portfolio = Portfolio(
organization_name=federal_agency.agency,
federal_agency=federal_agency,
organization_type=DomainRequest.OrganizationChoices.FEDERAL,
creator=User.get_default_user(),
notes="Auto-generated record",
senior_official=federal_agency.so_federal_agency.first(),
)
self.portfolio_changes.create.append(portfolio)
logger.info(f"{TerminalColors.OKGREEN}Created portfolio '{portfolio}'.{TerminalColors.ENDC}")
elif skip_existing_portfolios:
message = f"Portfolio '{portfolio}' already exists. Skipped."
logger.info(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
self.portfolio_changes.skip.append(portfolio)
# Create portfolios
self.portfolio_changes.bulk_create()
# After create, get the list of all portfolios to use
portfolios_to_use = set(self.portfolio_changes.create)
if not skip_existing_portfolios:
portfolios_to_use.update(set(existing_portfolios))
portfolios_to_use_dict = {normalize_string(p.organization_name): p for p in portfolios_to_use}
# == Handle suborganizations == #
created_suborgs = self.create_suborganizations(portfolios_to_use_dict, agencies_dict)
if created_suborgs:
self.suborganization_changes.create.extend(created_suborgs.values())
self.suborganization_changes.bulk_create()
# == Handle domains and requests == #
for portfolio_org_name, portfolio in portfolios_to_use_dict.items():
federal_agency = agencies_dict.get(portfolio_org_name)
suborgs = portfolio.portfolio_suborganizations.in_bulk(field_name="name")
if parse_domains:
updated_domains = self.update_domains(portfolio, federal_agency, suborgs, debug)
self.domain_info_changes.update.extend(updated_domains)
if parse_requests:
updated_domain_requests = self.update_requests(portfolio, federal_agency, suborgs, debug)
self.domain_request_changes.update.extend(updated_domain_requests)
# Update DomainInformation
try:
self.domain_info_changes.bulk_update(["portfolio", "sub_organization"])
except Exception as err:
logger.error(f"{TerminalColors.FAIL}Could not bulk update domain infos.{TerminalColors.ENDC}")
logger.error(err, exc_info=True)
# Update DomainRequest
try:
self.domain_request_changes.bulk_update(
[
"portfolio",
"sub_organization",
"requested_suborganization",
"suborganization_city",
"suborganization_state_territory",
"federal_agency",
]
)
except Exception as err:
logger.error(f"{TerminalColors.FAIL}Could not bulk update domain requests.{TerminalColors.ENDC}")
logger.error(err, exc_info=True)
# == Handle managers (no bulk_create) == #
if parse_managers:
domain_infos = DomainInformation.objects.filter(portfolio__in=portfolios_to_use)
domains = Domain.objects.filter(domain_info__in=domain_infos)
# Create UserPortfolioPermission
self.create_user_portfolio_permissions(domains)
# Create PortfolioInvitation
self.create_portfolio_invitations(domains)
# == PRINT RUN SUMMARY == #
self.print_final_run_summary(parse_domains, parse_requests, parse_managers, debug)
def print_final_run_summary(self, parse_domains, parse_requests, parse_managers, debug):
self.portfolio_changes.print_script_run_summary(
no_changes_message="||============= No portfolios changed. =============||",
log_header="============= PORTFOLIOS =============",
skipped_header="----- SOME PORTFOLIOS WERENT CREATED (BUT OTHER RECORDS ARE STILL PROCESSED) -----",
display_as_str=True,
)
if add_managers:
TerminalHelper.log_script_run_summary(
self.added_managers,
self.failed_managers,
[], # can't skip managers, can only add or fail
log_header="----- MANAGERS ADDED -----",
debug=False,
display_as_str=True,
)
TerminalHelper.log_script_run_summary(
self.added_invitations,
[],
self.skipped_invitations,
log_header="----- INVITATIONS ADDED -----",
debug=False,
skipped_header="----- INVITATIONS SKIPPED (ALREADY EXISTED) -----",
display_as_str=True,
)
# POST PROCESSING STEP: Remove the federal agency if it matches the portfolio name.
# We only do this for started domain requests.
if parse_requests or both:
prompt_message = (
"This action will update domain requests even if they aren't on a portfolio."
"\nNOTE: This will modify domain requests, even if no portfolios were created."
"\nIn the event no portfolios *are* created, then this step will target "
"the existing portfolios with your given params."
"\nThis step is entirely optional, and is just for extra data cleanup."
)
TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
prompt_message=prompt_message,
prompt_title=(
"POST PROCESS STEP: Do you want to clear federal agency on (related) started domain requests?"
),
verify_message="*** THIS STEP IS OPTIONAL ***",
)
self.post_process_started_domain_requests(agencies, portfolios)
def add_managers_to_portfolio(self, portfolio: Portfolio):
"""
Add all domain managers of the portfolio's domains to the organization.
This includes adding them to the correct group and creating portfolio invitations.
"""
logger.info(f"Adding managers for portfolio {portfolio}")
# Fetch all domains associated with the portfolio
domains = Domain.objects.filter(domain_info__portfolio=portfolio)
domain_managers: set[UserDomainRole] = set()
# Fetch all users with manager roles for the domains
# select_related means that a db query will not be occur when you do user_domain_role.user
# Its similar to a set or dict in that it costs slightly more upfront in exchange for perf later
user_domain_roles = UserDomainRole.objects.select_related("user").filter(
domain__in=domains, role=UserDomainRole.Roles.MANAGER
)
domain_managers.update(user_domain_roles)
invited_managers: set[str] = set()
# Get the emails of invited managers
domain_invitations = DomainInvitation.objects.filter(
domain__in=domains, status=DomainInvitation.DomainInvitationStatus.INVITED
).values_list("email", flat=True)
invited_managers.update(domain_invitations)
for user_domain_role in domain_managers:
try:
# manager is a user id
user = user_domain_role.user
_, created = UserPortfolioPermission.objects.get_or_create(
portfolio=portfolio,
user=user,
defaults={"roles": [UserPortfolioRoleChoices.ORGANIZATION_MEMBER]},
)
self.added_managers.add(user)
if created:
logger.info(f"Added manager '{user}' to portfolio '{portfolio}'")
else:
logger.info(f"Manager '{user}' already exists in portfolio '{portfolio}'")
except User.DoesNotExist:
self.failed_managers.add(user)
logger.debug(f"User '{user}' does not exist")
for email in invited_managers:
self.create_portfolio_invitation(portfolio, email)
def create_portfolio_invitation(self, portfolio: Portfolio, email: str):
"""
Create a portfolio invitation for the given email.
"""
_, created = PortfolioInvitation.objects.get_or_create(
portfolio=portfolio,
email=email,
defaults={
"status": PortfolioInvitation.PortfolioInvitationStatus.INVITED,
"roles": [UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
},
)
if created:
self.added_invitations.add(email)
logger.info(f"Created portfolio invitation for '{email}' to portfolio '{portfolio}'")
else:
self.skipped_invitations.add(email)
logger.info(f"Found existing portfolio invitation for '{email}' to portfolio '{portfolio}'")
def post_process_started_domain_requests(self, agencies, portfolios):
"""
Removes duplicate organization data by clearing federal_agency when it matches the portfolio name.
Only processes domain requests in STARTED status.
"""
message = "Removing duplicate portfolio and federal_agency values from domain requests..."
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
# For each request, clear the federal agency under these conditions:
# 1. A portfolio *already exists* with the same name as the federal agency.
# 2. Said portfolio (or portfolios) are only the ones specified at the start of the script.
# 3. The domain request is in status "started".
# Note: Both names are normalized so excess spaces are stripped and the string is lowercased.
domain_requests_to_update = DomainRequest.objects.filter(
federal_agency__in=agencies,
federal_agency__agency__isnull=False,
status=DomainRequest.DomainRequestStatus.STARTED,
organization_name__isnull=False,
)
if domain_requests_to_update.count() == 0:
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, "No domain requests to update.")
return
portfolio_set = {normalize_string(portfolio.organization_name) for portfolio in portfolios if portfolio}
# Update the request, assuming the given agency name matches the portfolio name
updated_requests = []
for req in domain_requests_to_update:
agency_name = normalize_string(req.federal_agency.agency)
if agency_name in portfolio_set:
req.federal_agency = None
updated_requests.append(req)
# Execute the update and Log the results
if TerminalHelper.prompt_for_execution(
system_exit_on_terminate=False,
prompt_message=(
f"{len(domain_requests_to_update)} domain requests will be updated. "
f"These records will be changed: {[str(req) for req in updated_requests]}"
detailed_prompt_title=(
"PORTFOLIOS: Do you wish to see the full list of failed, skipped and updated records?"
),
prompt_title="Do you wish to commit this update to the database?",
):
DomainRequest.objects.bulk_update(updated_requests, ["federal_agency"])
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKBLUE, "Action completed successfully.")
def handle_populate_portfolio(self, federal_agency, parse_domains, parse_requests, both, skip_existing_portfolios):
"""Attempts to create a portfolio. If successful, this function will
also create new suborganizations"""
portfolio, created = self.create_portfolio(federal_agency)
if skip_existing_portfolios and not created:
TerminalHelper.colorful_logger(
logger.warning,
TerminalColors.YELLOW,
"Skipping modifications to suborgs, domain requests, and "
"domains due to the --skip_existing_portfolios flag. Portfolio already exists.",
)
return portfolio
self.create_suborganizations(portfolio, federal_agency)
if parse_domains or both:
self.handle_portfolio_domains(portfolio, federal_agency)
if parse_requests or both:
self.handle_portfolio_requests(portfolio, federal_agency)
return portfolio
def create_portfolio(self, federal_agency):
"""Creates a portfolio if it doesn't presently exist.
Returns portfolio, created."""
# Get the org name / senior official
org_name = federal_agency.agency
so = federal_agency.so_federal_agency.first() if federal_agency.so_federal_agency.exists() else None
# First just try to get an existing portfolio
portfolio = Portfolio.objects.filter(organization_name=org_name).first()
if portfolio:
self.skipped_portfolios.add(portfolio)
TerminalHelper.colorful_logger(
logger.info,
TerminalColors.YELLOW,
f"Portfolio with organization name '{org_name}' already exists. Skipping create.",
)
return portfolio, False
# Create new portfolio if it doesn't exist
portfolio = Portfolio.objects.create(
organization_name=org_name,
federal_agency=federal_agency,
organization_type=DomainRequest.OrganizationChoices.FEDERAL,
creator=User.get_default_user(),
notes="Auto-generated record",
senior_official=so,
display_as_str=True,
debug=debug,
)
self.suborganization_changes.print_script_run_summary(
no_changes_message="||============= No suborganizations changed. =============||",
log_header="============= SUBORGANIZATIONS =============",
skipped_header="----- SUBORGANIZATIONS SKIPPED (SAME NAME AS PORTFOLIO NAME) -----",
detailed_prompt_title=(
"SUBORGANIZATIONS: Do you wish to see the full list of failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
)
self.updated_portfolios.add(portfolio)
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, f"Created portfolio '{portfolio}'")
# Log if the senior official was added or not.
if portfolio.senior_official:
message = f"Added senior official '{portfolio.senior_official}'"
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)
else:
message = (
f"No senior official added to portfolio '{org_name}'. "
"None was returned for the reverse relation `FederalAgency.so_federal_agency.first()`"
if parse_domains:
self.domain_info_changes.print_script_run_summary(
no_changes_message="||============= No domains changed. =============||",
log_header="============= DOMAINS =============",
detailed_prompt_title=(
"DOMAINS: Do you wish to see the full list of failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
)
TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message)
return portfolio, True
if parse_requests:
self.domain_request_changes.print_script_run_summary(
no_changes_message="||============= No domain requests changed. =============||",
log_header="============= DOMAIN REQUESTS =============",
detailed_prompt_title=(
"DOMAIN REQUESTS: Do you wish to see the full list of failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
)
def create_suborganizations(self, portfolio: Portfolio, federal_agency: FederalAgency):
if parse_managers:
self.user_portfolio_perm_changes.print_script_run_summary(
no_changes_message="||============= No managers changed. =============||",
log_header="============= MANAGERS =============",
skipped_header="----- MANAGERS SKIPPED (ALREADY EXISTED) -----",
detailed_prompt_title=(
"MANAGERS: Do you wish to see the full list of failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
)
self.portfolio_invitation_changes.print_script_run_summary(
no_changes_message="||============= No manager invitations changed. =============||",
log_header="============= MANAGER INVITATIONS =============",
skipped_header="----- INVITATIONS SKIPPED (ALREADY EXISTED) -----",
detailed_prompt_title=(
"MANAGER INVITATIONS: Do you wish to see the full list of failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
)
def create_suborganizations(self, portfolio_dict, agency_dict):
"""Create Suborganizations tied to the given portfolio based on DomainInformation objects"""
valid_agencies = DomainInformation.objects.filter(
federal_agency=federal_agency, organization_name__isnull=False
)
org_names = set(valid_agencies.values_list("organization_name", flat=True))
if not org_names:
message = (
"Could not add any suborganizations."
f"\nNo suborganizations were found for '{federal_agency}' when filtering on this name, "
"and excluding null organization_name records."
)
TerminalHelper.colorful_logger(logger.warning, TerminalColors.FAIL, message)
return
created_suborgs = {}
# Check for existing suborgs on the current portfolio
existing_suborgs = Suborganization.objects.filter(name__in=org_names, name__isnull=False)
if existing_suborgs.exists():
message = f"Some suborganizations already exist for portfolio '{portfolio}'."
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKBLUE, message)
portfolios = portfolio_dict.values()
agencies = agency_dict.values()
# Create new suborgs, as long as they don't exist in the db already
new_suborgs = []
for name in org_names - set(existing_suborgs.values_list("name", flat=True)):
if normalize_string(name) == normalize_string(portfolio.organization_name):
# You can use this to populate location information, when this occurs.
# However, this isn't needed for now so we can skip it.
message = (
f"Skipping suborganization create on record '{name}'. "
"The federal agency name is the same as the portfolio name."
)
TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, message)
else:
new_suborgs.append(Suborganization(name=name, portfolio=portfolio)) # type: ignore
if new_suborgs:
Suborganization.objects.bulk_create(new_suborgs)
TerminalHelper.colorful_logger(
logger.info, TerminalColors.OKGREEN, f"Added {len(new_suborgs)} suborganizations"
)
else:
TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, "No suborganizations added")
def handle_portfolio_requests(self, portfolio: Portfolio, federal_agency: FederalAgency):
"""
Associate portfolio with domain requests for a federal agency.
Updates all relevant domain request records.
"""
invalid_states = [
DomainRequest.DomainRequestStatus.STARTED,
DomainRequest.DomainRequestStatus.INELIGIBLE,
DomainRequest.DomainRequestStatus.REJECTED,
]
domain_requests = DomainRequest.objects.filter(federal_agency=federal_agency).exclude(status__in=invalid_states)
if not domain_requests.exists():
message = f"""
Portfolio '{portfolio}' not added to domain requests: no valid records found.
This means that a filter on DomainInformation for the federal_agency '{federal_agency}' returned no results.
Excluded statuses: STARTED, INELIGIBLE, REJECTED.
Filter info: DomainRequest.objects.filter(federal_agency=federal_agency).exclude(
status__in=invalid_states
)
"""
TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message)
return None
# Get all suborg information and store it in a dict to avoid doing a db call
suborgs = Suborganization.objects.filter(portfolio=portfolio).in_bulk(field_name="name")
for domain_request in domain_requests:
# Set the portfolio
domain_request.portfolio = portfolio
# Set suborg info
domain_request.sub_organization = suborgs.get(domain_request.organization_name, None)
if domain_request.sub_organization is None:
domain_request.requested_suborganization = normalize_string(
domain_request.organization_name, lowercase=False
)
domain_request.suborganization_city = normalize_string(domain_request.city, lowercase=False)
domain_request.suborganization_state_territory = domain_request.state_territory
self.updated_portfolios.add(portfolio)
DomainRequest.objects.bulk_update(
domain_requests,
[
"portfolio",
"sub_organization",
"requested_suborganization",
"suborganization_city",
"suborganization_state_territory",
],
)
message = f"Added portfolio '{portfolio}' to {len(domain_requests)} domain requests."
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)
def handle_portfolio_domains(self, portfolio: Portfolio, federal_agency: FederalAgency):
"""
Associate portfolio with domains for a federal agency.
Updates all relevant domain information records.
Returns a queryset of DomainInformation objects, or None if nothing changed.
"""
domain_infos = DomainInformation.objects.filter(federal_agency=federal_agency)
if not domain_infos.exists():
message = f"""
Portfolio '{portfolio}' not added to domains: no valid records found.
The filter on DomainInformation for the federal_agency '{federal_agency}' returned no results.
Filter info: DomainInformation.objects.filter(federal_agency=federal_agency)
"""
TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message)
return None
# Get all suborg information and store it in a dict to avoid doing a db call
suborgs = Suborganization.objects.filter(portfolio=portfolio).in_bulk(field_name="name")
for domain_info in domain_infos:
domain_info.portfolio = portfolio
domain_info.sub_organization = suborgs.get(domain_info.organization_name, None)
DomainInformation.objects.bulk_update(domain_infos, ["portfolio", "sub_organization"])
message = f"Added portfolio '{portfolio}' to {len(domain_infos)} domains."
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)
def post_process_all_suborganization_fields(self, agencies):
"""Batch updates suborganization locations from domain and request data.
Args:
agencies: List of FederalAgency objects to process
Returns:
int: Number of suborganizations updated
Priority for location data:
1. Domain information
2. Domain request suborganization fields
3. Domain request standard fields
"""
# Common filter between domaininformation / domain request.
# Filter by only the agencies we've updated thus far.
# Then, only process records without null portfolio, org name, or suborg name.
base_filter = Q(
federal_agency__in=agencies,
portfolio__isnull=False,
organization_name__isnull=False,
sub_organization__isnull=False,
) & ~Q(organization_name__iexact=F("portfolio__organization_name"))
# First: Remove null city / state_territory values on domain info / domain requests.
# We want to add city data if there is data to add to begin with!
domains = DomainInformation.objects.filter(
base_filter,
Q(city__isnull=False, state_territory__isnull=False),
# Org name must not be null, and must not be the portfolio name
Q(
organization_name__isnull=False,
)
& ~Q(organization_name__iexact=F("portfolio__organization_name")),
# Only get relevant data to the agency/portfolio we are targeting
Q(federal_agency__in=agencies) | Q(portfolio__in=portfolios),
)
requests = DomainRequest.objects.filter(
base_filter,
(
Q(city__isnull=False, state_territory__isnull=False)
| Q(suborganization_city__isnull=False, suborganization_state_territory__isnull=False)
),
# Org name must not be null, and must not be the portfolio name
Q(
organization_name__isnull=False,
)
& ~Q(organization_name__iexact=F("portfolio__organization_name")),
# Only get relevant data to the agency/portfolio we are targeting
Q(federal_agency__in=agencies) | Q(portfolio__in=portfolios),
)
# First: get all existing suborgs
# NOTE: .all() is a heavy query, but unavoidable as we need to check for duplicate names.
# This is not quite as heavy as just using a for loop and .get_or_create, but worth noting.
# Change this if you can find a way to avoid doing this.
# This won't scale great for 10k+ records.
existing_suborgs = Suborganization.objects.all()
suborg_dict = {normalize_string(org.name): org for org in existing_suborgs}
# Second: Group domains and requests by normalized organization name.
# This means that later down the line we have to account for "duplicate" org names.
domains_dict = {}
requests_dict = {}
for domain in domains:
@ -543,40 +364,60 @@ class Command(BaseCommand):
normalized_name = normalize_string(request.organization_name)
requests_dict.setdefault(normalized_name, []).append(request)
# Third: Get suborganizations to update
suborgs_to_edit = Suborganization.objects.filter(
Q(id__in=domains.values_list("sub_organization", flat=True))
| Q(id__in=requests.values_list("sub_organization", flat=True))
)
# Third: Parse through each group of domains that have the same organization names,
# then create *one* suborg record from it.
# Normalize all suborg names so we don't add duplicate data unintentionally.
for portfolio_name, portfolio in portfolio_dict.items():
# For a given agency, find all domains that list suborg info for it.
for norm_org_name, domains in domains_dict.items():
# Don't add the record if the suborg name would equal the portfolio name
if norm_org_name == portfolio_name:
continue
# Fourth: Process each suborg to add city / state territory info
for suborg in suborgs_to_edit:
self.post_process_suborganization_fields(suborg, domains_dict, requests_dict)
new_suborg_name = None
if len(domains) == 1:
new_suborg_name = normalize_string(domains[0].organization_name, lowercase=False)
elif len(domains) > 1:
# Pick the best record for a suborg name (fewest spaces, most leading capitals)
best_record = max(
domains,
key=lambda rank: (
-domain.organization_name.count(" "),
count_capitals(domain.organization_name, leading_only=True),
),
)
new_suborg_name = normalize_string(best_record.organization_name, lowercase=False)
# Fifth: Perform a bulk update
return Suborganization.objects.bulk_update(suborgs_to_edit, ["city", "state_territory"])
# If the suborg already exists, don't add it again.
if norm_org_name not in suborg_dict and norm_org_name not in created_suborgs:
requests = requests_dict.get(norm_org_name)
suborg = Suborganization(name=new_suborg_name, portfolio=portfolio)
self.set_suborganization_location(suborg, domains, requests)
created_suborgs[norm_org_name] = suborg
return created_suborgs
def post_process_suborganization_fields(self, suborg, domains_dict, requests_dict):
def set_suborganization_location(self, suborg, domains, requests):
"""Updates a single suborganization's location data if valid.
Args:
suborg: Suborganization to update
domains_dict: Dict of domain info records grouped by org name
requests_dict: Dict of domain requests grouped by org name
domains: omain info records grouped by org name
requests: domain requests grouped by org name
Priority matches parent method. Updates are skipped if location data conflicts
Updates are skipped if location data conflicts
between multiple records of the same type.
"""
normalized_suborg_name = normalize_string(suborg.name)
domains = domains_dict.get(normalized_suborg_name, [])
requests = requests_dict.get(normalized_suborg_name, [])
# Try to get matching domain info
domain = None
if domains:
reference = domains[0]
use_location_for_domain = all(
d.city == reference.city and d.state_territory == reference.state_territory for d in domains
d.city
and d.state_territory
and d.city == reference.city
and d.state_territory == reference.state_territory
for d in domains
)
if use_location_for_domain:
domain = reference
@ -608,7 +449,7 @@ class Command(BaseCommand):
if not domain and not request:
message = f"Skipping adding city / state_territory information to suborg: {suborg}. Bad data."
TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, message)
logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
return
# PRIORITY:
@ -625,8 +466,106 @@ class Command(BaseCommand):
suborg.city = normalize_string(request.city, lowercase=False)
suborg.state_territory = request.state_territory
message = (
f"Added city/state_territory to suborg: {suborg}. "
f"city - {suborg.city}, state - {suborg.state_territory}"
def update_domains(self, portfolio, federal_agency, suborgs, debug):
"""
Associate portfolio with domains for a federal agency.
Updates all relevant domain information records.
Returns a queryset of DomainInformation objects, or None if nothing changed.
"""
updated_domains = set()
domain_infos = federal_agency.domaininformation_set.all()
for domain_info in domain_infos:
org_name = normalize_string(domain_info.organization_name, lowercase=False)
domain_info.portfolio = portfolio
domain_info.sub_organization = suborgs.get(org_name, None)
updated_domains.add(domain_info)
if not updated_domains and debug:
message = f"Portfolio '{portfolio}' not added to domains: nothing to add found."
logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
return updated_domains
def update_requests(
self,
portfolio,
federal_agency,
suborgs,
debug,
):
"""
Associate portfolio with domain requests for a federal agency.
Updates all relevant domain request records.
"""
updated_domain_requests = set()
invalid_states = [
DomainRequest.DomainRequestStatus.INELIGIBLE,
DomainRequest.DomainRequestStatus.REJECTED,
]
domain_requests = federal_agency.domainrequest_set.exclude(status__in=invalid_states)
# Add portfolio, sub_org, requested_suborg, suborg_city, and suborg_state_territory.
# For started domain requests, set the federal agency to None if not on a portfolio.
for domain_request in domain_requests:
if domain_request.status != DomainRequest.DomainRequestStatus.STARTED:
org_name = normalize_string(domain_request.organization_name, lowercase=False)
domain_request.portfolio = portfolio
domain_request.sub_organization = suborgs.get(org_name, None)
if domain_request.sub_organization is None:
domain_request.requested_suborganization = normalize_string(
domain_request.organization_name, lowercase=False
)
domain_request.suborganization_city = normalize_string(domain_request.city, lowercase=False)
domain_request.suborganization_state_territory = domain_request.state_territory
else:
# Clear the federal agency for started domain requests
agency_name = normalize_string(domain_request.federal_agency.agency)
portfolio_name = normalize_string(portfolio.organization_name)
if agency_name == portfolio_name:
domain_request.federal_agency = None
logger.info(f"Set federal agency on started domain request '{domain_request}' to None.")
updated_domain_requests.add(domain_request)
if not updated_domain_requests and debug:
message = f"Portfolio '{portfolio}' not added to domain requests: nothing to add found."
logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
return updated_domain_requests
def create_user_portfolio_permissions(self, domains):
user_domain_roles = UserDomainRole.objects.select_related(
"user", "domain", "domain__domain_info", "domain__domain_info__portfolio"
).filter(domain__in=domains, domain__domain_info__portfolio__isnull=False, role=UserDomainRole.Roles.MANAGER)
for user_domain_role in user_domain_roles:
user = user_domain_role.user
permission, created = UserPortfolioPermission.objects.get_or_create(
portfolio=user_domain_role.domain.domain_info.portfolio,
user=user,
defaults={"roles": [UserPortfolioRoleChoices.ORGANIZATION_MEMBER]},
)
if created:
self.user_portfolio_perm_changes.create.append(permission)
else:
self.user_portfolio_perm_changes.skip.append(permission)
def create_portfolio_invitations(self, domains):
domain_invitations = DomainInvitation.objects.select_related(
"domain", "domain__domain_info", "domain__domain_info__portfolio"
).filter(
domain__in=domains,
domain__domain_info__portfolio__isnull=False,
status=DomainInvitation.DomainInvitationStatus.INVITED,
)
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
for domain_invitation in domain_invitations:
email = normalize_string(domain_invitation.email)
invitation, created = PortfolioInvitation.objects.get_or_create(
portfolio=domain_invitation.domain.domain_info.portfolio,
email=email,
status=PortfolioInvitation.PortfolioInvitationStatus.INVITED,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
if created:
self.portfolio_invitation_changes.create.append(invitation)
else:
self.portfolio_invitation_changes.skip.append(invitation)

View file

@ -51,7 +51,7 @@ class Command(BaseCommand):
ScriptDataHelper.bulk_update_fields(Domain, self.to_update, ["first_ready"])
# Log what happened
TerminalHelper.log_script_run_summary(self.to_update, self.failed_to_update, self.skipped, debug)
TerminalHelper.log_script_run_summary(self.to_update, self.failed_to_update, self.skipped, [], debug=debug)
def update_first_ready_for_domain(self, domain: Domain, debug: bool):
"""Grabs the created_at field and associates it with the first_ready column.

View file

@ -144,7 +144,12 @@ class Command(BaseCommand):
# Log what happened
log_header = "============= FINISHED UPDATE FOR DOMAINREQUEST ==============="
TerminalHelper.log_script_run_summary(
self.request_to_update, self.request_failed_to_update, self.request_skipped, True, log_header
self.request_to_update,
self.request_failed_to_update,
self.request_skipped,
[],
debug=True,
log_header=log_header,
)
update_skipped_count = len(self.request_to_update)
@ -195,7 +200,7 @@ class Command(BaseCommand):
# Log what happened
log_header = "============= FINISHED UPDATE FOR DOMAININFORMATION ==============="
TerminalHelper.log_script_run_summary(
self.di_to_update, self.di_failed_to_update, self.di_skipped, True, log_header
self.di_to_update, self.di_failed_to_update, self.di_skipped, [], debug=True, log_header=log_header
)
update_skipped_count = len(self.di_skipped)

View file

@ -0,0 +1,105 @@
import logging
import argparse
from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import PopulateScriptTemplate, TerminalColors
from registrar.models import PublicContact
from registrar.models.utility.generic_helper import normalize_string
from registrar.utility.enums import DefaultEmail
logger = logging.getLogger(__name__)
class Command(BaseCommand, PopulateScriptTemplate):
help = "Loops through each default PublicContact and updates some values on each"
def add_arguments(self, parser):
"""Adds command line arguments"""
parser.add_argument(
"--overwrite_updated_contacts",
action=argparse.BooleanOptionalAction,
help=(
"Loops over PublicContacts with an email of 'help@get.gov' when enabled."
"Use this setting if the record was updated in the DB but not correctly in EPP."
),
)
parser.add_argument(
"--target_domain",
help=(
"Updates the public contact on a given domain name (case insensitive). "
"Use this option to avoid doing a mass-update of every public contact record."
),
)
def handle(self, **kwargs):
"""Loops through each valid User object and updates its verification_type value"""
overwrite_updated_contacts = kwargs.get("overwrite_updated_contacts")
target_domain = kwargs.get("target_domain")
default_emails = {email for email in DefaultEmail}
# Don't update records we've already updated
if not overwrite_updated_contacts:
default_emails.remove(DefaultEmail.PUBLIC_CONTACT_DEFAULT)
# We should only update DEFAULT records. This means that if all values are not default,
# we should skip as this could lead to data corruption.
# Since we check for all fields, we don't account for casing differences.
self.old_and_new_default_contact_values = {
"name": {
"csd/cb attn: .gov tld",
"csd/cb attn: cameron dixon",
"program manager",
"registry customer service",
},
"street1": {"1110 n. glebe rd", "cisa ngr stop 0645", "4200 wilson blvd."},
"pc": {"22201", "20598-0645"},
"email": default_emails,
}
if not target_domain:
filter_condition = {"email__in": default_emails}
else:
filter_condition = {"email__in": default_emails, "domain__name__iexact": target_domain}
# This variable is decorative since we are skipping bulk update
fields_to_update = ["name", "street1", "pc", "email"]
self.mass_update_records(PublicContact, filter_condition, fields_to_update, show_record_count=True)
def bulk_update_fields(self, *args, **kwargs):
"""Skip bulk update since we need to manually save each field.
Our EPP logic is tied to an override of .save(), and this also associates
with our caching logic for this area of the code.
Since bulk update does not trigger .save() for each field, we have to
call it manually.
"""
return None
def update_record(self, record: PublicContact):
"""Defines how we update the verification_type field"""
record.name = "CSD/CB Attn: .gov TLD"
record.street1 = "1110 N. Glebe Rd"
record.pc = "22201"
record.email = DefaultEmail.PUBLIC_CONTACT_DEFAULT
record.save()
logger.info(f"{TerminalColors.OKCYAN}Updated '{record}' in EPP.{TerminalColors.ENDC}")
def should_skip_record(self, record) -> bool: # noqa
"""Skips updating a public contact if it contains different default info."""
if record.registry_id and len(record.registry_id) < 16:
message = (
f"Skipping legacy verisign contact '{record}'. "
f"The registry_id field has a length less than 16 characters."
)
logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
return True
for key, expected_values in self.old_and_new_default_contact_values.items():
record_field = normalize_string(getattr(record, key))
if record_field not in expected_values:
message = (
f"Skipping '{record}' to avoid potential data corruption. "
f"The field '{key}' does not match the default.\n"
f"Details: DB value - {record_field}, expected value(s) - {expected_values}"
)
logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
return True
return False

View file

@ -32,7 +32,7 @@ class ScriptDataHelper:
"""Helper method with utilities to speed up development of scripts that do DB operations"""
@staticmethod
def bulk_update_fields(model_class, update_list, fields_to_update, batch_size=1000):
def bulk_update_fields(model_class, update_list, fields_to_update, batch_size=1000, quiet=False):
"""
This function performs a bulk update operation on a specified Django model class in batches.
It uses Django's Paginator to handle large datasets in a memory-efficient manner.
@ -51,9 +51,12 @@ class ScriptDataHelper:
fields_to_update: Specifies which fields to update.
Usage:
bulk_update_fields(Domain, page.object_list, ["first_ready"])
ScriptDataHelper.bulk_update_fields(Domain, page.object_list, ["first_ready"])
Returns: A queryset of the updated objets
"""
logger.info(f"{TerminalColors.YELLOW} Bulk updating fields... {TerminalColors.ENDC}")
if not quiet:
logger.info(f"{TerminalColors.YELLOW} Bulk updating fields... {TerminalColors.ENDC}")
# Create a Paginator object. Bulk_update on the full dataset
# is too memory intensive for our current app config, so we can chunk this data instead.
paginator = Paginator(update_list, batch_size)
@ -61,6 +64,41 @@ class ScriptDataHelper:
page = paginator.page(page_num)
model_class.objects.bulk_update(page.object_list, fields_to_update)
@staticmethod
def bulk_create_fields(model_class, update_list, batch_size=1000, return_created=False, quiet=False):
"""
This function performs a bulk create operation on a specified Django model class in batches.
It uses Django's Paginator to handle large datasets in a memory-efficient manner.
Parameters:
model_class: The Django model class that you want to perform the bulk update on.
This should be the actual class, not a string of the class name.
update_list: A list of model instances that you want to update. Each instance in the list
should already have the updated values set on the instance.
batch_size: The maximum number of model instances to update in a single database query.
Defaults to 1000. If you're dealing with models that have a large number of fields,
or large field values, you may need to decrease this value to prevent out-of-memory errors.
Usage:
ScriptDataHelper.bulk_add_fields(Domain, page.object_list)
Returns: A queryset of the added objects
"""
if not quiet:
logger.info(f"{TerminalColors.YELLOW} Bulk adding fields... {TerminalColors.ENDC}")
created_objs = []
paginator = Paginator(update_list, batch_size)
for page_num in paginator.page_range:
page = paginator.page(page_num)
all_created = model_class.objects.bulk_create(page.object_list)
if return_created:
created_objs.extend([created.id for created in all_created])
if return_created:
return model_class.objects.filter(id__in=created_objs)
return None
class PopulateScriptTemplate(ABC):
"""
@ -86,7 +124,9 @@ class PopulateScriptTemplate(ABC):
"""
raise NotImplementedError
def mass_update_records(self, object_class, filter_conditions, fields_to_update, debug=True, verbose=False):
def mass_update_records(
self, object_class, filter_conditions, fields_to_update, debug=True, verbose=False, show_record_count=False
):
"""Loops through each valid "object_class" object - specified by filter_conditions - and
updates fields defined by fields_to_update using update_record.
@ -106,6 +146,9 @@ class PopulateScriptTemplate(ABC):
verbose: Whether to print a detailed run summary *before* run confirmation.
Default: False.
show_record_count: Whether to show a 'Record 1/10' dialog when running update.
Default: False.
Raises:
NotImplementedError: If you do not define update_record before using this function.
TypeError: If custom_filter is not Callable.
@ -115,19 +158,19 @@ class PopulateScriptTemplate(ABC):
# apply custom filter
records = self.custom_filter(records)
records_length = len(records)
readable_class_name = self.get_class_name(object_class)
# for use in the execution prompt.
proposed_changes = f"""==Proposed Changes==
Number of {readable_class_name} objects to change: {len(records)}
These fields will be updated on each record: {fields_to_update}
"""
proposed_changes = (
"==Proposed Changes==\n"
f"Number of {readable_class_name} objects to change: {records_length}\n"
f"These fields will be updated on each record: {fields_to_update}"
)
if verbose:
proposed_changes = f"""{proposed_changes}
These records will be updated: {list(records.all())}
"""
proposed_changes = f"{proposed_changes}\n" f"These records will be updated: {list(records.all())}"
# Code execution will stop here if the user prompts "N"
TerminalHelper.prompt_for_execution(
@ -140,7 +183,9 @@ class PopulateScriptTemplate(ABC):
to_update: List[object_class] = []
to_skip: List[object_class] = []
failed_to_update: List[object_class] = []
for record in records:
for i, record in enumerate(records, start=1):
if show_record_count:
logger.info(f"{TerminalColors.BOLD}Record {i}/{records_length}{TerminalColors.ENDC}")
try:
if not self.should_skip_record(record):
self.update_record(record)
@ -154,18 +199,23 @@ class PopulateScriptTemplate(ABC):
logger.error(fail_message)
# Do a bulk update on the desired field
ScriptDataHelper.bulk_update_fields(object_class, to_update, fields_to_update)
self.bulk_update_fields(object_class, to_update, fields_to_update)
# Log what happened
TerminalHelper.log_script_run_summary(
to_update,
failed_to_update,
to_skip,
[],
debug=debug,
log_header=self.run_summary_header,
display_as_str=True,
)
def bulk_update_fields(self, object_class, to_update, fields_to_update):
"""Bulk updates the given fields"""
ScriptDataHelper.bulk_update_fields(object_class, to_update, fields_to_update)
def get_class_name(self, sender) -> str:
"""Returns the class name that we want to display for the terminal prompt.
Example: DomainRequest => "Domain Request"
@ -190,81 +240,96 @@ class PopulateScriptTemplate(ABC):
class TerminalHelper:
@staticmethod
def log_script_run_summary(
to_update, failed_to_update, skipped, debug: bool, log_header=None, skipped_header=None, display_as_str=False
create,
update,
skip,
fail,
debug: bool,
log_header="============= FINISHED =============",
skipped_header="----- SOME DATA WAS INVALID (NEEDS MANUAL PATCHING) -----",
failed_header="----- UPDATE FAILED -----",
display_as_str=False,
detailed_prompt_title="Do you wish to see the full list of failed, skipped and updated records?",
):
"""Prints success, failed, and skipped counts, as well as
all affected objects."""
update_success_count = len(to_update)
update_failed_count = len(failed_to_update)
update_skipped_count = len(skipped)
"""Generates a formatted summary of script execution results with colored output.
if log_header is None:
log_header = "============= FINISHED ==============="
Displays counts and details of successful, failed, and skipped operations.
In debug mode or when prompted, shows full record details.
Uses color coding: green for success, yellow for skipped, red for failures.
if skipped_header is None:
skipped_header = "----- SOME DATA WAS INVALID (NEEDS MANUAL PATCHING) -----"
Args:
to_update: Records that were successfully updated
failed_to_update: Records that failed to update
skipped: Records that were intentionally skipped
to_add: Records that were newly added
debug: If True, shows detailed record information
log_header: Custom header for the summary (default: "FINISHED")
skipped_header: Custom header for skipped records section
failed_header: Custom header for failed records section
display_as_str: If True, converts records to strings for display
Output Format (if count > 0 for each category):
[log_header]
Created W entries
Updated X entries
[skipped_header]
Skipped updating Y entries
[failed_header]
Failed to update Z entries
Debug output (if enabled):
- Directly prints each list for each category (add, update, etc)
- Converts each item to string if display_as_str is True
"""
counts = {
"created": len(create),
"updated": len(update),
"skipped": len(skip),
"failed": len(fail),
}
# Give the user the option to see failed / skipped records if any exist.
display_detailed_logs = False
if not debug and update_failed_count > 0 or update_skipped_count > 0:
if not debug and counts["failed"] > 0 or counts["skipped"] > 0:
display_detailed_logs = TerminalHelper.prompt_for_execution(
system_exit_on_terminate=False,
prompt_message=f"You will see {update_failed_count} failed and {update_skipped_count} skipped records.",
prompt_message=f'You will see {counts["failed"]} failed and {counts["skipped"]} skipped records.',
verify_message="** Some records were skipped, or some failed to update. **",
prompt_title="Do you wish to see the full list of failed, skipped and updated records?",
prompt_title=detailed_prompt_title,
)
# Prepare debug messages
if debug or display_detailed_logs:
updated_display = [str(u) for u in to_update] if display_as_str else to_update
skipped_display = [str(s) for s in skipped] if display_as_str else skipped
failed_display = [str(f) for f in failed_to_update] if display_as_str else failed_to_update
debug_messages = {
"success": (f"{TerminalColors.OKCYAN}Updated: {updated_display}{TerminalColors.ENDC}\n"),
"skipped": (f"{TerminalColors.YELLOW}Skipped: {skipped_display}{TerminalColors.ENDC}\n"),
"failed": (f"{TerminalColors.FAIL}Failed: {failed_display}{TerminalColors.ENDC}\n"),
}
non_zero_counts = {category: count for category, count in counts.items() if count > 0}
messages = []
for category, count in non_zero_counts.items():
match category:
case "created":
label, values, debug_color = "Created", create, TerminalColors.OKBLUE
case "updated":
label, values, debug_color = "Updated", update, TerminalColors.OKCYAN
case "skipped":
label, values, debug_color = "Skipped updating", skip, TerminalColors.YELLOW
messages.append(skipped_header)
case "failed":
label, values, debug_color = "Failed to update", fail, TerminalColors.FAIL
messages.append(failed_header)
messages.append(f"{label} {count} entries")
# Print out a list of everything that was changed, if we have any changes to log.
# Otherwise, don't print anything.
TerminalHelper.print_conditional(
True,
f"{debug_messages.get('success') if update_success_count > 0 else ''}"
f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}"
f"{debug_messages.get('failed') if update_failed_count > 0 else ''}",
)
# Print debug messages (prints the internal add, update, skip, fail lists)
if debug or display_detailed_logs:
display_values = [str(v) for v in values] if display_as_str else values
debug_message = f"{label}: {display_values}"
logger.info(f"{debug_color}{debug_message}{TerminalColors.ENDC}")
if update_failed_count == 0 and update_skipped_count == 0:
logger.info(
f"""{TerminalColors.OKGREEN}
{log_header}
Updated {update_success_count} entries
{TerminalColors.ENDC}
"""
)
elif update_failed_count == 0:
logger.warning(
f"""{TerminalColors.YELLOW}
{log_header}
Updated {update_success_count} entries
{skipped_header}
Skipped updating {update_skipped_count} entries
{TerminalColors.ENDC}
"""
)
final_message = f"\n{log_header}\n" + "\n".join(messages)
if counts["failed"] > 0:
logger.error(f"{TerminalColors.FAIL}{final_message}{TerminalColors.ENDC}")
elif counts["skipped"] > 0:
logger.warning(f"{TerminalColors.YELLOW}{final_message}{TerminalColors.ENDC}")
else:
logger.error(
f"""{TerminalColors.FAIL}
{log_header}
Updated {update_success_count} entries
----- UPDATE FAILED -----
Failed to update {update_failed_count} entries,
Skipped updating {update_skipped_count} entries
{TerminalColors.ENDC}
"""
)
logger.info(f"{TerminalColors.OKGREEN}{final_message}{TerminalColors.ENDC}")
@staticmethod
def query_yes_no(question: str, default="yes"):
@ -402,11 +467,11 @@ class TerminalHelper:
# and ask if they wish to proceed
proceed_execution = TerminalHelper.query_yes_no_exit(
f"\n{TerminalColors.OKCYAN}"
"====================================================="
f"\n{prompt_title}\n"
"====================================================="
f"\n{verify_message}\n"
f"\n{prompt_message}\n"
"=====================================================\n"
f"{prompt_title}\n"
"=====================================================\n"
f"{verify_message}\n"
f"{prompt_message}\n"
f"{TerminalColors.FAIL}"
f"Proceed? (Y = proceed, N = {action_description_for_selecting_no})"
f"{TerminalColors.ENDC}"
@ -463,4 +528,4 @@ class TerminalHelper:
terminal_color = color
colored_message = f"{terminal_color}{message}{TerminalColors.ENDC}"
log_method(colored_message, exc_info=exc_info)
return log_method(colored_message, exc_info=exc_info)

View file

@ -0,0 +1,37 @@
# This migration creates the create_full_access_group and create_cisa_analyst_group groups
# If permissions on the groups need changing, edit CISA_ANALYST_GROUP_PERMISSIONS
# in the user_group model then:
# [NOT RECOMMENDED]
# step 1: docker-compose exec app ./manage.py migrate --fake registrar 0035_contenttypes_permissions
# step 2: docker-compose exec app ./manage.py migrate registrar 0036_create_groups
# step 3: fake run the latest migration in the migrations list
# [RECOMMENDED]
# Alternatively:
# step 1: duplicate the migration that loads data
# step 2: docker-compose exec app ./manage.py migrate
from django.db import migrations
from registrar.models import UserGroup
from typing import Any
# For linting: RunPython expects a function reference,
# so let's give it one
def create_groups(apps, schema_editor) -> Any:
UserGroup.create_cisa_analyst_group(apps, schema_editor)
UserGroup.create_omb_analyst_group(apps, schema_editor)
UserGroup.create_full_access_group(apps, schema_editor)
class Migration(migrations.Migration):
dependencies = [
("registrar", "0144_domainrequest_eop_stakeholder_email_and_more"),
]
operations = [
migrations.RunPython(
create_groups,
reverse_code=migrations.RunPython.noop,
atomic=True,
),
]

View file

@ -256,6 +256,29 @@ class Domain(TimeStampedModel, DomainHelper):
req = commands.CheckDomain([domain_name])
return registry.send(req, cleaned=True).res_data[0].avail
@classmethod
def is_pending_delete(cls, domain: str) -> bool:
"""Check if domain is pendingDelete state via response from registry."""
domain_name = domain.lower()
try:
info_req = commands.InfoDomain(domain_name)
info_response = registry.send(info_req, cleaned=True)
# Ensure res_data exists and is not empty
if info_response and info_response.res_data:
# Use _extract_data_from_response bc it's same thing but jsonified
domain_response = cls._extract_data_from_response(cls, info_response) # type: ignore
domain_status_state = domain_response.get("statuses")
if "pendingDelete" in str(domain_status_state):
return True
except RegistryError as err:
if not err.is_connection_error():
logger.info(f"Domain does not exist yet so it won't be in pending delete -- {err}")
return False
else:
raise err
return False
@classmethod
def registered(cls, domain: str) -> bool:
"""Check if a domain is _not_ available."""
@ -2026,7 +2049,9 @@ class Domain(TimeStampedModel, DomainHelper):
def _extract_data_from_response(self, data_response):
"""extract data from response from registry"""
data = data_response.res_data[0]
return {
"auth_info": getattr(data, "auth_info", ...),
"_contacts": getattr(data, "contacts", ...),

View file

@ -164,4 +164,4 @@ class PublicContact(TimeStampedModel):
return cls._meta.get_field("registry_id").max_length
def __str__(self):
return f"{self.name} <{self.email}>" f"id: {self.registry_id} " f"type: {self.contact_type}"
return self.registry_id

View file

@ -474,7 +474,7 @@ class User(AbstractUser):
admin_count = admins.count()
# Check if the current user is in the list of admins
if admin_count == 1 and admins.first().user == self:
if admin_count == 1 and admins.first() and admins.first().user == self:
return True # The user is the only admin
# If there are other admins or the user is not the only one

View file

@ -90,6 +90,14 @@ class UserGroup(Group):
"delete_userportfoliopermission",
],
},
{
"app_label": "registrar",
"model": "portfolioinvitation",
"permissions": [
"add_portfolioinvitation",
"view_portfolioinvitation",
],
},
]
# Avoid error: You can't execute queries until the end

View file

@ -0,0 +1,229 @@
"""
Centralized permissions management for the registrar.
"""
from django.urls import URLResolver, get_resolver, URLPattern
from registrar.decorators import (
HAS_PORTFOLIO_DOMAIN_REQUESTS_ANY_PERM,
IS_STAFF,
IS_DOMAIN_MANAGER,
IS_DOMAIN_MANAGER_AND_NOT_PORTFOLIO_MEMBER,
IS_PORTFOLIO_MEMBER_AND_DOMAIN_MANAGER,
IS_CISA_ANALYST,
IS_OMB_ANALYST,
IS_FULL_ACCESS,
IS_DOMAIN_REQUEST_CREATOR,
IS_STAFF_MANAGING_DOMAIN,
IS_PORTFOLIO_MEMBER,
HAS_PORTFOLIO_DOMAINS_ANY_PERM,
HAS_PORTFOLIO_DOMAINS_VIEW_ALL,
HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT,
HAS_PORTFOLIO_DOMAIN_REQUESTS_VIEW_ALL,
HAS_PORTFOLIO_MEMBERS_EDIT,
HAS_PORTFOLIO_MEMBERS_ANY_PERM,
HAS_PORTFOLIO_MEMBERS_VIEW,
ALL,
)
# Define permissions for each URL pattern by name
URL_PERMISSIONS = {
# Home & general pages
"home": [ALL],
"health": [ALL], # Intentionally no decorator
# Domain management
"domain": [HAS_PORTFOLIO_DOMAINS_VIEW_ALL, IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN],
"domain-dns": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN],
"domain-dns-nameservers": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN],
"domain-dns-dnssec": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN],
"domain-dns-dnssec-dsdata": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN],
"domain-org-name-address": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN],
"domain-suborganization": [IS_PORTFOLIO_MEMBER_AND_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN],
"domain-senior-official": [IS_DOMAIN_MANAGER_AND_NOT_PORTFOLIO_MEMBER, IS_STAFF_MANAGING_DOMAIN],
"domain-security-email": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN],
"domain-renewal": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN],
"domain-users": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN],
"domain-users-add": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN],
"domain-user-delete": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN],
# Portfolio management
"domains": [HAS_PORTFOLIO_DOMAINS_ANY_PERM],
"no-portfolio-domains": [IS_PORTFOLIO_MEMBER],
"no-organization-domains": [IS_PORTFOLIO_MEMBER],
"members": [HAS_PORTFOLIO_MEMBERS_ANY_PERM],
"member": [HAS_PORTFOLIO_MEMBERS_ANY_PERM],
"member-delete": [HAS_PORTFOLIO_MEMBERS_EDIT],
"member-permissions": [HAS_PORTFOLIO_MEMBERS_EDIT],
"member-domains": [HAS_PORTFOLIO_MEMBERS_ANY_PERM],
"member-domains-edit": [HAS_PORTFOLIO_MEMBERS_EDIT],
"invitedmember": [HAS_PORTFOLIO_MEMBERS_ANY_PERM],
"invitedmember-delete": [HAS_PORTFOLIO_MEMBERS_EDIT],
"invitedmember-permissions": [HAS_PORTFOLIO_MEMBERS_EDIT],
"invitedmember-domains": [HAS_PORTFOLIO_MEMBERS_ANY_PERM],
"invitedmember-domains-edit": [HAS_PORTFOLIO_MEMBERS_EDIT],
"new-member": [HAS_PORTFOLIO_MEMBERS_EDIT],
"domain-requests": [HAS_PORTFOLIO_DOMAIN_REQUESTS_ANY_PERM],
"no-portfolio-requests": [IS_PORTFOLIO_MEMBER],
"organization": [IS_PORTFOLIO_MEMBER],
"senior-official": [IS_PORTFOLIO_MEMBER],
# Domain requests
"domain-request-status": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"domain-request-status-viewonly": [HAS_PORTFOLIO_DOMAIN_REQUESTS_VIEW_ALL],
"domain-request-withdraw-confirmation": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"domain-request-withdrawn": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"domain-request-delete": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"edit-domain-request": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
# Admin functions
"analytics": [IS_CISA_ANALYST, IS_FULL_ACCESS],
"export_data_type": [IS_CISA_ANALYST, IS_FULL_ACCESS],
"export_data_full": [IS_CISA_ANALYST, IS_FULL_ACCESS],
"export_data_domain_requests_full": [IS_CISA_ANALYST, IS_FULL_ACCESS],
"export_data_federal": [IS_CISA_ANALYST, IS_FULL_ACCESS],
"export_domains_growth": [IS_CISA_ANALYST, IS_FULL_ACCESS],
"export_requests_growth": [IS_CISA_ANALYST, IS_FULL_ACCESS],
"export_managed_domains": [IS_CISA_ANALYST, IS_FULL_ACCESS],
"export_unmanaged_domains": [IS_CISA_ANALYST, IS_FULL_ACCESS],
"transfer_user": [IS_CISA_ANALYST, IS_FULL_ACCESS],
# Analytics
"all-domain-metadata": [IS_STAFF],
"current-full": [IS_STAFF],
"all-domain-requests-metadata": [IS_STAFF],
"domain-growth": [IS_STAFF],
"request-growth": [IS_STAFF],
"managed-domains": [IS_STAFF],
"unmanaged-domains": [IS_STAFF],
# Reports
"export-user-domains-as-csv": [IS_STAFF],
"export-portfolio-members-as-csv": [IS_STAFF],
"export_members_portfolio": [HAS_PORTFOLIO_MEMBERS_VIEW],
"export_data_type_user": [ALL],
# API endpoints
"get-senior-official-from-federal-agency-json": [IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST],
"get-portfolio-json": [IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST],
"get-suborganization-list-json": [IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST],
"get-federal-and-portfolio-types-from-federal-agency-json": [IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST],
"get-action-needed-email-for-user-json": [IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST],
"get-rejection-email-for-user-json": [IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST],
"get_domains_json": [ALL],
"get_domain_requests_json": [ALL],
"get_portfolio_members_json": [HAS_PORTFOLIO_MEMBERS_ANY_PERM],
"get_member_domains_json": [HAS_PORTFOLIO_MEMBERS_ANY_PERM],
# User profile
"finish-user-profile-setup": [ALL],
"user-profile": [ALL],
# Invitation
"invitation-cancel": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN],
# DNS Hosting
"prototype-domain-dns": [IS_STAFF],
# Domain request wizard
"start": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"finished": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"generic_org_type": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"tribal_government": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"organization_federal": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"organization_election": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"organization_contact": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"about_your_organization": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"senior_official": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"current_sites": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"dotgov_domain": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"purpose": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"other_contacts": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"additional_details": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"requirements": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"review": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"portfolio_requesting_entity": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
"portfolio_additional_details": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR],
}
UNCHECKED_URLS = [
"health",
"openid/",
"get-current-federal",
"get-current-full",
"available",
"rdap",
"todo",
"logout",
]
def verify_all_urls_have_permissions():
"""
Utility function to verify that all URLs in the application have defined permissions
in the permissions mapping.
"""
resolver = get_resolver()
missing_permissions = []
missing_names = []
# Collect all URL pattern names
for pattern in resolver.url_patterns:
# Skip URLResolver objects (like admin.site.urls)
if isinstance(pattern, URLResolver):
continue
if hasattr(pattern, "name") and pattern.name:
if pattern.name not in URL_PERMISSIONS and pattern.name not in UNCHECKED_URLS:
missing_permissions.append(pattern.name)
else:
raise ValueError(f"URL pattern {pattern} has no name")
if missing_names:
raise ValueError(f"The following URL patterns have no name: {missing_names}")
return missing_permissions
def validate_permissions(): # noqa: C901
"""
Validates that all URL patterns have consistent permission rules between
the centralized mapping and view decorators.
Returns a dictionary of issues found.
"""
resolver = get_resolver()
issues = {
"missing_in_mapping": [], # URLs with decorators but not in mapping
"missing_decorator": [], # URLs in mapping but missing decorators
"permission_mismatch": [], # URLs with different permissions
}
def check_url_pattern(pattern, parent_path=""):
if isinstance(pattern, URLPattern):
view_func = pattern.callback
path = f"{parent_path}/{pattern.pattern}"
url_name = pattern.name
if url_name:
# Skip check for endpoints that intentionally have no decorator
if url_name in UNCHECKED_URLS:
return
# Check if view has decorator but missing from mapping
if getattr(view_func, "has_explicit_access", False) and url_name not in URL_PERMISSIONS:
issues["missing_in_mapping"].append((url_name, path))
# Check if view is in mapping but missing decorator
elif url_name in URL_PERMISSIONS and not getattr(view_func, "has_explicit_access", False):
issues["missing_decorator"].append((url_name, path))
# Check if permissions match (more complex, may need refinement)
elif getattr(view_func, "has_explicit_access", False) and url_name in URL_PERMISSIONS:
view_permissions = getattr(view_func, "_access_rules", set())
mapping_permissions = set(URL_PERMISSIONS[url_name])
if view_permissions != mapping_permissions:
issues["permission_mismatch"].append((url_name, path, view_permissions, mapping_permissions))
elif isinstance(pattern, URLResolver):
# Handle included URL patterns (nested)
new_parent = f"{parent_path}/{pattern.pattern}"
for p in pattern.url_patterns:
check_url_pattern(p, new_parent)
# Check all URL patterns
for pattern in resolver.url_patterns:
check_url_pattern(pattern)
return issues

View file

@ -222,3 +222,31 @@ class RestrictAccessMiddleware:
raise PermissionDenied # Deny access if the view lacks explicit permission handling
return self.get_response(request)
class RequestLoggingMiddleware:
"""
Middleware to log user email, remote address, and request path.
"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
# Only log in production (stable)
if getattr(settings, "IS_PRODUCTION", False):
# Get user email (if authenticated), else "Anonymous"
user_email = request.user.email if request.user.is_authenticated else "Anonymous"
# Get remote IP address
remote_ip = request.META.get("REMOTE_ADDR", "Unknown IP")
# Get request path
request_path = request.path
# Log user information
logger.info(f"Router log | User: {user_email} | IP: {remote_ip} | Path: {request_path}")
return response

View file

@ -73,9 +73,16 @@
{% if is_portfolio_user and not is_domain_manager %}
<div class="usa-alert usa-alert--info usa-alert--slim">
<div class="usa-alert__body">
{% if not is_portfolio_admin %}
<p class="usa-alert__text ">
You don't have access to manage {{domain.name}}. If you need to make updates, contact one of the listed domain managers.
</p>
{% else %}
{% url 'member' member_pk=user_portfolio_permission.id as edit_member_url %}
<p class="usa-alert__text ">
You don't have access to manage {{domain.name}}. If you need to become a domain manager, edit the domain assignments in your <a href="{{edit_member_url}}">member profile</a>.
</p>
{% endif %}
</div>
</div>
{% endif %}

View file

@ -19,7 +19,7 @@
<p>Well use the information you provide to verify your organizations eligibility for a .gov domain. Well also verify that the domain you request meets our guidelines.</p>
{% endif %}
<h2>Time to complete the form</h2>
<p>If you have <a href="{% public_site_url 'domains/before/#information-you%E2%80%99ll-need-to-complete-the-domain-request-form' %}" target="_blank" class="usa-link">all the information you need</a>,
<p>If you have all the information you need,
completing your domain request might take around 15 minutes.</p>
<h2>How well reach you</h2>
<p>While reviewing your domain request, we may need to reach out with questions. Well also email you when we complete our review. If the contact information below is not correct, visit <a href="{% url 'user-profile' %}?redirect=domain-request:start" class="usa-link">your profile</a> to make updates.</p>

View file

@ -49,7 +49,7 @@
<h2>Domain renewal</h2>
<p>.Gov domains are registered for a one-year period. To renew your domain, you'll be asked to verify your organizations eligibility and your contact information. </p>
<p>.Gov domains are registered for a one-year period. To renew the domain, youll be asked to verify your contact information and some details about the domain.</p>
<p>Though a domain may expire, it will not automatically be put on hold or deleted. Well make extensive efforts to contact your organization before holding or deleting a domain.</p>
{% endblock %}

View file

@ -8,7 +8,7 @@
<section class="section-outlined members margin-top-0 section-outlined--border-base-light" id="members">
<div class="section-outlined__header margin-bottom-3 grid-row">
<!-- ---------- SEARCH ---------- -->
{% with label_text="Search by member name" item_name="members" aria_label_text="Members search component"%}
{% with label_text="Search by member email address" item_name="members" aria_label_text="Members search component"%}
{% include "includes/search.html" %}
{% endwith %}
{% with export_aria="Members report component" export_url='export_members_portfolio' %}

View file

@ -98,6 +98,18 @@ Organization member
</address>
{% if portfolio_permission %}
{% if member and member.id == request.user.id and is_only_admin %}
<div class="usa-alert usa-alert--info usa-alert--slim">
<div class="usa-alert__body">
<p class="usa-alert__text ">
You're the only admin for this organization.
Organizations must have at least one admin.
To remove yourself or change your member access,
you'll need to add another admin.
</p>
</div>
</div>
{% endif %}
{% include "includes/summary_item.html" with title='Member access and permissions' permissions=True value=portfolio_permission edit_link=edit_url editable=has_edit_members_portfolio_permission %}
{% elif portfolio_invitation %}
{% include "includes/summary_item.html" with title='Member access and permissions' permissions=True value=portfolio_invitation edit_link=edit_url editable=has_edit_members_portfolio_permission %}

View file

@ -75,12 +75,26 @@
<!-- Member email -->
</fieldset>
<!-- Member access radio buttons (Toggles other sections) -->
<fieldset class="usa-fieldset">
<legend>
<h2 class="margin-top-0">Member access</h2>
</legend>
{% if member and member.id == request.user.id and is_only_admin %}
<div class="usa-alert usa-alert--info usa-alert--slim margin-top-1 margin-bottom-1">
<div class="usa-alert__body">
<p class="usa-alert__text ">
You're the only admin for this organization.
Organizations must have at least one admin.
To remove yourself or change your member access,
you'll need to add another admin.
</p>
</div>
</div>
{% endif %}
<em>Select the level of access for this member. <abbr class="usa-hint usa-hint--required" title="required">*</abbr></em>
{% with group_classes="margin-top-0" add_class="usa-radio__input--tile" add_legend_class="usa-sr-only" %}
@ -109,4 +123,22 @@
</div>
</form>
</div>
{% comment %} If an admin is trying to edit themselves, show a modal {% endcomment %}
{% if member and member.id == request.user.id and not is_only_admin %}
<a
id="toggle-member-permissions-edit-self"
href="#modal-member-permissions-edit-self"
class="display-none"
aria-controls="modal-member-permissions-edit-self"
data-open-modal
>Edit self</a>
<div
class="usa-modal"
id="modal-member-permissions-edit-self"
data-force-action
>
{% include 'includes/modal.html' with modal_heading="Are you sure you want to change your member access?" modal_description="Youve selected basic access, which means youll no longer be able to manage member permissions. This action cannot be undone." modal_button_id="member-permissions-edit-self" modal_button_text="Yes, change my access" %}
</div>
{% endif %}
{% endblock portfolio_content%}

View file

@ -1092,7 +1092,7 @@ def completed_domain_request( # noqa
email="testy@town.com",
phone="(555) 555 5555",
)
domain, _ = DraftDomain.objects.get_or_create(name=name)
domain = DraftDomain.objects.create(name=name)
other, _ = Contact.objects.get_or_create(
first_name="Testy",
last_name="Tester",
@ -1931,7 +1931,14 @@ class MockEppLib(TestCase):
return MagicMock(res_data=[mocked_result])
def mockCreateContactCommands(self, _request, cleaned):
if getattr(_request, "id", None) == "fail" and self.mockedSendFunction.call_count == 3:
ids_to_throw_already_exists = [
"failAdmin1234567",
"failTech12345678",
"failSec123456789",
"failReg123456789",
"fail",
]
if getattr(_request, "id", None) in ids_to_throw_already_exists and self.mockedSendFunction.call_count == 3:
# use this for when a contact is being updated
# sets the second send() to fail
raise RegistryError(code=ErrorCode.OBJECT_EXISTS)
@ -1946,7 +1953,14 @@ class MockEppLib(TestCase):
return MagicMock(res_data=[self.mockDataInfoHosts])
def mockDeleteContactCommands(self, _request, cleaned):
if getattr(_request, "id", None) == "fail":
ids_to_throw_already_exists = [
"failAdmin1234567",
"failTech12345678",
"failSec123456789",
"failReg123456789",
"fail",
]
if getattr(_request, "id", None) in ids_to_throw_already_exists:
raise RegistryError(code=ErrorCode.OBJECT_EXISTS)
else:
return MagicMock(

View file

@ -50,6 +50,7 @@ from unittest.mock import ANY, patch
from django.conf import settings
import boto3_mocking # type: ignore
import logging
from django.contrib.messages.storage.fallback import FallbackStorage
logger = logging.getLogger(__name__)
@ -2315,7 +2316,6 @@ class TestDomainRequestAdmin(MockEppLib):
Used to test errors when saving a change with an active domain, also used to test side effects
when saving a change goes through."""
with less_console_noise():
# Create an instance of the model
domain_request = completed_domain_request(status=DomainRequest.DomainRequestStatus.APPROVED)
@ -2428,6 +2428,44 @@ class TestDomainRequestAdmin(MockEppLib):
"Cannot approve. Requested domain is already in use.",
)
def test_error_when_approving_domain_in_pending_delete_state(self):
"""If in pendingDelete state from in review -> approve not allowed."""
# 1. Create domain
to_be_in_pending_deleted = completed_domain_request(
status=DomainRequest.DomainRequestStatus.SUBMITTED, name="meoward1.gov"
)
to_be_in_pending_deleted.creator = self.superuser
# 2. Put domain into in-review state
to_be_in_pending_deleted.status = DomainRequest.DomainRequestStatus.IN_REVIEW
to_be_in_pending_deleted.save()
# 3. Update request as a superuser
request = self.factory.post(f"/admin/registrar/domainrequest/{to_be_in_pending_deleted.pk}/change/")
request.user = self.superuser
request.session = {}
setattr(request, "_messages", FallbackStorage(request))
# 4. Use ExitStack for combine patching like above
with boto3_mocking.clients.handler_for("sesv2", self.mock_client), ExitStack() as stack:
# Patch django.contrib.messages.error
stack.enter_context(patch.object(messages, "error"))
with patch("registrar.models.domain.Domain.is_pending_delete", return_value=True):
# Attempt to approve to_be_in_pending_deleted
# (which should fail due is_pending_delete == True so nothing can get approved)
to_be_in_pending_deleted.status = DomainRequest.DomainRequestStatus.APPROVED
# Save the model with the patched request
self.admin.save_model(request, to_be_in_pending_deleted, None, True)
messages.error.assert_called_once_with(
request,
"Domain of same name is currently in pending delete state.",
)
@less_console_noise
def test_no_error_when_saving_to_approved_and_domain_exists(self):
"""The negative of the redundant admin check on model transition not allowed."""

View file

@ -477,8 +477,8 @@ class TestBasePortfolioMemberForms(TestCase):
self.assertTrue(form.is_valid(), f"Form {form_class.__name__} failed validation with data: {data}")
return form
def _assert_form_has_error(self, form_class, data, field_name):
form = form_class(data=data)
def _assert_form_has_error(self, form_class, data, field_name, instance=None):
form = form_class(data=data, instance=instance)
self.assertFalse(form.is_valid())
self.assertIn(field_name, form.errors)
@ -504,17 +504,23 @@ class TestBasePortfolioMemberForms(TestCase):
"domain_permissions": "", # Simulate missing field
"member_permissions": "", # Simulate missing field
}
user_portfolio_permission, _ = UserPortfolioPermission.objects.get_or_create(
portfolio=self.portfolio, user=self.user
)
portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(portfolio=self.portfolio, email="hi@ho")
# Check required fields for all forms
self._assert_form_has_error(PortfolioMemberForm, data, "domain_request_permissions")
self._assert_form_has_error(PortfolioMemberForm, data, "domain_permissions")
self._assert_form_has_error(PortfolioMemberForm, data, "member_permissions")
self._assert_form_has_error(PortfolioInvitedMemberForm, data, "domain_request_permissions")
self._assert_form_has_error(PortfolioInvitedMemberForm, data, "domain_permissions")
self._assert_form_has_error(PortfolioInvitedMemberForm, data, "member_permissions")
self._assert_form_has_error(PortfolioNewMemberForm, data, "domain_request_permissions")
self._assert_form_has_error(PortfolioNewMemberForm, data, "domain_permissions")
self._assert_form_has_error(PortfolioNewMemberForm, data, "member_permissions")
self._assert_form_has_error(PortfolioMemberForm, data, "domain_request_permissions", user_portfolio_permission)
self._assert_form_has_error(PortfolioMemberForm, data, "domain_permissions", user_portfolio_permission)
self._assert_form_has_error(PortfolioMemberForm, data, "member_permissions", user_portfolio_permission)
self._assert_form_has_error(
PortfolioInvitedMemberForm, data, "domain_request_permissions", portfolio_invitation
)
self._assert_form_has_error(PortfolioInvitedMemberForm, data, "domain_permissions", portfolio_invitation)
self._assert_form_has_error(PortfolioInvitedMemberForm, data, "member_permissions", portfolio_invitation)
self._assert_form_has_error(PortfolioNewMemberForm, data, "domain_request_permissions", portfolio_invitation)
self._assert_form_has_error(PortfolioNewMemberForm, data, "domain_permissions", portfolio_invitation)
self._assert_form_has_error(PortfolioNewMemberForm, data, "member_permissions", portfolio_invitation)
@less_console_noise_decorator
def test_clean_validates_required_fields_for_admin_role(self):
@ -529,7 +535,6 @@ class TestBasePortfolioMemberForms(TestCase):
portfolio=self.portfolio, user=self.user
)
portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(portfolio=self.portfolio, email="hi@ho")
data = {
"role": UserPortfolioRoleChoices.ORGANIZATION_ADMIN.value,
}
@ -677,6 +682,7 @@ class TestBasePortfolioMemberForms(TestCase):
@less_console_noise_decorator
def test_invalid_data_for_member(self):
"""Test invalid form submission for a member role with missing permissions."""
portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(portfolio=self.portfolio, email="hi@ho")
data = {
"email": "hi@ho.com",
"portfolio": self.portfolio.id,
@ -685,5 +691,5 @@ class TestBasePortfolioMemberForms(TestCase):
"member_permissions": "", # Missing field
"domain_permissions": "", # Missing field
}
self._assert_form_has_error(PortfolioMemberForm, data, "domain_request_permissions")
self._assert_form_has_error(PortfolioInvitedMemberForm, data, "member_permissions")
self._assert_form_has_error(PortfolioMemberForm, data, "domain_request_permissions", portfolio_invitation)
self._assert_form_has_error(PortfolioInvitedMemberForm, data, "member_permissions", portfolio_invitation)

View file

@ -32,6 +32,7 @@ from registrar.models import (
Portfolio,
Suborganization,
)
from registrar.utility.enums import DefaultEmail
import tablib
from unittest.mock import patch, call, MagicMock, mock_open
from epplibwrapper import commands, common
@ -1473,6 +1474,7 @@ class TestCreateFederalPortfolio(TestCase):
generic_org_type=DomainRequest.OrganizationChoices.CITY,
federal_agency=self.federal_agency,
user=self.user,
organization_name="Testorg",
)
self.domain_request.approve()
self.domain_info = DomainInformation.objects.filter(domain_request=self.domain_request).get()
@ -1529,13 +1531,10 @@ class TestCreateFederalPortfolio(TestCase):
@less_console_noise_decorator
def test_post_process_started_domain_requests_existing_portfolio(self):
"""Ensures that federal agency is cleared when agency name matches portfolio name.
As the name implies, this implicitly tests the "post_process_started_domain_requests" function.
"""
"""Ensures that federal agency is cleared when agency name matches portfolio name."""
federal_agency_2 = FederalAgency.objects.create(agency="Sugarcane", federal_type=BranchChoices.EXECUTIVE)
# Test records with portfolios and no org names
# Create a portfolio. This script skips over "started"
portfolio = Portfolio.objects.create(organization_name="Sugarcane", creator=self.user)
# Create a domain request with matching org name
matching_request = completed_domain_request(
@ -1822,12 +1821,12 @@ class TestCreateFederalPortfolio(TestCase):
# We expect a error to be thrown when we dont pass parse requests or domains
with self.assertRaisesRegex(
CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --add_managers."
CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --parse_managers."
):
self.run_create_federal_portfolio(branch="executive")
with self.assertRaisesRegex(
CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --add_managers."
CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --parse_managers."
):
self.run_create_federal_portfolio(agency_name="test")
@ -1877,7 +1876,9 @@ class TestCreateFederalPortfolio(TestCase):
UserDomainRole.objects.create(user=manager2, domain=self.domain, role=UserDomainRole.Roles.MANAGER)
# Run the management command
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_domains=True, add_managers=True)
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency, parse_domains=True, parse_managers=True
)
# Check that the portfolio was created
self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency)
@ -1900,7 +1901,9 @@ class TestCreateFederalPortfolio(TestCase):
)
# Run the management command
self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_domains=True, add_managers=True)
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency, parse_domains=True, parse_managers=True
)
# Check that the portfolio was created
self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency)
@ -1917,7 +1920,7 @@ class TestCreateFederalPortfolio(TestCase):
# Verify that no duplicate invitations are created
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True
agency_name=self.federal_agency.agency, parse_requests=True, parse_managers=True
)
invitations = PortfolioInvitation.objects.filter(email="manager1@example.com", portfolio=self.portfolio)
self.assertEqual(
@ -1945,7 +1948,7 @@ class TestCreateFederalPortfolio(TestCase):
# Run the management command
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True
agency_name=self.federal_agency.agency, parse_requests=True, parse_managers=True
)
# Ensure that the manager is not duplicated
@ -1993,13 +1996,13 @@ class TestCreateFederalPortfolio(TestCase):
self.run_create_federal_portfolio(
agency_name=self.federal_agency.agency,
parse_requests=True,
add_managers=True,
parse_managers=True,
skip_existing_portfolios=True,
)
# Check that managers were added to the portfolio
# Check that managers weren't added to the portfolio
permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user__in=[manager1, manager2])
self.assertEqual(permissions.count(), 2)
self.assertEqual(permissions.count(), 0)
for perm in permissions:
self.assertIn(UserPortfolioRoleChoices.ORGANIZATION_MEMBER, perm.roles)
@ -2506,3 +2509,189 @@ class TestRemovePortfolios(TestCase):
# Check that the portfolio was deleted
self.assertFalse(Portfolio.objects.filter(organization_name="Test with suborg").exists())
class TestUpdateDefaultPublicContacts(MockEppLib):
"""Tests for the update_default_public_contacts management command."""
@less_console_noise_decorator
def setUp(self):
"""Setup test data with PublicContact records."""
super().setUp()
self.domain_request = completed_domain_request(
name="testdomain.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
)
self.domain_request.approve()
self.domain = self.domain_request.approved_domain
# 1. PublicContact with all old default values
self.old_default_contact = self.domain.get_default_administrative_contact()
self.old_default_contact.registry_id = "failAdmin1234567"
self.old_default_contact.name = "CSD/CB ATTN: Cameron Dixon"
self.old_default_contact.street1 = "CISA NGR STOP 0645"
self.old_default_contact.pc = "20598-0645"
self.old_default_contact.email = DefaultEmail.OLD_PUBLIC_CONTACT_DEFAULT
self.old_default_contact.save()
# 2. PublicContact with current default email but old values for other fields
self.mixed_default_contact = self.domain.get_default_technical_contact()
self.mixed_default_contact.registry_id = "failTech12345678"
self.mixed_default_contact.save(skip_epp_save=True)
self.mixed_default_contact.name = "registry customer service"
self.mixed_default_contact.street1 = "4200 Wilson Blvd."
self.mixed_default_contact.pc = "22201"
self.mixed_default_contact.email = DefaultEmail.PUBLIC_CONTACT_DEFAULT
self.mixed_default_contact.save()
# 3. PublicContact with non-default values
self.non_default_contact = self.domain.get_default_security_contact()
self.non_default_contact.registry_id = "failSec123456789"
self.non_default_contact.domain = self.domain
self.non_default_contact.save(skip_epp_save=True)
self.non_default_contact.name = "Hotdogs"
self.non_default_contact.street1 = "123 hotdog town"
self.non_default_contact.pc = "22111"
self.non_default_contact.email = "thehotdogman@igorville.gov"
self.non_default_contact.save()
# 4. Create a default contact but with an old email
self.default_registrant_old_email = self.domain.get_default_registrant_contact()
self.default_registrant_old_email.registry_id = "failReg123456789"
self.default_registrant_old_email.email = DefaultEmail.LEGACY_DEFAULT
self.default_registrant_old_email.save()
DF = common.DiscloseField
excluded_disclose_fields = {DF.NOTIFY_EMAIL, DF.VAT, DF.IDENT}
self.all_disclose_fields = {field for field in DF} - excluded_disclose_fields
def tearDown(self):
"""Clean up test data."""
super().tearDown()
PublicContact.objects.all().delete()
Domain.objects.all().delete()
DomainRequest.objects.all().delete()
DomainInformation.objects.all().delete()
User.objects.all().delete()
@patch("registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", return_value=True)
@less_console_noise_decorator
def run_update_default_public_contacts(self, mock_prompt, **kwargs):
"""Execute the update_default_public_contacts command with options."""
call_command("update_default_public_contacts", **kwargs)
# @less_console_noise_decorator
def test_updates_old_default_contact(self):
"""
Test that contacts with old default values are updated to new default values.
Also tests for string normalization.
"""
self.run_update_default_public_contacts()
self.old_default_contact.refresh_from_db()
# Verify updates occurred
self.assertEqual(self.old_default_contact.name, "CSD/CB Attn: .gov TLD")
self.assertEqual(self.old_default_contact.street1, "1110 N. Glebe Rd")
self.assertEqual(self.old_default_contact.pc, "22201")
self.assertEqual(self.old_default_contact.email, DefaultEmail.PUBLIC_CONTACT_DEFAULT)
# Verify EPP create/update calls were made
expected_update = self._convertPublicContactToEpp(
self.old_default_contact,
disclose=False,
disclose_fields=self.all_disclose_fields - {"name", "email", "voice", "addr"},
)
self.mockedSendFunction.assert_any_call(expected_update, cleaned=True)
@less_console_noise_decorator
def test_updates_with_default_contact_values(self):
"""
Test that contacts created from the default helper function with old email are updated.
"""
self.run_update_default_public_contacts()
self.default_registrant_old_email.refresh_from_db()
# Verify updates occurred
self.assertEqual(self.default_registrant_old_email.name, "CSD/CB Attn: .gov TLD")
self.assertEqual(self.default_registrant_old_email.street1, "1110 N. Glebe Rd")
self.assertEqual(self.default_registrant_old_email.pc, "22201")
self.assertEqual(self.default_registrant_old_email.email, DefaultEmail.PUBLIC_CONTACT_DEFAULT)
# Verify values match the default
default_reg = PublicContact.get_default_registrant()
self.assertEqual(self.default_registrant_old_email.name, default_reg.name)
self.assertEqual(self.default_registrant_old_email.street1, default_reg.street1)
self.assertEqual(self.default_registrant_old_email.pc, default_reg.pc)
self.assertEqual(self.default_registrant_old_email.email, default_reg.email)
# Verify EPP create/update calls were made
expected_update = self._convertPublicContactToEpp(
self.default_registrant_old_email, disclose=False, disclose_fields=self.all_disclose_fields
)
self.mockedSendFunction.assert_any_call(expected_update, cleaned=True)
@less_console_noise_decorator
def test_skips_non_default_contacts(self):
"""
Test that contacts with non-default values are skipped.
"""
original_name = self.non_default_contact.name
original_street1 = self.non_default_contact.street1
original_pc = self.non_default_contact.pc
original_email = self.non_default_contact.email
self.run_update_default_public_contacts()
self.non_default_contact.refresh_from_db()
# Verify no updates occurred
self.assertEqual(self.non_default_contact.name, original_name)
self.assertEqual(self.non_default_contact.street1, original_street1)
self.assertEqual(self.non_default_contact.pc, original_pc)
self.assertEqual(self.non_default_contact.email, original_email)
# Ensure that the update is still skipped even with the override flag
self.run_update_default_public_contacts(overwrite_updated_contacts=True)
self.non_default_contact.refresh_from_db()
# Verify no updates occurred
self.assertEqual(self.non_default_contact.name, original_name)
self.assertEqual(self.non_default_contact.street1, original_street1)
self.assertEqual(self.non_default_contact.pc, original_pc)
self.assertEqual(self.non_default_contact.email, original_email)
@less_console_noise_decorator
def test_skips_contacts_with_current_default_email_by_default(self):
"""
Test that contacts with the current default email are skipped when not using the override flag.
"""
# Get original values
original_name = self.mixed_default_contact.name
original_street1 = self.mixed_default_contact.street1
self.run_update_default_public_contacts()
self.mixed_default_contact.refresh_from_db()
# Verify no updates occurred
self.assertEqual(self.mixed_default_contact.name, original_name)
self.assertEqual(self.mixed_default_contact.street1, original_street1)
self.assertEqual(self.mixed_default_contact.email, DefaultEmail.PUBLIC_CONTACT_DEFAULT)
@less_console_noise_decorator
def test_updates_with_overwrite_flag(self):
"""
Test that contacts with the current default email are updated when using the override flag.
"""
# Run the command with the override flag
self.run_update_default_public_contacts(overwrite_updated_contacts=True)
self.mixed_default_contact.refresh_from_db()
# Verify updates occurred
self.assertEqual(self.mixed_default_contact.name, "CSD/CB Attn: .gov TLD")
self.assertEqual(self.mixed_default_contact.street1, "1110 N. Glebe Rd")
self.assertEqual(self.mixed_default_contact.pc, "22201")
self.assertEqual(self.mixed_default_contact.email, DefaultEmail.PUBLIC_CONTACT_DEFAULT)
# Verify EPP create/update calls were made
expected_update = self._convertPublicContactToEpp(
self.mixed_default_contact, disclose=False, disclose_fields=self.all_disclose_fields
)
self.mockedSendFunction.assert_any_call(expected_update, cleaned=True)

View file

@ -0,0 +1,48 @@
from django.test import TestCase, RequestFactory, override_settings
from unittest.mock import patch, MagicMock
from django.contrib.auth.models import AnonymousUser, User
from registrar.registrar_middleware import RequestLoggingMiddleware
class RequestLoggingMiddlewareTest(TestCase):
"""Test 'our' middleware logging."""
def setUp(self):
self.factory = RequestFactory()
self.get_response_mock = MagicMock()
self.middleware = RequestLoggingMiddleware(self.get_response_mock)
@override_settings(IS_PRODUCTION=True) # Scopes change to this test only
@patch("logging.Logger.info")
def test_logging_enabled_in_production(self, mock_logger):
"""Test that logging occurs when IS_PRODUCTION is True"""
request = self.factory.get("/test-path", **{"REMOTE_ADDR": "Unknown IP"}) # Override IP
request.user = User(username="testuser", email="testuser@example.com")
self.middleware(request) # Call middleware
mock_logger.assert_called_once_with(
"Router log | User: testuser@example.com | IP: Unknown IP | Path: /test-path"
)
@patch("logging.Logger.info")
def test_logging_disabled_in_non_production(self, mock_logger):
"""Test that logging does not occur when IS_PRODUCTION is False"""
request = self.factory.get("/test-path")
request.user = User(username="testuser", email="testuser@example.com")
self.middleware(request) # Call middleware
mock_logger.assert_not_called() # Ensure no logs are generated
@override_settings(IS_PRODUCTION=True) # Scopes change to this test only
@patch("logging.Logger.info")
def test_logging_anonymous_user(self, mock_logger):
"""Test logging for an anonymous user"""
request = self.factory.get("/anonymous-path", **{"REMOTE_ADDR": "Unknown IP"}) # Override IP
request.user = AnonymousUser() # Simulate an anonymous user
self.middleware(request) # Call middleware
mock_logger.assert_called_once_with("Router log | User: Anonymous | IP: Unknown IP | Path: /anonymous-path")

View file

@ -43,6 +43,8 @@ class TestGroups(TestCase):
"add_portfolio",
"change_portfolio",
"delete_portfolio",
"add_portfolioinvitation",
"view_portfolioinvitation",
"add_seniorofficial",
"change_seniorofficial",
"delete_seniorofficial",

View file

@ -666,6 +666,66 @@ class TestDomainAvailable(MockEppLib):
self.assertFalse(available)
patcher.stop()
def test_is_pending_delete(self):
"""
Scenario: Testing if a domain is in pendingDelete status from the registry
Should return True
* Mock EPP response with pendingDelete status
* Validate InfoDomain command is called
* Validate response given mock
"""
with patch("registrar.models.domain.registry.send") as mocked_send, patch(
"registrar.models.domain.Domain._extract_data_from_response"
) as mocked_extract:
# Mock the registry response
mock_response = MagicMock()
mock_response.res_data = [MagicMock(statuses=[MagicMock(state="pendingDelete")])]
mocked_send.return_value = mock_response
# Mock JSONified response
mocked_extract.return_value = {"statuses": ["pendingDelete"]}
result = Domain.is_pending_delete("is-pending-delete.gov")
mocked_send.assert_called_once_with(commands.InfoDomain("is-pending-delete.gov"), cleaned=True)
mocked_extract.assert_called_once()
self.assertTrue(result)
def test_is_not_pending_delete(self):
"""
Scenario: Testing if a domain is NOT in pendingDelete status.
Should return False.
* Mock EPP response without pendingDelete status (isserverTransferProhibited)
* Validate response given mock
"""
with patch("registrar.models.domain.registry.send") as mocked_send, patch(
"registrar.models.domain.Domain._extract_data_from_response"
) as mocked_extract:
# Mock the registry response
mock_response = MagicMock()
mock_response.res_data = [
MagicMock(statuses=[MagicMock(state="serverTransferProhibited"), MagicMock(state="ok")])
]
mocked_send.return_value = mock_response
# Mock JSONified response
mocked_extract.return_value = {"statuses": ["serverTransferProhibited", "ok"]}
result = Domain.is_pending_delete("is-not-pending.gov")
# Assertions
mocked_send.assert_called_once_with(commands.InfoDomain("is-not-pending.gov"), cleaned=True)
mocked_extract.assert_called_once()
self.assertFalse(result)
def test_domain_available_with_invalid_error(self):
"""
Scenario: Testing whether an invalid domain is available

View file

@ -0,0 +1,70 @@
"""
Tests for validating the permissions system consistency.
These tests ensure that:
1. All URLs have permissions defined in the centralized mapping
2. All views with permission decorators are in the mapping
3. The permissions in the decorators match those in the mapping
"""
from django.test import TestCase
from registrar.permissions import verify_all_urls_have_permissions, validate_permissions
class TestPermissionsMapping(TestCase):
"""Test the centralized permissions mapping for completeness and consistency."""
def test_all_urls_have_permissions(self):
"""Verify that all URL patterns in the application have permissions defined in the mapping."""
missing_urls = verify_all_urls_have_permissions()
# Format URLs for better readability in case of failure
if missing_urls:
formatted_urls = "\n".join([f" - {url}" for url in missing_urls])
self.fail(
f"The following URL patterns are missing from URL_PERMISSIONS mapping:\n{formatted_urls}\n"
f"Please add them to the URL_PERMISSIONS dictionary in registrar/permissions.py"
)
def test_permission_decorator_consistency(self):
"""
Test that all views have consistent permission rules between
the centralized mapping and view decorators.
"""
issues = validate_permissions()
error_messages = []
if issues["missing_in_mapping"]:
urls = "\n".join([f" - {name} (at {path})" for name, path in issues["missing_in_mapping"]])
error_messages.append(
f"The following URLs have permission decorators but are missing from the mapping:\n{urls}\n"
"Add these URLs to the URL_PERMISSIONS dictionary in registrar/permissions.py"
)
if issues["missing_decorator"]:
urls = "\n".join([f" - {name} (at {path})" for name, path in issues["missing_decorator"]])
error_messages.append(
f"The following URLs are in the mapping but missing @grant_access decorators:\n{urls}\n"
"Add appropriate @grant_access decorators to these views"
)
if issues["permission_mismatch"]:
mismatches = []
for name, path, view_perms, mapping_perms in issues["permission_mismatch"]:
view_perms_str = ", ".join(sorted(str(p) for p in view_perms))
mapping_perms_str = ", ".join(sorted(str(p) for p in mapping_perms))
mismatches.append(
f" - {name} (at {path}):\n"
f" Decorator: [{view_perms_str}]\n"
f" Mapping: [{mapping_perms_str}]"
)
error_messages.append(
f"The following URLs have mismatched permissions between decorators and mapping:\n"
f"{chr(10).join(mismatches)}\n"
"Update either the decorator or the mapping to ensure consistency"
)
if error_messages:
self.fail("\n\n".join(error_messages))

View file

@ -438,12 +438,6 @@ class ExportDataTest(MockDbForIndividualTests, MockEppLib):
self.assertIn(dd_3.name, csv_content)
self.assertNotIn(dd_2.name, csv_content)
# Get the csv content
csv_content = self._run_domain_request_data_type_user_export(request)
self.assertIn(dd_1.name, csv_content)
self.assertIn(dd_3.name, csv_content)
self.assertNotIn(dd_2.name, csv_content)
portfolio_permission.roles = [UserPortfolioRoleChoices.ORGANIZATION_MEMBER]
portfolio_permission.save()
portfolio_permission.refresh_from_db()

View file

@ -441,10 +441,11 @@ class TestDomainDetail(TestDomainOverview):
user.refresh_from_db()
self.client.force_login(user)
detail_page = self.client.get(f"/domain/{domain.id}")
# Check that alert message displays properly
# Check that alert message displays properly.
# This message is different for one user on the portfolio vs multiple.
self.assertContains(
detail_page,
"If you need to make updates, contact one of the listed domain managers.",
"If you need to become a domain manager, edit the domain assignments",
)
# Check that user does not have option to Edit domain
self.assertNotContains(detail_page, "Edit")

View file

@ -1826,10 +1826,7 @@ class TestPortfolioMemberDeleteView(WebTest):
)
self.assertEqual(response.status_code, 400)
expected_error_message = (
"There must be at least one admin in your organization. Give another member admin "
"permissions, make sure they log into the registrar, and then remove this member."
)
expected_error_message = "the only admin for this organization"
self.assertContains(response, expected_error_message, status_code=400)
# assert that send_portfolio_admin_removal_emails is not called
@ -2155,17 +2152,14 @@ class TestPortfolioMemberDeleteView(WebTest):
self.assertEqual(response.status_code, 302)
expected_error_message = (
"There must be at least one admin in your organization. Give another member admin "
"permissions, make sure they log into the registrar, and then remove this member."
)
expected_error_message = "the only admin for this organization."
args, kwargs = mock_error.call_args
# Check if first arg is a WSGIRequest, confirms request object passed correctly
# WSGIRequest protocol is basically the HTTPRequest but in Django form (ie POST '/member/1/delete')
self.assertIsInstance(args[0], WSGIRequest)
# Check that the error message matches the expected error message
self.assertEqual(args[1], expected_error_message)
self.assertIn(expected_error_message, args[1])
# Location is used for a 3xx HTTP status code to indicate that the URL was redirected
# and then confirm that we're still on the Manage Members page
@ -4639,6 +4633,17 @@ class TestPortfolioMemberEditView(WebTest):
# Get the user's admin permission
admin_permission = UserPortfolioPermission.objects.get(user=self.user, portfolio=self.portfolio)
# Create a second permission so the user isn't just deleting themselves
member = create_test_user()
UserPortfolioPermission.objects.create(
user=member, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
# First, verify that the change modal is on the page
response = self.client.get(reverse("member-permissions", kwargs={"member_pk": admin_permission.id}))
self.assertEqual(response.status_code, 200)
self.assertContains(response, "Yes, change my access")
response = self.client.post(
reverse("member-permissions", kwargs={"member_pk": admin_permission.id}),
{
@ -4652,6 +4657,39 @@ class TestPortfolioMemberEditView(WebTest):
self.assertEqual(response.status_code, 302)
self.assertEqual(response["Location"], reverse("home"))
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_admin_removing_own_admin_role_only_admin(self):
"""Tests that admin removing their own admin role when they are the only admin
throws a validation error.
"""
self.client.force_login(self.user)
# Get the user's admin permission
admin_permission = UserPortfolioPermission.objects.get(user=self.user, portfolio=self.portfolio)
# First, verify that the info alert is present on the page
response = self.client.get(reverse("member-permissions", kwargs={"member_pk": admin_permission.id}))
self.assertEqual(response.status_code, 200)
self.assertContains(response, "To remove yourself or change your member access")
# Then, verify that the right form error is shown
response = self.client.post(
reverse("member-permissions", kwargs={"member_pk": admin_permission.id}),
{
"role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER,
"domain_permissions": UserPortfolioPermissionChoices.VIEW_MANAGED_DOMAINS,
"member_permissions": "no_access",
"domain_request_permissions": "no_access",
},
)
self.assertEqual(response.status_code, 200)
error_message = "the only admin for this organization"
self.assertIn(error_message, str(response.context["form"].errors))
class TestPortfolioInvitedMemberEditView(WebTest):
"""Tests for the edit invited member page on portfolios"""

View file

@ -37,7 +37,6 @@ from django.contrib.postgres.aggregates import ArrayAgg, StringAgg
from django.contrib.admin.models import LogEntry, ADDITION
from django.contrib.contenttypes.models import ContentType
from registrar.models.utility.generic_helper import convert_queryset_to_dict
from registrar.models.utility.orm_helper import ArrayRemoveNull
from registrar.templatetags.custom_filters import get_region
from registrar.utility.constants import BranchChoices
from registrar.utility.enums import DefaultEmail, DefaultUserValues
@ -392,10 +391,17 @@ class MemberExport(BaseExport):
)
# Invitations
domain_invitations = DomainInvitation.objects.filter(
email=OuterRef("email"), # Check if email matches the OuterRef("email")
domain__domain_info__portfolio=portfolio, # Check if the domain's portfolio matches the given portfolio
).annotate(domain_info=F("domain__name"))
domain_invitations = Subquery(
DomainInvitation.objects.filter(
email=OuterRef("email"),
domain__domain_info__portfolio=portfolio,
status=DomainInvitation.DomainInvitationStatus.INVITED,
)
.values("email") # Select a stable field
.annotate(domain_list=ArrayAgg("domain__name", distinct=True)) # Aggregate within subquery
.values("domain_list") # Ensure only one value is returned
)
invitations = (
PortfolioInvitation.objects.exclude(status=PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED)
.filter(portfolio=portfolio)
@ -407,23 +413,19 @@ class MemberExport(BaseExport):
additional_permissions_display=F("additional_permissions"),
member_display=F("email"),
# Use ArrayRemove to return an empty list when no domain invitations are found
domain_info=ArrayRemoveNull(
ArrayAgg(
Subquery(domain_invitations.values("domain_info")),
distinct=True,
)
),
domain_info=domain_invitations,
type=Value("invitedmember", output_field=CharField()),
joined_date=Value("Unretrieved", output_field=CharField()),
invited_by=cls.get_invited_by_query(object_id_query=Cast(OuterRef("id"), output_field=CharField())),
)
.values(*shared_columns)
)
# Adding a order_by increases output predictability.
# Doesn't matter as much for normal use, but makes tests easier.
# We should also just be ordering by default anyway.
members = permissions.union(invitations).order_by("email_display", "member_display", "first_name", "last_name")
return convert_queryset_to_dict(members, is_model=False)
return convert_queryset_to_dict(members, is_model=False, key="email_display")
@classmethod
def get_invited_by_query(cls, object_id_query):

View file

@ -129,6 +129,7 @@ class FSMErrorCodes(IntEnum):
- 3 INVESTIGATOR_NOT_STAFF Investigator is a non-staff user
- 4 NO_REJECTION_REASON No rejection reason is specified
- 5 NO_ACTION_NEEDED_REASON No action needed reason is specified
- 6 DOMAIN_IS_PENDING_DELETE Domain is in pending delete state
"""
APPROVE_DOMAIN_IN_USE = 1
@ -136,6 +137,7 @@ class FSMErrorCodes(IntEnum):
INVESTIGATOR_NOT_STAFF = 3
NO_REJECTION_REASON = 4
NO_ACTION_NEEDED_REASON = 5
DOMAIN_IS_PENDING_DELETE = 6
class FSMDomainRequestError(Exception):
@ -150,6 +152,7 @@ class FSMDomainRequestError(Exception):
FSMErrorCodes.INVESTIGATOR_NOT_STAFF: ("Investigator is not a staff user."),
FSMErrorCodes.NO_REJECTION_REASON: ("A reason is required for this status."),
FSMErrorCodes.NO_ACTION_NEEDED_REASON: ("A reason is required for this status."),
FSMErrorCodes.DOMAIN_IS_PENDING_DELETE: ("Domain of same name is currently in pending delete state."),
}
def __init__(self, *args, code=None, **kwargs):

View file

@ -15,6 +15,7 @@ from registrar.decorators import (
IS_DOMAIN_MANAGER,
IS_DOMAIN_MANAGER_AND_NOT_PORTFOLIO_MEMBER,
IS_PORTFOLIO_MEMBER_AND_DOMAIN_MANAGER,
IS_STAFF,
IS_STAFF_MANAGING_DOMAIN,
grant_access,
)
@ -405,6 +406,9 @@ class DomainView(DomainBaseView):
default_emails = DefaultEmail.get_all_emails()
context["hidden_security_emails"] = default_emails
context["user_portfolio_permission"] = UserPortfolioPermission.objects.filter(
user=self.request.user, portfolio=self.request.session.get("portfolio")
).first()
security_email = self.object.get_security_email()
if security_email is None or security_email in default_emails:
@ -707,6 +711,7 @@ class PrototypeDomainDNSRecordForm(forms.Form):
)
@grant_access(IS_STAFF)
class PrototypeDomainDNSRecordView(DomainFormBaseView):
template_name = "prototype_domain_dns.html"
form_class = PrototypeDomainDNSRecordForm

View file

@ -205,12 +205,7 @@ class PortfolioMembersJson(View):
return queryset
def serialize_members(self, request, portfolio, item, user):
# Check if the user can edit other users
user_can_edit_other_users = any(
user.has_perm(perm) for perm in ["registrar.full_access_permission", "registrar.change_user"]
)
view_only = not user.has_edit_members_portfolio_permission(portfolio) or not user_can_edit_other_users
view_only = not user.has_edit_members_portfolio_permission(portfolio)
is_admin = UserPortfolioRoleChoices.ORGANIZATION_ADMIN in (item.get("roles") or [])

View file

@ -114,6 +114,7 @@ class PortfolioMemberView(DetailView, View):
"member_has_view_members_portfolio_permission": member_has_view_members_portfolio_permission,
"member_has_edit_members_portfolio_permission": member_has_edit_members_portfolio_permission,
"member_has_view_all_domains_portfolio_permission": member_has_view_all_domains_portfolio_permission,
"is_only_admin": request.user.is_only_admin_of_portfolio(portfolio_permission.portfolio),
},
)
@ -160,8 +161,8 @@ class PortfolioMemberDeleteView(View):
)
if member.is_only_admin_of_portfolio(portfolio):
return (
"There must be at least one admin in your organization. Give another member admin "
"permissions, make sure they log into the registrar, and then remove this member."
"You can't remove yourself because you're the only admin for this organization. "
"To remove yourself, you'll need to add another admin."
)
return None
@ -257,9 +258,7 @@ class PortfolioMemberEditView(DetailView, View):
def get(self, request, member_pk):
portfolio_permission = get_object_or_404(UserPortfolioPermission, pk=member_pk)
user = portfolio_permission.user
form = self.form_class(instance=portfolio_permission)
return render(
request,
self.template_name,
@ -267,6 +266,7 @@ class PortfolioMemberEditView(DetailView, View):
"form": form,
"member": user,
"portfolio_permission": portfolio_permission,
"is_only_admin": request.user.is_only_admin_of_portfolio(portfolio_permission.portfolio),
},
)
@ -307,15 +307,17 @@ class PortfolioMemberEditView(DetailView, View):
form.save()
messages.success(self.request, "The member access and permission changes have been saved.")
return redirect("member", member_pk=member_pk) if not removing_admin_role_on_self else redirect("home")
return render(
request,
self.template_name,
{
"form": form,
"member": user, # Pass the user object again to the template
},
)
else:
return render(
request,
self.template_name,
{
"form": form,
"member": user,
"portfolio_permission": portfolio_permission,
"is_only_admin": request.user.is_only_admin_of_portfolio(portfolio_permission.portfolio),
},
)
def _handle_exceptions(self, exception):
"""Handle exceptions raised during the process."""