diff --git a/docs/operations/data_migration.md b/docs/operations/data_migration.md index 8185922a4..402489a47 100644 --- a/docs/operations/data_migration.md +++ b/docs/operations/data_migration.md @@ -894,32 +894,32 @@ Example: `cf ssh getgov-za` #### Step 5: Running the script To create a specific portfolio: -```./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --both``` +```./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --parse_domains --parse_requests --parse_managers``` Example (only requests): `./manage.py create_federal_portfolio "AMTRAK" --parse_requests` To create a portfolios for all federal agencies in a branch: -```./manage.py create_federal_portfolio --branch "{executive|legislative|judicial}" --both``` +```./manage.py create_federal_portfolio --branch "{executive|legislative|judicial}" --parse_domains --parse_requests --parse_managers``` Example (only requests): `./manage.py create_federal_portfolio --branch "executive" --parse_requests` ### Running locally #### Step 1: Running the script -```docker-compose exec app ./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --both``` +```docker-compose exec app ./manage.py create_federal_portfolio --agency_name "{federal_agency_name}" --parse_domains``` ##### Parameters | | Parameter | Description | |:-:|:---------------------------- |:-------------------------------------------------------------------------------------------| | 1 | **agency_name** | Name of the FederalAgency record surrounded by quotes. For instance,"AMTRAK". | | 2 | **branch** | Creates a portfolio for each federal agency in a branch: executive, legislative, judicial | -| 3 | **both** | If True, runs parse_requests and parse_domains. | -| 4 | **parse_requests** | If True, then the created portfolio is added to all related DomainRequests. | -| 5 | **parse_domains** | If True, then the created portfolio is added to all related Domains. | -| 6 | **add_managers** | If True, then the created portfolio will add all managers of the portfolio domains as members of the portfolio, including invited managers. | -| 7 | **skip_existing_portfolios** | If True, then the script will only create suborganizations, modify DomainRequest, and modify DomainInformation records only when creating a new portfolio. Use this flag when you do not want to modify existing records. | +| 3 | **parse_requests** | If True, then the created portfolio is added to all related DomainRequests. | +| 4 | **parse_domains** | If True, then the created portfolio is added to all related Domains. | +| 5 | **parse_members** | If True, then the created portfolio will add all managers of the portfolio domains as members of the portfolio, including invited managers. | +| 6 | **skip_existing_portfolios** | If True, then the script will only create suborganizations, modify DomainRequest, and modify DomainInformation records only when creating a new portfolio. Use this flag when you do not want to modify existing records. | +| 7 | **Debug** | Increases log verbosity | - Parameters #1-#2: Either `--agency_name` or `--branch` must be specified. Not both. -- Parameters #2-#3, you cannot use `--both` while using these. You must specify either `--parse_requests` or `--parse_domains` seperately. While all of these parameters are optional in that you do not need to specify all of them, -you must specify at least one to run this script. +- Parameters #3-#5, while all of these parameters are optional in that you do not need to specify all of them, +you must specify at least one to run this script. You can also chain all of them together. ## Patch suborganizations diff --git a/src/epplibwrapper/client.py b/src/epplibwrapper/client.py index 9d203b246..a7102b0e9 100644 --- a/src/epplibwrapper/client.py +++ b/src/epplibwrapper/client.py @@ -151,7 +151,7 @@ class EPPLibWrapper: raise RegistryError(message) from err else: if response.code >= 2000: - raise RegistryError(response.msg, code=response.code) + raise RegistryError(response.msg, code=response.code, response=response) else: return response @@ -174,6 +174,8 @@ class EPPLibWrapper: try: return self._send(command) except RegistryError as err: + if err.response: + logger.info(f"cltrid is {err.response.cl_tr_id} svtrid is {err.response.sv_tr_id}") if ( err.is_transport_error() or err.is_connection_error() diff --git a/src/epplibwrapper/errors.py b/src/epplibwrapper/errors.py index 95db40ab8..234bed611 100644 --- a/src/epplibwrapper/errors.py +++ b/src/epplibwrapper/errors.py @@ -1,4 +1,4 @@ -from enum import IntEnum +from enum import IntEnum, Enum class ErrorCode(IntEnum): @@ -52,6 +52,10 @@ class ErrorCode(IntEnum): SESSION_LIMIT_EXCEEDED_SERVER_CLOSING_CONNECTION = 2502 +class RegistryErrorMessage(Enum): + REGISTRAR_NOT_LOGGED_IN = "Registrar is not logged in." + + class RegistryError(Exception): """ Overview of registry response codes from RFC 5730. See RFC 5730 for full text. @@ -62,14 +66,21 @@ class RegistryError(Exception): - 2501 - 2502 Something malicious or abusive may have occurred """ - def __init__(self, *args, code=None, note="", **kwargs): + def __init__(self, *args, code=None, note="", response=None, **kwargs): super().__init__(*args, **kwargs) self.code = code + self.response = response # note is a string that can be used to provide additional context self.note = note def should_retry(self): - return self.code == ErrorCode.COMMAND_FAILED + # COMMAND_USE_ERROR is returning with message, Registrar is not logged in, + # which can be recovered from with a retry + return self.code == ErrorCode.COMMAND_FAILED or ( + self.code == ErrorCode.COMMAND_USE_ERROR + and self.response + and getattr(self.response, "msg", None) == RegistryErrorMessage.REGISTRAR_NOT_LOGGED_IN.value + ) def is_transport_error(self): return self.code == ErrorCode.TRANSPORT_ERROR diff --git a/src/epplibwrapper/tests/test_client.py b/src/epplibwrapper/tests/test_client.py index 57c99a05f..2850ae316 100644 --- a/src/epplibwrapper/tests/test_client.py +++ b/src/epplibwrapper/tests/test_client.py @@ -264,6 +264,58 @@ class TestClient(TestCase): # send() is called 5 times: send(login), send(command) fail, send(logout), send(login), send(command) self.assertEquals(mock_send.call_count, 5) + @less_console_noise_decorator + @patch("epplibwrapper.client.Client") + @patch("epplibwrapper.client.logger") + def test_send_command_2002_failure_prompts_successful_retry(self, mock_logger, mock_client): + """Test when the send("InfoDomainCommand) call fails with a 2002, prompting a retry + and the subsequent send("InfoDomainCommand) call succeeds + Flow: + Initialization succeeds + Send command fails (with 2002 code) prompting retry + Client closes and re-initializes, and command succeeds""" + # Mock the Client instance and its methods + # connect() and close() should succeed throughout + mock_connect = MagicMock() + mock_close = MagicMock() + # create success and failure result messages + send_command_success_result = self.fake_result(1000, "Command completed successfully") + send_command_failure_result = self.fake_result(2002, "Registrar is not logged in.") + # side_effect for send call, initial send(login) succeeds during initialization, next send(command) + # fails, subsequent sends (logout, login, command) all succeed + send_call_count = 0 + + # Create a mock command + mock_command = MagicMock() + mock_command.__class__.__name__ = "InfoDomainCommand" + + def side_effect(*args, **kwargs): + nonlocal send_call_count + send_call_count += 1 + if send_call_count == 2: + return send_command_failure_result + else: + return send_command_success_result + + mock_send = MagicMock(side_effect=side_effect) + mock_client.return_value.connect = mock_connect + mock_client.return_value.close = mock_close + mock_client.return_value.send = mock_send + # Create EPPLibWrapper instance and initialize client + wrapper = EPPLibWrapper() + wrapper.send(mock_command, cleaned=True) + # connect() is called twice, once during initialization of app, once during retry + self.assertEquals(mock_connect.call_count, 2) + # close() is called once, during retry + mock_close.assert_called_once() + # send() is called 5 times: send(login), send(command) fail, send(logout), send(login), send(command) + self.assertEquals(mock_send.call_count, 5) + # Assertion proper logging; note that the + mock_logger.info.assert_any_call( + "InfoDomainCommand failed and will be retried Error: Registrar is not logged in." + ) + mock_logger.info.assert_any_call("cltrid is cl_tr_id svtrid is sv_tr_id") + @less_console_noise_decorator def fake_failure_send_concurrent_threads(self, command=None, cleaned=None): """ diff --git a/src/registrar/admin.py b/src/registrar/admin.py index bff9b86cb..800e26e4f 100644 --- a/src/registrar/admin.py +++ b/src/registrar/admin.py @@ -72,6 +72,7 @@ from django.contrib.admin.widgets import FilteredSelectMultiple from django.utils.html import format_html from django.utils.translation import gettext_lazy as _ + logger = logging.getLogger(__name__) @@ -1261,6 +1262,13 @@ class HostIpAdmin(AuditedAdmin, ImportExportRegistrarModelAdmin): resource_classes = [HostIpResource] model = models.HostIP + search_fields = ["host__name", "address"] + search_help_text = "Search by host name or address." + list_display = ( + "host", + "address", + ) + class ContactResource(resources.ModelResource): """defines how each field in the referenced model should be mapped to the corresponding fields in the @@ -3211,9 +3219,9 @@ class DomainRequestAdmin(ListHeaderAdmin, ImportExportRegistrarModelAdmin): Returns a tuple: (obj: DomainRequest, should_proceed: bool) """ - should_proceed = True error_message = None + domain_name = original_obj.requested_domain.name # Get the method that should be run given the status selected_method = self.get_status_method_mapping(obj) @@ -3235,6 +3243,17 @@ class DomainRequestAdmin(ListHeaderAdmin, ImportExportRegistrarModelAdmin): # duplicated in the model and the error is raised from the model. # This avoids an ugly Django error screen. error_message = "This action is not permitted. The domain is already active." + elif ( + original_obj.status != models.DomainRequest.DomainRequestStatus.APPROVED + and obj.status == models.DomainRequest.DomainRequestStatus.APPROVED + and original_obj.requested_domain is not None + and Domain.is_pending_delete(domain_name) + ): + # 1. If the domain request is not approved in previous state (original status) + # 2. If the new status that's supposed to be triggered IS approved + # 3. That it's a valid domain + # 4. AND that the domain is currently in pendingDelete state + error_message = FSMDomainRequestError.get_error_message(FSMErrorCodes.DOMAIN_IS_PENDING_DELETE) elif ( original_obj.status != models.DomainRequest.DomainRequestStatus.APPROVED and obj.status == models.DomainRequest.DomainRequestStatus.APPROVED @@ -4609,6 +4628,10 @@ class PublicContactAdmin(ListHeaderAdmin, ImportExportRegistrarModelAdmin): change_form_template = "django/admin/email_clipboard_change_form.html" autocomplete_fields = ["domain"] + list_display = ("registry_id", "contact_type", "domain", "name") + search_fields = ["registry_id", "domain__name", "name"] + search_help_text = "Search by registry id, domain, or name." + list_filter = ("contact_type",) def changeform_view(self, request, object_id=None, form_url="", extra_context=None): if extra_context is None: diff --git a/src/registrar/assets/src/js/getgov/main.js b/src/registrar/assets/src/js/getgov/main.js index 490874220..a496b76f3 100644 --- a/src/registrar/assets/src/js/getgov/main.js +++ b/src/registrar/assets/src/js/getgov/main.js @@ -11,7 +11,7 @@ import { initDomainRequestsTable } from './table-domain-requests.js'; import { initMembersTable } from './table-members.js'; import { initMemberDomainsTable } from './table-member-domains.js'; import { initEditMemberDomainsTable } from './table-edit-member-domains.js'; -import { initPortfolioNewMemberPageToggle, initAddNewMemberPageListeners, initPortfolioMemberPageRadio } from './portfolio-member-page.js'; +import { initPortfolioNewMemberPageToggle, initAddNewMemberPageListeners, initPortfolioMemberPage } from './portfolio-member-page.js'; import { initDomainRequestForm } from './domain-request-form.js'; import { initDomainManagersPage } from './domain-managers.js'; import { initDomainDNSSEC } from './domain-dnssec.js'; @@ -56,8 +56,10 @@ initDomainDNSSEC(); initFormErrorHandling(); +// Init the portfolio member page +initPortfolioMemberPage(); + // Init the portfolio new member page -initPortfolioMemberPageRadio(); initPortfolioNewMemberPageToggle(); initAddNewMemberPageListeners(); diff --git a/src/registrar/assets/src/js/getgov/portfolio-member-page.js b/src/registrar/assets/src/js/getgov/portfolio-member-page.js index 96961e5dc..faf2c7b18 100644 --- a/src/registrar/assets/src/js/getgov/portfolio-member-page.js +++ b/src/registrar/assets/src/js/getgov/portfolio-member-page.js @@ -193,10 +193,14 @@ export function initAddNewMemberPageListeners() { } // Initalize the radio for the member pages -export function initPortfolioMemberPageRadio() { +export function initPortfolioMemberPage() { document.addEventListener("DOMContentLoaded", () => { let memberForm = document.getElementById("member_form"); - let newMemberForm = document.getElementById("add_member_form") + let newMemberForm = document.getElementById("add_member_form"); + let editSelfWarningModal = document.getElementById("toggle-member-permissions-edit-self"); + let editSelfWarningModalConfirm = document.getElementById("member-permissions-edit-self"); + + // Init the radio if (memberForm || newMemberForm) { hookupRadioTogglerListener( 'role', @@ -206,5 +210,36 @@ export function initPortfolioMemberPageRadio() { } ); } + + // Init the "edit self" warning modal, which triggers when the user is trying to edit themselves. + // The dom will include these elements when this occurs. + // NOTE: This logic does not trigger when the user is the ONLY admin in the portfolio. + // This is because info alerts are used rather than modals in this case. + if (memberForm && editSelfWarningModal) { + // Only show the warning modal when the user is changing their ROLE. + var canSubmit = document.querySelector(`input[name="role"]:checked`)?.value != "organization_member"; + let radioButtons = document.querySelectorAll(`input[name="role"]`); + radioButtons.forEach(function (radioButton) { + radioButton.addEventListener("change", function() { + let selectedValue = radioButton.checked ? radioButton.value : null; + canSubmit = selectedValue != "organization_member"; + }); + }); + + // Prevent form submission assuming org member is selected for role, and open the modal. + memberForm.addEventListener("submit", function(e) { + if (!canSubmit) { + e.preventDefault(); + editSelfWarningModal.click(); + } + }); + + // Hook the confirm button on the modal to form submission. + editSelfWarningModalConfirm.addEventListener("click", function() { + canSubmit = true; + memberForm.submit(); + }); + } }); + } diff --git a/src/registrar/config/settings.py b/src/registrar/config/settings.py index acd319ee5..92c2b2d61 100644 --- a/src/registrar/config/settings.py +++ b/src/registrar/config/settings.py @@ -203,6 +203,8 @@ MIDDLEWARE = [ "registrar.registrar_middleware.CheckPortfolioMiddleware", # Restrict access using Opt-Out approach "registrar.registrar_middleware.RestrictAccessMiddleware", + # Our own router logs that included user info to speed up log tracing time on stable + "registrar.registrar_middleware.RequestLoggingMiddleware", ] # application object used by Django's built-in servers (e.g. `runserver`) diff --git a/src/registrar/config/urls.py b/src/registrar/config/urls.py index 56f0cfd0f..ba9661d1c 100644 --- a/src/registrar/config/urls.py +++ b/src/registrar/config/urls.py @@ -171,6 +171,7 @@ urlpatterns = [ path( "admin/logout/", RedirectView.as_view(pattern_name="logout", permanent=False), + name="logout", ), path( "admin/analytics/export_data_type/", diff --git a/src/registrar/forms/portfolio.py b/src/registrar/forms/portfolio.py index db1f58d88..43a391ae0 100644 --- a/src/registrar/forms/portfolio.py +++ b/src/registrar/forms/portfolio.py @@ -411,6 +411,27 @@ class PortfolioMemberForm(BasePortfolioMemberForm): model = UserPortfolioPermission fields = ["roles", "additional_permissions"] + def clean(self): + """ + Override of clean to ensure that the user isn't removing themselves + if they're the only portfolio admin + """ + super().clean() + role = self.cleaned_data.get("role") + if self.instance and hasattr(self.instance, "user") and hasattr(self.instance, "portfolio"): + if role and self.instance.user.is_only_admin_of_portfolio(self.instance.portfolio): + # This is how you associate a validation error to a particular field. + # The alternative is to do this in clean_role, but execution order matters. + raise forms.ValidationError( + { + "role": forms.ValidationError( + "You can't change your member access because you're " + "the only admin for this organization. " + "To change your access, you'll need to add another admin." + ) + } + ) + class PortfolioInvitedMemberForm(BasePortfolioMemberForm): """ diff --git a/src/registrar/management/commands/create_federal_portfolio.py b/src/registrar/management/commands/create_federal_portfolio.py index d753d0ce8..5e97bf44c 100644 --- a/src/registrar/management/commands/create_federal_portfolio.py +++ b/src/registrar/management/commands/create_federal_portfolio.py @@ -3,14 +3,14 @@ import argparse import logging from django.core.management import BaseCommand, CommandError -from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper +from registrar.management.commands.utility.terminal_helper import ScriptDataHelper, TerminalColors, TerminalHelper from registrar.models import DomainInformation, DomainRequest, FederalAgency, Suborganization, Portfolio, User from registrar.models.domain import Domain from registrar.models.domain_invitation import DomainInvitation from registrar.models.portfolio_invitation import PortfolioInvitation from registrar.models.user_domain_role import UserDomainRole from registrar.models.user_portfolio_permission import UserPortfolioPermission -from registrar.models.utility.generic_helper import normalize_string +from registrar.models.utility.generic_helper import count_capitals, normalize_string from django.db.models import F, Q from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices @@ -22,16 +22,56 @@ logger = logging.getLogger(__name__) class Command(BaseCommand): help = "Creates a federal portfolio given a FederalAgency name" + class ChangeTracker: + def __init__(self, model_class): + self.model_class = model_class + self.create = [] + self.update = [] + self.skip = [] + self.fail = [] + + def print_script_run_summary(self, no_changes_message, **kwargs): + """Helper function that runs TerminalHelper.log_script_run_summary on this object.""" + if self.has_changes(): + TerminalHelper.log_script_run_summary(self.create, self.update, self.skip, self.fail, **kwargs) + else: + logger.info(f"{TerminalColors.BOLD}{no_changes_message}{TerminalColors.ENDC}") + + def has_changes(self) -> bool: + changes = [self.create, self.update, self.skip, self.fail] + return any([change for change in changes if change]) + + def bulk_create(self): + try: + res = ScriptDataHelper.bulk_create_fields( + self.model_class, self.create, return_created=True, quiet=True + ) + self.create = res + return res + except Exception as err: + # In this case, just swap the fail and add lists + self.fail = self.create.copy() + self.create.clear() + raise err + + def bulk_update(self, fields_to_update): + try: + ScriptDataHelper.bulk_update_fields(self.model_class, self.update, fields_to_update, quiet=True) + except Exception as err: + # In this case, just swap the fail and update lists + self.fail = self.update.copy() + self.update.clear() + raise err + def __init__(self, *args, **kwargs): """Defines fields to track what portfolios were updated, skipped, or just outright failed.""" super().__init__(*args, **kwargs) - self.updated_portfolios = set() - self.skipped_portfolios = set() - self.failed_portfolios = set() - self.added_managers = set() - self.added_invitations = set() - self.skipped_invitations = set() - self.failed_managers = set() + self.portfolio_changes = self.ChangeTracker(model_class=Portfolio) + self.suborganization_changes = self.ChangeTracker(model_class=Suborganization) + self.domain_info_changes = self.ChangeTracker(model_class=DomainInformation) + self.domain_request_changes = self.ChangeTracker(model_class=DomainRequest) + self.user_portfolio_perm_changes = self.ChangeTracker(model_class=UserPortfolioPermission) + self.portfolio_invitation_changes = self.ChangeTracker(model_class=PortfolioInvitation) def add_arguments(self, parser): """Add command line arguments to create federal portfolios. @@ -44,14 +84,11 @@ class Command(BaseCommand): Required (at least one): --parse_requests: Add the created portfolio(s) to related DomainRequest records --parse_domains: Add the created portfolio(s) to related DomainInformation records - Note: You can use both --parse_requests and --parse_domains together - - Optional (mutually exclusive with parse options): - --both: Shorthand for using both --parse_requests and --parse_domains - Cannot be used with --parse_requests or --parse_domains + --parse_managers: Add all domain managers of the portfolio's domains to the organization. Optional: - --add_managers: Add all domain managers of the portfolio's domains to the organization. + --skip_existing_portfolios: Does not perform substeps on a portfolio if it already exists. + --debug: Increases log verbosity """ group = parser.add_mutually_exclusive_group(required=True) group.add_argument( @@ -74,19 +111,19 @@ class Command(BaseCommand): help="Adds portfolio to DomainInformation", ) parser.add_argument( - "--both", - action=argparse.BooleanOptionalAction, - help="Adds portfolio to both requests and domains", - ) - parser.add_argument( - "--add_managers", + "--parse_managers", action=argparse.BooleanOptionalAction, help="Add all domain managers of the portfolio's domains to the organization.", ) parser.add_argument( "--skip_existing_portfolios", action=argparse.BooleanOptionalAction, - help="Only add suborganizations to newly created portfolios, skip existing ones.", + help="Only parses newly created portfolios, skippubg existing ones.", + ) + parser.add_argument( + "--debug", + action=argparse.BooleanOptionalAction, + help="Shows additional log info.", ) def handle(self, **options): # noqa: C901 @@ -94,22 +131,20 @@ class Command(BaseCommand): branch = options.get("branch") parse_requests = options.get("parse_requests") parse_domains = options.get("parse_domains") - both = options.get("both") - add_managers = options.get("add_managers") + parse_managers = options.get("parse_managers") skip_existing_portfolios = options.get("skip_existing_portfolios") + debug = options.get("debug") - if not both: - if not (parse_requests or parse_domains or add_managers): - raise CommandError( - "You must specify at least one of --parse_requests, --parse_domains, or --add_managers." - ) - else: - if parse_requests or parse_domains: - raise CommandError("You cannot pass --parse_requests or --parse_domains when passing --both.") + # Parse script params + if not (parse_requests or parse_domains or parse_managers): + raise CommandError( + "You must specify at least one of --parse_requests, --parse_domains, or --parse_managers." + ) + # Get agencies federal_agency_filter = {"agency__iexact": agency_name} if agency_name else {"federal_type": branch} - agencies = FederalAgency.objects.filter(**federal_agency_filter) - if not agencies or agencies.count() < 1: + agencies = FederalAgency.objects.filter(agency__isnull=False, **federal_agency_filter).distinct() + if not agencies.exists(): if agency_name: raise CommandError( f"Cannot find the federal agency '{agency_name}' in our database. " @@ -118,421 +153,207 @@ class Command(BaseCommand): ) else: raise CommandError(f"Cannot find '{branch}' federal agencies in our database.") - portfolios = [] - for federal_agency in agencies: - message = f"Processing federal agency '{federal_agency.agency}'..." - TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message) - try: - # C901 'Command.handle' is too complex (12) - portfolio = self.handle_populate_portfolio( - federal_agency, parse_domains, parse_requests, both, skip_existing_portfolios - ) - portfolios.append(portfolio) - if add_managers: - self.add_managers_to_portfolio(portfolio) - except Exception as exec: - self.failed_portfolios.add(federal_agency) - logger.error(exec) - message = f"Failed to create portfolio '{federal_agency.agency}'" - TerminalHelper.colorful_logger(logger.info, TerminalColors.FAIL, message) - # POST PROCESS STEP: Add additional suborg info where applicable. - updated_suborg_count = self.post_process_all_suborganization_fields(agencies) - message = f"Added city and state_territory information to {updated_suborg_count} suborgs." - TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message) - TerminalHelper.log_script_run_summary( - self.updated_portfolios, - self.failed_portfolios, - self.skipped_portfolios, - debug=False, - log_header="============= FINISHED HANDLE PORTFOLIO STEP ===============", + # Store all portfolios and agencies in a dict to avoid extra db calls + existing_portfolios = Portfolio.objects.filter( + organization_name__in=agencies.values_list("agency", flat=True), organization_name__isnull=False + ) + existing_portfolios_dict = {normalize_string(p.organization_name): p for p in existing_portfolios} + agencies_dict = {normalize_string(agency.agency): agency for agency in agencies} + + # NOTE: exceptions to portfolio and suborg are intentionally uncaught. + # parse domains, requests, and managers all rely on these fields to function. + # An error here means everything down the line is compromised. + # The individual parse steps, however, are independent from eachother. + + # == Handle portfolios == # + # Loop through every agency we want to add and create a portfolio if the record is new. + for federal_agency in agencies_dict.values(): + norm_agency_name = normalize_string(federal_agency.agency) + portfolio = existing_portfolios_dict.get(norm_agency_name, None) + if portfolio is None: + portfolio = Portfolio( + organization_name=federal_agency.agency, + federal_agency=federal_agency, + organization_type=DomainRequest.OrganizationChoices.FEDERAL, + creator=User.get_default_user(), + notes="Auto-generated record", + senior_official=federal_agency.so_federal_agency.first(), + ) + self.portfolio_changes.create.append(portfolio) + logger.info(f"{TerminalColors.OKGREEN}Created portfolio '{portfolio}'.{TerminalColors.ENDC}") + elif skip_existing_portfolios: + message = f"Portfolio '{portfolio}' already exists. Skipped." + logger.info(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}") + self.portfolio_changes.skip.append(portfolio) + + # Create portfolios + self.portfolio_changes.bulk_create() + + # After create, get the list of all portfolios to use + portfolios_to_use = set(self.portfolio_changes.create) + if not skip_existing_portfolios: + portfolios_to_use.update(set(existing_portfolios)) + + portfolios_to_use_dict = {normalize_string(p.organization_name): p for p in portfolios_to_use} + + # == Handle suborganizations == # + created_suborgs = self.create_suborganizations(portfolios_to_use_dict, agencies_dict) + if created_suborgs: + self.suborganization_changes.create.extend(created_suborgs.values()) + self.suborganization_changes.bulk_create() + + # == Handle domains and requests == # + for portfolio_org_name, portfolio in portfolios_to_use_dict.items(): + federal_agency = agencies_dict.get(portfolio_org_name) + suborgs = portfolio.portfolio_suborganizations.in_bulk(field_name="name") + + if parse_domains: + updated_domains = self.update_domains(portfolio, federal_agency, suborgs, debug) + self.domain_info_changes.update.extend(updated_domains) + + if parse_requests: + updated_domain_requests = self.update_requests(portfolio, federal_agency, suborgs, debug) + self.domain_request_changes.update.extend(updated_domain_requests) + + # Update DomainInformation + try: + self.domain_info_changes.bulk_update(["portfolio", "sub_organization"]) + except Exception as err: + logger.error(f"{TerminalColors.FAIL}Could not bulk update domain infos.{TerminalColors.ENDC}") + logger.error(err, exc_info=True) + + # Update DomainRequest + try: + self.domain_request_changes.bulk_update( + [ + "portfolio", + "sub_organization", + "requested_suborganization", + "suborganization_city", + "suborganization_state_territory", + "federal_agency", + ] + ) + except Exception as err: + logger.error(f"{TerminalColors.FAIL}Could not bulk update domain requests.{TerminalColors.ENDC}") + logger.error(err, exc_info=True) + + # == Handle managers (no bulk_create) == # + if parse_managers: + domain_infos = DomainInformation.objects.filter(portfolio__in=portfolios_to_use) + domains = Domain.objects.filter(domain_info__in=domain_infos) + + # Create UserPortfolioPermission + self.create_user_portfolio_permissions(domains) + + # Create PortfolioInvitation + self.create_portfolio_invitations(domains) + + # == PRINT RUN SUMMARY == # + self.print_final_run_summary(parse_domains, parse_requests, parse_managers, debug) + + def print_final_run_summary(self, parse_domains, parse_requests, parse_managers, debug): + self.portfolio_changes.print_script_run_summary( + no_changes_message="||============= No portfolios changed. =============||", + log_header="============= PORTFOLIOS =============", skipped_header="----- SOME PORTFOLIOS WERENT CREATED (BUT OTHER RECORDS ARE STILL PROCESSED) -----", - display_as_str=True, - ) - - if add_managers: - TerminalHelper.log_script_run_summary( - self.added_managers, - self.failed_managers, - [], # can't skip managers, can only add or fail - log_header="----- MANAGERS ADDED -----", - debug=False, - display_as_str=True, - ) - - TerminalHelper.log_script_run_summary( - self.added_invitations, - [], - self.skipped_invitations, - log_header="----- INVITATIONS ADDED -----", - debug=False, - skipped_header="----- INVITATIONS SKIPPED (ALREADY EXISTED) -----", - display_as_str=True, - ) - - # POST PROCESSING STEP: Remove the federal agency if it matches the portfolio name. - # We only do this for started domain requests. - if parse_requests or both: - prompt_message = ( - "This action will update domain requests even if they aren't on a portfolio." - "\nNOTE: This will modify domain requests, even if no portfolios were created." - "\nIn the event no portfolios *are* created, then this step will target " - "the existing portfolios with your given params." - "\nThis step is entirely optional, and is just for extra data cleanup." - ) - TerminalHelper.prompt_for_execution( - system_exit_on_terminate=True, - prompt_message=prompt_message, - prompt_title=( - "POST PROCESS STEP: Do you want to clear federal agency on (related) started domain requests?" - ), - verify_message="*** THIS STEP IS OPTIONAL ***", - ) - self.post_process_started_domain_requests(agencies, portfolios) - - def add_managers_to_portfolio(self, portfolio: Portfolio): - """ - Add all domain managers of the portfolio's domains to the organization. - This includes adding them to the correct group and creating portfolio invitations. - """ - logger.info(f"Adding managers for portfolio {portfolio}") - - # Fetch all domains associated with the portfolio - domains = Domain.objects.filter(domain_info__portfolio=portfolio) - domain_managers: set[UserDomainRole] = set() - - # Fetch all users with manager roles for the domains - # select_related means that a db query will not be occur when you do user_domain_role.user - # Its similar to a set or dict in that it costs slightly more upfront in exchange for perf later - user_domain_roles = UserDomainRole.objects.select_related("user").filter( - domain__in=domains, role=UserDomainRole.Roles.MANAGER - ) - domain_managers.update(user_domain_roles) - - invited_managers: set[str] = set() - - # Get the emails of invited managers - domain_invitations = DomainInvitation.objects.filter( - domain__in=domains, status=DomainInvitation.DomainInvitationStatus.INVITED - ).values_list("email", flat=True) - invited_managers.update(domain_invitations) - - for user_domain_role in domain_managers: - try: - # manager is a user id - user = user_domain_role.user - _, created = UserPortfolioPermission.objects.get_or_create( - portfolio=portfolio, - user=user, - defaults={"roles": [UserPortfolioRoleChoices.ORGANIZATION_MEMBER]}, - ) - self.added_managers.add(user) - if created: - logger.info(f"Added manager '{user}' to portfolio '{portfolio}'") - else: - logger.info(f"Manager '{user}' already exists in portfolio '{portfolio}'") - except User.DoesNotExist: - self.failed_managers.add(user) - logger.debug(f"User '{user}' does not exist") - - for email in invited_managers: - self.create_portfolio_invitation(portfolio, email) - - def create_portfolio_invitation(self, portfolio: Portfolio, email: str): - """ - Create a portfolio invitation for the given email. - """ - _, created = PortfolioInvitation.objects.get_or_create( - portfolio=portfolio, - email=email, - defaults={ - "status": PortfolioInvitation.PortfolioInvitationStatus.INVITED, - "roles": [UserPortfolioRoleChoices.ORGANIZATION_MEMBER], - }, - ) - if created: - self.added_invitations.add(email) - logger.info(f"Created portfolio invitation for '{email}' to portfolio '{portfolio}'") - else: - self.skipped_invitations.add(email) - logger.info(f"Found existing portfolio invitation for '{email}' to portfolio '{portfolio}'") - - def post_process_started_domain_requests(self, agencies, portfolios): - """ - Removes duplicate organization data by clearing federal_agency when it matches the portfolio name. - Only processes domain requests in STARTED status. - """ - message = "Removing duplicate portfolio and federal_agency values from domain requests..." - TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message) - - # For each request, clear the federal agency under these conditions: - # 1. A portfolio *already exists* with the same name as the federal agency. - # 2. Said portfolio (or portfolios) are only the ones specified at the start of the script. - # 3. The domain request is in status "started". - # Note: Both names are normalized so excess spaces are stripped and the string is lowercased. - - domain_requests_to_update = DomainRequest.objects.filter( - federal_agency__in=agencies, - federal_agency__agency__isnull=False, - status=DomainRequest.DomainRequestStatus.STARTED, - organization_name__isnull=False, - ) - - if domain_requests_to_update.count() == 0: - TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, "No domain requests to update.") - return - - portfolio_set = {normalize_string(portfolio.organization_name) for portfolio in portfolios if portfolio} - - # Update the request, assuming the given agency name matches the portfolio name - updated_requests = [] - for req in domain_requests_to_update: - agency_name = normalize_string(req.federal_agency.agency) - if agency_name in portfolio_set: - req.federal_agency = None - updated_requests.append(req) - - # Execute the update and Log the results - if TerminalHelper.prompt_for_execution( - system_exit_on_terminate=False, - prompt_message=( - f"{len(domain_requests_to_update)} domain requests will be updated. " - f"These records will be changed: {[str(req) for req in updated_requests]}" + detailed_prompt_title=( + "PORTFOLIOS: Do you wish to see the full list of failed, skipped and updated records?" ), - prompt_title="Do you wish to commit this update to the database?", - ): - DomainRequest.objects.bulk_update(updated_requests, ["federal_agency"]) - TerminalHelper.colorful_logger(logger.info, TerminalColors.OKBLUE, "Action completed successfully.") - - def handle_populate_portfolio(self, federal_agency, parse_domains, parse_requests, both, skip_existing_portfolios): - """Attempts to create a portfolio. If successful, this function will - also create new suborganizations""" - portfolio, created = self.create_portfolio(federal_agency) - if skip_existing_portfolios and not created: - TerminalHelper.colorful_logger( - logger.warning, - TerminalColors.YELLOW, - "Skipping modifications to suborgs, domain requests, and " - "domains due to the --skip_existing_portfolios flag. Portfolio already exists.", - ) - return portfolio - - self.create_suborganizations(portfolio, federal_agency) - if parse_domains or both: - self.handle_portfolio_domains(portfolio, federal_agency) - - if parse_requests or both: - self.handle_portfolio_requests(portfolio, federal_agency) - - return portfolio - - def create_portfolio(self, federal_agency): - """Creates a portfolio if it doesn't presently exist. - Returns portfolio, created.""" - # Get the org name / senior official - org_name = federal_agency.agency - so = federal_agency.so_federal_agency.first() if federal_agency.so_federal_agency.exists() else None - - # First just try to get an existing portfolio - portfolio = Portfolio.objects.filter(organization_name=org_name).first() - if portfolio: - self.skipped_portfolios.add(portfolio) - TerminalHelper.colorful_logger( - logger.info, - TerminalColors.YELLOW, - f"Portfolio with organization name '{org_name}' already exists. Skipping create.", - ) - return portfolio, False - - # Create new portfolio if it doesn't exist - portfolio = Portfolio.objects.create( - organization_name=org_name, - federal_agency=federal_agency, - organization_type=DomainRequest.OrganizationChoices.FEDERAL, - creator=User.get_default_user(), - notes="Auto-generated record", - senior_official=so, + display_as_str=True, + debug=debug, + ) + self.suborganization_changes.print_script_run_summary( + no_changes_message="||============= No suborganizations changed. =============||", + log_header="============= SUBORGANIZATIONS =============", + skipped_header="----- SUBORGANIZATIONS SKIPPED (SAME NAME AS PORTFOLIO NAME) -----", + detailed_prompt_title=( + "SUBORGANIZATIONS: Do you wish to see the full list of failed, skipped and updated records?" + ), + display_as_str=True, + debug=debug, ) - self.updated_portfolios.add(portfolio) - TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, f"Created portfolio '{portfolio}'") - - # Log if the senior official was added or not. - if portfolio.senior_official: - message = f"Added senior official '{portfolio.senior_official}'" - TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message) - else: - message = ( - f"No senior official added to portfolio '{org_name}'. " - "None was returned for the reverse relation `FederalAgency.so_federal_agency.first()`" + if parse_domains: + self.domain_info_changes.print_script_run_summary( + no_changes_message="||============= No domains changed. =============||", + log_header="============= DOMAINS =============", + detailed_prompt_title=( + "DOMAINS: Do you wish to see the full list of failed, skipped and updated records?" + ), + display_as_str=True, + debug=debug, ) - TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message) - return portfolio, True + if parse_requests: + self.domain_request_changes.print_script_run_summary( + no_changes_message="||============= No domain requests changed. =============||", + log_header="============= DOMAIN REQUESTS =============", + detailed_prompt_title=( + "DOMAIN REQUESTS: Do you wish to see the full list of failed, skipped and updated records?" + ), + display_as_str=True, + debug=debug, + ) - def create_suborganizations(self, portfolio: Portfolio, federal_agency: FederalAgency): + if parse_managers: + self.user_portfolio_perm_changes.print_script_run_summary( + no_changes_message="||============= No managers changed. =============||", + log_header="============= MANAGERS =============", + skipped_header="----- MANAGERS SKIPPED (ALREADY EXISTED) -----", + detailed_prompt_title=( + "MANAGERS: Do you wish to see the full list of failed, skipped and updated records?" + ), + display_as_str=True, + debug=debug, + ) + self.portfolio_invitation_changes.print_script_run_summary( + no_changes_message="||============= No manager invitations changed. =============||", + log_header="============= MANAGER INVITATIONS =============", + skipped_header="----- INVITATIONS SKIPPED (ALREADY EXISTED) -----", + detailed_prompt_title=( + "MANAGER INVITATIONS: Do you wish to see the full list of failed, skipped and updated records?" + ), + display_as_str=True, + debug=debug, + ) + + def create_suborganizations(self, portfolio_dict, agency_dict): """Create Suborganizations tied to the given portfolio based on DomainInformation objects""" - valid_agencies = DomainInformation.objects.filter( - federal_agency=federal_agency, organization_name__isnull=False - ) - org_names = set(valid_agencies.values_list("organization_name", flat=True)) - if not org_names: - message = ( - "Could not add any suborganizations." - f"\nNo suborganizations were found for '{federal_agency}' when filtering on this name, " - "and excluding null organization_name records." - ) - TerminalHelper.colorful_logger(logger.warning, TerminalColors.FAIL, message) - return + created_suborgs = {} - # Check for existing suborgs on the current portfolio - existing_suborgs = Suborganization.objects.filter(name__in=org_names, name__isnull=False) - if existing_suborgs.exists(): - message = f"Some suborganizations already exist for portfolio '{portfolio}'." - TerminalHelper.colorful_logger(logger.info, TerminalColors.OKBLUE, message) + portfolios = portfolio_dict.values() + agencies = agency_dict.values() - # Create new suborgs, as long as they don't exist in the db already - new_suborgs = [] - for name in org_names - set(existing_suborgs.values_list("name", flat=True)): - if normalize_string(name) == normalize_string(portfolio.organization_name): - # You can use this to populate location information, when this occurs. - # However, this isn't needed for now so we can skip it. - message = ( - f"Skipping suborganization create on record '{name}'. " - "The federal agency name is the same as the portfolio name." - ) - TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, message) - else: - new_suborgs.append(Suborganization(name=name, portfolio=portfolio)) # type: ignore - - if new_suborgs: - Suborganization.objects.bulk_create(new_suborgs) - TerminalHelper.colorful_logger( - logger.info, TerminalColors.OKGREEN, f"Added {len(new_suborgs)} suborganizations" - ) - else: - TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, "No suborganizations added") - - def handle_portfolio_requests(self, portfolio: Portfolio, federal_agency: FederalAgency): - """ - Associate portfolio with domain requests for a federal agency. - Updates all relevant domain request records. - """ - invalid_states = [ - DomainRequest.DomainRequestStatus.STARTED, - DomainRequest.DomainRequestStatus.INELIGIBLE, - DomainRequest.DomainRequestStatus.REJECTED, - ] - domain_requests = DomainRequest.objects.filter(federal_agency=federal_agency).exclude(status__in=invalid_states) - if not domain_requests.exists(): - message = f""" - Portfolio '{portfolio}' not added to domain requests: no valid records found. - This means that a filter on DomainInformation for the federal_agency '{federal_agency}' returned no results. - Excluded statuses: STARTED, INELIGIBLE, REJECTED. - Filter info: DomainRequest.objects.filter(federal_agency=federal_agency).exclude( - status__in=invalid_states - ) - """ - TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message) - return None - - # Get all suborg information and store it in a dict to avoid doing a db call - suborgs = Suborganization.objects.filter(portfolio=portfolio).in_bulk(field_name="name") - for domain_request in domain_requests: - # Set the portfolio - domain_request.portfolio = portfolio - - # Set suborg info - domain_request.sub_organization = suborgs.get(domain_request.organization_name, None) - if domain_request.sub_organization is None: - domain_request.requested_suborganization = normalize_string( - domain_request.organization_name, lowercase=False - ) - domain_request.suborganization_city = normalize_string(domain_request.city, lowercase=False) - domain_request.suborganization_state_territory = domain_request.state_territory - - self.updated_portfolios.add(portfolio) - - DomainRequest.objects.bulk_update( - domain_requests, - [ - "portfolio", - "sub_organization", - "requested_suborganization", - "suborganization_city", - "suborganization_state_territory", - ], - ) - message = f"Added portfolio '{portfolio}' to {len(domain_requests)} domain requests." - TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message) - - def handle_portfolio_domains(self, portfolio: Portfolio, federal_agency: FederalAgency): - """ - Associate portfolio with domains for a federal agency. - Updates all relevant domain information records. - - Returns a queryset of DomainInformation objects, or None if nothing changed. - """ - domain_infos = DomainInformation.objects.filter(federal_agency=federal_agency) - if not domain_infos.exists(): - message = f""" - Portfolio '{portfolio}' not added to domains: no valid records found. - The filter on DomainInformation for the federal_agency '{federal_agency}' returned no results. - Filter info: DomainInformation.objects.filter(federal_agency=federal_agency) - """ - TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message) - return None - - # Get all suborg information and store it in a dict to avoid doing a db call - suborgs = Suborganization.objects.filter(portfolio=portfolio).in_bulk(field_name="name") - for domain_info in domain_infos: - domain_info.portfolio = portfolio - domain_info.sub_organization = suborgs.get(domain_info.organization_name, None) - - DomainInformation.objects.bulk_update(domain_infos, ["portfolio", "sub_organization"]) - message = f"Added portfolio '{portfolio}' to {len(domain_infos)} domains." - TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message) - - def post_process_all_suborganization_fields(self, agencies): - """Batch updates suborganization locations from domain and request data. - - Args: - agencies: List of FederalAgency objects to process - - Returns: - int: Number of suborganizations updated - - Priority for location data: - 1. Domain information - 2. Domain request suborganization fields - 3. Domain request standard fields - """ - # Common filter between domaininformation / domain request. - # Filter by only the agencies we've updated thus far. - # Then, only process records without null portfolio, org name, or suborg name. - base_filter = Q( - federal_agency__in=agencies, - portfolio__isnull=False, - organization_name__isnull=False, - sub_organization__isnull=False, - ) & ~Q(organization_name__iexact=F("portfolio__organization_name")) - - # First: Remove null city / state_territory values on domain info / domain requests. - # We want to add city data if there is data to add to begin with! domains = DomainInformation.objects.filter( - base_filter, - Q(city__isnull=False, state_territory__isnull=False), + # Org name must not be null, and must not be the portfolio name + Q( + organization_name__isnull=False, + ) + & ~Q(organization_name__iexact=F("portfolio__organization_name")), + # Only get relevant data to the agency/portfolio we are targeting + Q(federal_agency__in=agencies) | Q(portfolio__in=portfolios), ) requests = DomainRequest.objects.filter( - base_filter, - ( - Q(city__isnull=False, state_territory__isnull=False) - | Q(suborganization_city__isnull=False, suborganization_state_territory__isnull=False) - ), + # Org name must not be null, and must not be the portfolio name + Q( + organization_name__isnull=False, + ) + & ~Q(organization_name__iexact=F("portfolio__organization_name")), + # Only get relevant data to the agency/portfolio we are targeting + Q(federal_agency__in=agencies) | Q(portfolio__in=portfolios), ) + # First: get all existing suborgs + # NOTE: .all() is a heavy query, but unavoidable as we need to check for duplicate names. + # This is not quite as heavy as just using a for loop and .get_or_create, but worth noting. + # Change this if you can find a way to avoid doing this. + # This won't scale great for 10k+ records. + existing_suborgs = Suborganization.objects.all() + suborg_dict = {normalize_string(org.name): org for org in existing_suborgs} + # Second: Group domains and requests by normalized organization name. - # This means that later down the line we have to account for "duplicate" org names. domains_dict = {} requests_dict = {} for domain in domains: @@ -543,40 +364,60 @@ class Command(BaseCommand): normalized_name = normalize_string(request.organization_name) requests_dict.setdefault(normalized_name, []).append(request) - # Third: Get suborganizations to update - suborgs_to_edit = Suborganization.objects.filter( - Q(id__in=domains.values_list("sub_organization", flat=True)) - | Q(id__in=requests.values_list("sub_organization", flat=True)) - ) + # Third: Parse through each group of domains that have the same organization names, + # then create *one* suborg record from it. + # Normalize all suborg names so we don't add duplicate data unintentionally. + for portfolio_name, portfolio in portfolio_dict.items(): + # For a given agency, find all domains that list suborg info for it. + for norm_org_name, domains in domains_dict.items(): + # Don't add the record if the suborg name would equal the portfolio name + if norm_org_name == portfolio_name: + continue - # Fourth: Process each suborg to add city / state territory info - for suborg in suborgs_to_edit: - self.post_process_suborganization_fields(suborg, domains_dict, requests_dict) + new_suborg_name = None + if len(domains) == 1: + new_suborg_name = normalize_string(domains[0].organization_name, lowercase=False) + elif len(domains) > 1: + # Pick the best record for a suborg name (fewest spaces, most leading capitals) + best_record = max( + domains, + key=lambda rank: ( + -domain.organization_name.count(" "), + count_capitals(domain.organization_name, leading_only=True), + ), + ) + new_suborg_name = normalize_string(best_record.organization_name, lowercase=False) - # Fifth: Perform a bulk update - return Suborganization.objects.bulk_update(suborgs_to_edit, ["city", "state_territory"]) + # If the suborg already exists, don't add it again. + if norm_org_name not in suborg_dict and norm_org_name not in created_suborgs: + requests = requests_dict.get(norm_org_name) + suborg = Suborganization(name=new_suborg_name, portfolio=portfolio) + self.set_suborganization_location(suborg, domains, requests) + created_suborgs[norm_org_name] = suborg + return created_suborgs - def post_process_suborganization_fields(self, suborg, domains_dict, requests_dict): + def set_suborganization_location(self, suborg, domains, requests): """Updates a single suborganization's location data if valid. Args: suborg: Suborganization to update - domains_dict: Dict of domain info records grouped by org name - requests_dict: Dict of domain requests grouped by org name + domains: omain info records grouped by org name + requests: domain requests grouped by org name - Priority matches parent method. Updates are skipped if location data conflicts + Updates are skipped if location data conflicts between multiple records of the same type. """ - normalized_suborg_name = normalize_string(suborg.name) - domains = domains_dict.get(normalized_suborg_name, []) - requests = requests_dict.get(normalized_suborg_name, []) # Try to get matching domain info domain = None if domains: reference = domains[0] use_location_for_domain = all( - d.city == reference.city and d.state_territory == reference.state_territory for d in domains + d.city + and d.state_territory + and d.city == reference.city + and d.state_territory == reference.state_territory + for d in domains ) if use_location_for_domain: domain = reference @@ -608,7 +449,7 @@ class Command(BaseCommand): if not domain and not request: message = f"Skipping adding city / state_territory information to suborg: {suborg}. Bad data." - TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, message) + logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}") return # PRIORITY: @@ -625,8 +466,106 @@ class Command(BaseCommand): suborg.city = normalize_string(request.city, lowercase=False) suborg.state_territory = request.state_territory - message = ( - f"Added city/state_territory to suborg: {suborg}. " - f"city - {suborg.city}, state - {suborg.state_territory}" + def update_domains(self, portfolio, federal_agency, suborgs, debug): + """ + Associate portfolio with domains for a federal agency. + Updates all relevant domain information records. + + Returns a queryset of DomainInformation objects, or None if nothing changed. + """ + updated_domains = set() + domain_infos = federal_agency.domaininformation_set.all() + for domain_info in domain_infos: + org_name = normalize_string(domain_info.organization_name, lowercase=False) + domain_info.portfolio = portfolio + domain_info.sub_organization = suborgs.get(org_name, None) + updated_domains.add(domain_info) + + if not updated_domains and debug: + message = f"Portfolio '{portfolio}' not added to domains: nothing to add found." + logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}") + + return updated_domains + + def update_requests( + self, + portfolio, + federal_agency, + suborgs, + debug, + ): + """ + Associate portfolio with domain requests for a federal agency. + Updates all relevant domain request records. + """ + updated_domain_requests = set() + invalid_states = [ + DomainRequest.DomainRequestStatus.INELIGIBLE, + DomainRequest.DomainRequestStatus.REJECTED, + ] + domain_requests = federal_agency.domainrequest_set.exclude(status__in=invalid_states) + + # Add portfolio, sub_org, requested_suborg, suborg_city, and suborg_state_territory. + # For started domain requests, set the federal agency to None if not on a portfolio. + for domain_request in domain_requests: + if domain_request.status != DomainRequest.DomainRequestStatus.STARTED: + org_name = normalize_string(domain_request.organization_name, lowercase=False) + domain_request.portfolio = portfolio + domain_request.sub_organization = suborgs.get(org_name, None) + if domain_request.sub_organization is None: + domain_request.requested_suborganization = normalize_string( + domain_request.organization_name, lowercase=False + ) + domain_request.suborganization_city = normalize_string(domain_request.city, lowercase=False) + domain_request.suborganization_state_territory = domain_request.state_territory + else: + # Clear the federal agency for started domain requests + agency_name = normalize_string(domain_request.federal_agency.agency) + portfolio_name = normalize_string(portfolio.organization_name) + if agency_name == portfolio_name: + domain_request.federal_agency = None + logger.info(f"Set federal agency on started domain request '{domain_request}' to None.") + updated_domain_requests.add(domain_request) + + if not updated_domain_requests and debug: + message = f"Portfolio '{portfolio}' not added to domain requests: nothing to add found." + logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}") + + return updated_domain_requests + + def create_user_portfolio_permissions(self, domains): + user_domain_roles = UserDomainRole.objects.select_related( + "user", "domain", "domain__domain_info", "domain__domain_info__portfolio" + ).filter(domain__in=domains, domain__domain_info__portfolio__isnull=False, role=UserDomainRole.Roles.MANAGER) + for user_domain_role in user_domain_roles: + user = user_domain_role.user + permission, created = UserPortfolioPermission.objects.get_or_create( + portfolio=user_domain_role.domain.domain_info.portfolio, + user=user, + defaults={"roles": [UserPortfolioRoleChoices.ORGANIZATION_MEMBER]}, + ) + if created: + self.user_portfolio_perm_changes.create.append(permission) + else: + self.user_portfolio_perm_changes.skip.append(permission) + + def create_portfolio_invitations(self, domains): + domain_invitations = DomainInvitation.objects.select_related( + "domain", "domain__domain_info", "domain__domain_info__portfolio" + ).filter( + domain__in=domains, + domain__domain_info__portfolio__isnull=False, + status=DomainInvitation.DomainInvitationStatus.INVITED, ) - TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message) + for domain_invitation in domain_invitations: + email = normalize_string(domain_invitation.email) + invitation, created = PortfolioInvitation.objects.get_or_create( + portfolio=domain_invitation.domain.domain_info.portfolio, + email=email, + status=PortfolioInvitation.PortfolioInvitationStatus.INVITED, + roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], + ) + if created: + self.portfolio_invitation_changes.create.append(invitation) + else: + self.portfolio_invitation_changes.skip.append(invitation) diff --git a/src/registrar/management/commands/populate_first_ready.py b/src/registrar/management/commands/populate_first_ready.py index 04468029a..1cef2245d 100644 --- a/src/registrar/management/commands/populate_first_ready.py +++ b/src/registrar/management/commands/populate_first_ready.py @@ -51,7 +51,7 @@ class Command(BaseCommand): ScriptDataHelper.bulk_update_fields(Domain, self.to_update, ["first_ready"]) # Log what happened - TerminalHelper.log_script_run_summary(self.to_update, self.failed_to_update, self.skipped, debug) + TerminalHelper.log_script_run_summary(self.to_update, self.failed_to_update, self.skipped, [], debug=debug) def update_first_ready_for_domain(self, domain: Domain, debug: bool): """Grabs the created_at field and associates it with the first_ready column. diff --git a/src/registrar/management/commands/populate_organization_type.py b/src/registrar/management/commands/populate_organization_type.py index 60d179cb8..20cd6d130 100644 --- a/src/registrar/management/commands/populate_organization_type.py +++ b/src/registrar/management/commands/populate_organization_type.py @@ -144,7 +144,12 @@ class Command(BaseCommand): # Log what happened log_header = "============= FINISHED UPDATE FOR DOMAINREQUEST ===============" TerminalHelper.log_script_run_summary( - self.request_to_update, self.request_failed_to_update, self.request_skipped, True, log_header + self.request_to_update, + self.request_failed_to_update, + self.request_skipped, + [], + debug=True, + log_header=log_header, ) update_skipped_count = len(self.request_to_update) @@ -195,7 +200,7 @@ class Command(BaseCommand): # Log what happened log_header = "============= FINISHED UPDATE FOR DOMAININFORMATION ===============" TerminalHelper.log_script_run_summary( - self.di_to_update, self.di_failed_to_update, self.di_skipped, True, log_header + self.di_to_update, self.di_failed_to_update, self.di_skipped, [], debug=True, log_header=log_header ) update_skipped_count = len(self.di_skipped) diff --git a/src/registrar/management/commands/update_default_public_contacts.py b/src/registrar/management/commands/update_default_public_contacts.py new file mode 100644 index 000000000..ac8c542db --- /dev/null +++ b/src/registrar/management/commands/update_default_public_contacts.py @@ -0,0 +1,105 @@ +import logging +import argparse +from django.core.management import BaseCommand +from registrar.management.commands.utility.terminal_helper import PopulateScriptTemplate, TerminalColors +from registrar.models import PublicContact +from registrar.models.utility.generic_helper import normalize_string +from registrar.utility.enums import DefaultEmail + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand, PopulateScriptTemplate): + help = "Loops through each default PublicContact and updates some values on each" + + def add_arguments(self, parser): + """Adds command line arguments""" + parser.add_argument( + "--overwrite_updated_contacts", + action=argparse.BooleanOptionalAction, + help=( + "Loops over PublicContacts with an email of 'help@get.gov' when enabled." + "Use this setting if the record was updated in the DB but not correctly in EPP." + ), + ) + + parser.add_argument( + "--target_domain", + help=( + "Updates the public contact on a given domain name (case insensitive). " + "Use this option to avoid doing a mass-update of every public contact record." + ), + ) + + def handle(self, **kwargs): + """Loops through each valid User object and updates its verification_type value""" + overwrite_updated_contacts = kwargs.get("overwrite_updated_contacts") + target_domain = kwargs.get("target_domain") + default_emails = {email for email in DefaultEmail} + + # Don't update records we've already updated + if not overwrite_updated_contacts: + default_emails.remove(DefaultEmail.PUBLIC_CONTACT_DEFAULT) + + # We should only update DEFAULT records. This means that if all values are not default, + # we should skip as this could lead to data corruption. + # Since we check for all fields, we don't account for casing differences. + self.old_and_new_default_contact_values = { + "name": { + "csd/cb – attn: .gov tld", + "csd/cb – attn: cameron dixon", + "program manager", + "registry customer service", + }, + "street1": {"1110 n. glebe rd", "cisa – ngr stop 0645", "4200 wilson blvd."}, + "pc": {"22201", "20598-0645"}, + "email": default_emails, + } + if not target_domain: + filter_condition = {"email__in": default_emails} + else: + filter_condition = {"email__in": default_emails, "domain__name__iexact": target_domain} + # This variable is decorative since we are skipping bulk update + fields_to_update = ["name", "street1", "pc", "email"] + self.mass_update_records(PublicContact, filter_condition, fields_to_update, show_record_count=True) + + def bulk_update_fields(self, *args, **kwargs): + """Skip bulk update since we need to manually save each field. + Our EPP logic is tied to an override of .save(), and this also associates + with our caching logic for this area of the code. + + Since bulk update does not trigger .save() for each field, we have to + call it manually. + """ + return None + + def update_record(self, record: PublicContact): + """Defines how we update the verification_type field""" + record.name = "CSD/CB – Attn: .gov TLD" + record.street1 = "1110 N. Glebe Rd" + record.pc = "22201" + record.email = DefaultEmail.PUBLIC_CONTACT_DEFAULT + record.save() + logger.info(f"{TerminalColors.OKCYAN}Updated '{record}' in EPP.{TerminalColors.ENDC}") + + def should_skip_record(self, record) -> bool: # noqa + """Skips updating a public contact if it contains different default info.""" + if record.registry_id and len(record.registry_id) < 16: + message = ( + f"Skipping legacy verisign contact '{record}'. " + f"The registry_id field has a length less than 16 characters." + ) + logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}") + return True + + for key, expected_values in self.old_and_new_default_contact_values.items(): + record_field = normalize_string(getattr(record, key)) + if record_field not in expected_values: + message = ( + f"Skipping '{record}' to avoid potential data corruption. " + f"The field '{key}' does not match the default.\n" + f"Details: DB value - {record_field}, expected value(s) - {expected_values}" + ) + logger.warning(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}") + return True + return False diff --git a/src/registrar/management/commands/utility/terminal_helper.py b/src/registrar/management/commands/utility/terminal_helper.py index 87d9f12e5..b909de44f 100644 --- a/src/registrar/management/commands/utility/terminal_helper.py +++ b/src/registrar/management/commands/utility/terminal_helper.py @@ -32,7 +32,7 @@ class ScriptDataHelper: """Helper method with utilities to speed up development of scripts that do DB operations""" @staticmethod - def bulk_update_fields(model_class, update_list, fields_to_update, batch_size=1000): + def bulk_update_fields(model_class, update_list, fields_to_update, batch_size=1000, quiet=False): """ This function performs a bulk update operation on a specified Django model class in batches. It uses Django's Paginator to handle large datasets in a memory-efficient manner. @@ -51,9 +51,12 @@ class ScriptDataHelper: fields_to_update: Specifies which fields to update. Usage: - bulk_update_fields(Domain, page.object_list, ["first_ready"]) + ScriptDataHelper.bulk_update_fields(Domain, page.object_list, ["first_ready"]) + + Returns: A queryset of the updated objets """ - logger.info(f"{TerminalColors.YELLOW} Bulk updating fields... {TerminalColors.ENDC}") + if not quiet: + logger.info(f"{TerminalColors.YELLOW} Bulk updating fields... {TerminalColors.ENDC}") # Create a Paginator object. Bulk_update on the full dataset # is too memory intensive for our current app config, so we can chunk this data instead. paginator = Paginator(update_list, batch_size) @@ -61,6 +64,41 @@ class ScriptDataHelper: page = paginator.page(page_num) model_class.objects.bulk_update(page.object_list, fields_to_update) + @staticmethod + def bulk_create_fields(model_class, update_list, batch_size=1000, return_created=False, quiet=False): + """ + This function performs a bulk create operation on a specified Django model class in batches. + It uses Django's Paginator to handle large datasets in a memory-efficient manner. + + Parameters: + model_class: The Django model class that you want to perform the bulk update on. + This should be the actual class, not a string of the class name. + + update_list: A list of model instances that you want to update. Each instance in the list + should already have the updated values set on the instance. + + batch_size: The maximum number of model instances to update in a single database query. + Defaults to 1000. If you're dealing with models that have a large number of fields, + or large field values, you may need to decrease this value to prevent out-of-memory errors. + Usage: + ScriptDataHelper.bulk_add_fields(Domain, page.object_list) + + Returns: A queryset of the added objects + """ + if not quiet: + logger.info(f"{TerminalColors.YELLOW} Bulk adding fields... {TerminalColors.ENDC}") + + created_objs = [] + paginator = Paginator(update_list, batch_size) + for page_num in paginator.page_range: + page = paginator.page(page_num) + all_created = model_class.objects.bulk_create(page.object_list) + if return_created: + created_objs.extend([created.id for created in all_created]) + if return_created: + return model_class.objects.filter(id__in=created_objs) + return None + class PopulateScriptTemplate(ABC): """ @@ -86,7 +124,9 @@ class PopulateScriptTemplate(ABC): """ raise NotImplementedError - def mass_update_records(self, object_class, filter_conditions, fields_to_update, debug=True, verbose=False): + def mass_update_records( + self, object_class, filter_conditions, fields_to_update, debug=True, verbose=False, show_record_count=False + ): """Loops through each valid "object_class" object - specified by filter_conditions - and updates fields defined by fields_to_update using update_record. @@ -106,6 +146,9 @@ class PopulateScriptTemplate(ABC): verbose: Whether to print a detailed run summary *before* run confirmation. Default: False. + show_record_count: Whether to show a 'Record 1/10' dialog when running update. + Default: False. + Raises: NotImplementedError: If you do not define update_record before using this function. TypeError: If custom_filter is not Callable. @@ -115,19 +158,19 @@ class PopulateScriptTemplate(ABC): # apply custom filter records = self.custom_filter(records) + records_length = len(records) readable_class_name = self.get_class_name(object_class) # for use in the execution prompt. - proposed_changes = f"""==Proposed Changes== - Number of {readable_class_name} objects to change: {len(records)} - These fields will be updated on each record: {fields_to_update} - """ + proposed_changes = ( + "==Proposed Changes==\n" + f"Number of {readable_class_name} objects to change: {records_length}\n" + f"These fields will be updated on each record: {fields_to_update}" + ) if verbose: - proposed_changes = f"""{proposed_changes} - These records will be updated: {list(records.all())} - """ + proposed_changes = f"{proposed_changes}\n" f"These records will be updated: {list(records.all())}" # Code execution will stop here if the user prompts "N" TerminalHelper.prompt_for_execution( @@ -140,7 +183,9 @@ class PopulateScriptTemplate(ABC): to_update: List[object_class] = [] to_skip: List[object_class] = [] failed_to_update: List[object_class] = [] - for record in records: + for i, record in enumerate(records, start=1): + if show_record_count: + logger.info(f"{TerminalColors.BOLD}Record {i}/{records_length}{TerminalColors.ENDC}") try: if not self.should_skip_record(record): self.update_record(record) @@ -154,18 +199,23 @@ class PopulateScriptTemplate(ABC): logger.error(fail_message) # Do a bulk update on the desired field - ScriptDataHelper.bulk_update_fields(object_class, to_update, fields_to_update) + self.bulk_update_fields(object_class, to_update, fields_to_update) # Log what happened TerminalHelper.log_script_run_summary( to_update, failed_to_update, to_skip, + [], debug=debug, log_header=self.run_summary_header, display_as_str=True, ) + def bulk_update_fields(self, object_class, to_update, fields_to_update): + """Bulk updates the given fields""" + ScriptDataHelper.bulk_update_fields(object_class, to_update, fields_to_update) + def get_class_name(self, sender) -> str: """Returns the class name that we want to display for the terminal prompt. Example: DomainRequest => "Domain Request" @@ -190,81 +240,96 @@ class PopulateScriptTemplate(ABC): class TerminalHelper: + @staticmethod def log_script_run_summary( - to_update, failed_to_update, skipped, debug: bool, log_header=None, skipped_header=None, display_as_str=False + create, + update, + skip, + fail, + debug: bool, + log_header="============= FINISHED =============", + skipped_header="----- SOME DATA WAS INVALID (NEEDS MANUAL PATCHING) -----", + failed_header="----- UPDATE FAILED -----", + display_as_str=False, + detailed_prompt_title="Do you wish to see the full list of failed, skipped and updated records?", ): - """Prints success, failed, and skipped counts, as well as - all affected objects.""" - update_success_count = len(to_update) - update_failed_count = len(failed_to_update) - update_skipped_count = len(skipped) + """Generates a formatted summary of script execution results with colored output. - if log_header is None: - log_header = "============= FINISHED ===============" + Displays counts and details of successful, failed, and skipped operations. + In debug mode or when prompted, shows full record details. + Uses color coding: green for success, yellow for skipped, red for failures. - if skipped_header is None: - skipped_header = "----- SOME DATA WAS INVALID (NEEDS MANUAL PATCHING) -----" + Args: + to_update: Records that were successfully updated + failed_to_update: Records that failed to update + skipped: Records that were intentionally skipped + to_add: Records that were newly added + debug: If True, shows detailed record information + log_header: Custom header for the summary (default: "FINISHED") + skipped_header: Custom header for skipped records section + failed_header: Custom header for failed records section + display_as_str: If True, converts records to strings for display + + Output Format (if count > 0 for each category): + [log_header] + Created W entries + Updated X entries + [skipped_header] + Skipped updating Y entries + [failed_header] + Failed to update Z entries + + Debug output (if enabled): + - Directly prints each list for each category (add, update, etc) + - Converts each item to string if display_as_str is True + """ + counts = { + "created": len(create), + "updated": len(update), + "skipped": len(skip), + "failed": len(fail), + } # Give the user the option to see failed / skipped records if any exist. display_detailed_logs = False - if not debug and update_failed_count > 0 or update_skipped_count > 0: + if not debug and counts["failed"] > 0 or counts["skipped"] > 0: display_detailed_logs = TerminalHelper.prompt_for_execution( system_exit_on_terminate=False, - prompt_message=f"You will see {update_failed_count} failed and {update_skipped_count} skipped records.", + prompt_message=f'You will see {counts["failed"]} failed and {counts["skipped"]} skipped records.', verify_message="** Some records were skipped, or some failed to update. **", - prompt_title="Do you wish to see the full list of failed, skipped and updated records?", + prompt_title=detailed_prompt_title, ) - # Prepare debug messages - if debug or display_detailed_logs: - updated_display = [str(u) for u in to_update] if display_as_str else to_update - skipped_display = [str(s) for s in skipped] if display_as_str else skipped - failed_display = [str(f) for f in failed_to_update] if display_as_str else failed_to_update - debug_messages = { - "success": (f"{TerminalColors.OKCYAN}Updated: {updated_display}{TerminalColors.ENDC}\n"), - "skipped": (f"{TerminalColors.YELLOW}Skipped: {skipped_display}{TerminalColors.ENDC}\n"), - "failed": (f"{TerminalColors.FAIL}Failed: {failed_display}{TerminalColors.ENDC}\n"), - } + non_zero_counts = {category: count for category, count in counts.items() if count > 0} + messages = [] + for category, count in non_zero_counts.items(): + match category: + case "created": + label, values, debug_color = "Created", create, TerminalColors.OKBLUE + case "updated": + label, values, debug_color = "Updated", update, TerminalColors.OKCYAN + case "skipped": + label, values, debug_color = "Skipped updating", skip, TerminalColors.YELLOW + messages.append(skipped_header) + case "failed": + label, values, debug_color = "Failed to update", fail, TerminalColors.FAIL + messages.append(failed_header) + messages.append(f"{label} {count} entries") - # Print out a list of everything that was changed, if we have any changes to log. - # Otherwise, don't print anything. - TerminalHelper.print_conditional( - True, - f"{debug_messages.get('success') if update_success_count > 0 else ''}" - f"{debug_messages.get('skipped') if update_skipped_count > 0 else ''}" - f"{debug_messages.get('failed') if update_failed_count > 0 else ''}", - ) + # Print debug messages (prints the internal add, update, skip, fail lists) + if debug or display_detailed_logs: + display_values = [str(v) for v in values] if display_as_str else values + debug_message = f"{label}: {display_values}" + logger.info(f"{debug_color}{debug_message}{TerminalColors.ENDC}") - if update_failed_count == 0 and update_skipped_count == 0: - logger.info( - f"""{TerminalColors.OKGREEN} - {log_header} - Updated {update_success_count} entries - {TerminalColors.ENDC} - """ - ) - elif update_failed_count == 0: - logger.warning( - f"""{TerminalColors.YELLOW} - {log_header} - Updated {update_success_count} entries - {skipped_header} - Skipped updating {update_skipped_count} entries - {TerminalColors.ENDC} - """ - ) + final_message = f"\n{log_header}\n" + "\n".join(messages) + if counts["failed"] > 0: + logger.error(f"{TerminalColors.FAIL}{final_message}{TerminalColors.ENDC}") + elif counts["skipped"] > 0: + logger.warning(f"{TerminalColors.YELLOW}{final_message}{TerminalColors.ENDC}") else: - logger.error( - f"""{TerminalColors.FAIL} - {log_header} - Updated {update_success_count} entries - ----- UPDATE FAILED ----- - Failed to update {update_failed_count} entries, - Skipped updating {update_skipped_count} entries - {TerminalColors.ENDC} - """ - ) + logger.info(f"{TerminalColors.OKGREEN}{final_message}{TerminalColors.ENDC}") @staticmethod def query_yes_no(question: str, default="yes"): @@ -402,11 +467,11 @@ class TerminalHelper: # and ask if they wish to proceed proceed_execution = TerminalHelper.query_yes_no_exit( f"\n{TerminalColors.OKCYAN}" - "=====================================================" - f"\n{prompt_title}\n" - "=====================================================" - f"\n{verify_message}\n" - f"\n{prompt_message}\n" + "=====================================================\n" + f"{prompt_title}\n" + "=====================================================\n" + f"{verify_message}\n" + f"{prompt_message}\n" f"{TerminalColors.FAIL}" f"Proceed? (Y = proceed, N = {action_description_for_selecting_no})" f"{TerminalColors.ENDC}" @@ -463,4 +528,4 @@ class TerminalHelper: terminal_color = color colored_message = f"{terminal_color}{message}{TerminalColors.ENDC}" - log_method(colored_message, exc_info=exc_info) + return log_method(colored_message, exc_info=exc_info) diff --git a/src/registrar/migrations/0145_create_groups_v19.py b/src/registrar/migrations/0145_create_groups_v19.py new file mode 100644 index 000000000..63a50d26f --- /dev/null +++ b/src/registrar/migrations/0145_create_groups_v19.py @@ -0,0 +1,37 @@ +# This migration creates the create_full_access_group and create_cisa_analyst_group groups +# If permissions on the groups need changing, edit CISA_ANALYST_GROUP_PERMISSIONS +# in the user_group model then: +# [NOT RECOMMENDED] +# step 1: docker-compose exec app ./manage.py migrate --fake registrar 0035_contenttypes_permissions +# step 2: docker-compose exec app ./manage.py migrate registrar 0036_create_groups +# step 3: fake run the latest migration in the migrations list +# [RECOMMENDED] +# Alternatively: +# step 1: duplicate the migration that loads data +# step 2: docker-compose exec app ./manage.py migrate + +from django.db import migrations +from registrar.models import UserGroup +from typing import Any + + +# For linting: RunPython expects a function reference, +# so let's give it one +def create_groups(apps, schema_editor) -> Any: + UserGroup.create_cisa_analyst_group(apps, schema_editor) + UserGroup.create_omb_analyst_group(apps, schema_editor) + UserGroup.create_full_access_group(apps, schema_editor) + + +class Migration(migrations.Migration): + dependencies = [ + ("registrar", "0144_domainrequest_eop_stakeholder_email_and_more"), + ] + + operations = [ + migrations.RunPython( + create_groups, + reverse_code=migrations.RunPython.noop, + atomic=True, + ), + ] diff --git a/src/registrar/models/domain.py b/src/registrar/models/domain.py index 233f8f77c..63483c18e 100644 --- a/src/registrar/models/domain.py +++ b/src/registrar/models/domain.py @@ -256,6 +256,29 @@ class Domain(TimeStampedModel, DomainHelper): req = commands.CheckDomain([domain_name]) return registry.send(req, cleaned=True).res_data[0].avail + @classmethod + def is_pending_delete(cls, domain: str) -> bool: + """Check if domain is pendingDelete state via response from registry.""" + domain_name = domain.lower() + + try: + info_req = commands.InfoDomain(domain_name) + info_response = registry.send(info_req, cleaned=True) + # Ensure res_data exists and is not empty + if info_response and info_response.res_data: + # Use _extract_data_from_response bc it's same thing but jsonified + domain_response = cls._extract_data_from_response(cls, info_response) # type: ignore + domain_status_state = domain_response.get("statuses") + if "pendingDelete" in str(domain_status_state): + return True + except RegistryError as err: + if not err.is_connection_error(): + logger.info(f"Domain does not exist yet so it won't be in pending delete -- {err}") + return False + else: + raise err + return False + @classmethod def registered(cls, domain: str) -> bool: """Check if a domain is _not_ available.""" @@ -2026,7 +2049,9 @@ class Domain(TimeStampedModel, DomainHelper): def _extract_data_from_response(self, data_response): """extract data from response from registry""" + data = data_response.res_data[0] + return { "auth_info": getattr(data, "auth_info", ...), "_contacts": getattr(data, "contacts", ...), diff --git a/src/registrar/models/public_contact.py b/src/registrar/models/public_contact.py index 58ad5be92..fc0130c54 100644 --- a/src/registrar/models/public_contact.py +++ b/src/registrar/models/public_contact.py @@ -164,4 +164,4 @@ class PublicContact(TimeStampedModel): return cls._meta.get_field("registry_id").max_length def __str__(self): - return f"{self.name} <{self.email}>" f"id: {self.registry_id} " f"type: {self.contact_type}" + return self.registry_id diff --git a/src/registrar/models/user.py b/src/registrar/models/user.py index d5476ab9a..c59ab376b 100644 --- a/src/registrar/models/user.py +++ b/src/registrar/models/user.py @@ -474,7 +474,7 @@ class User(AbstractUser): admin_count = admins.count() # Check if the current user is in the list of admins - if admin_count == 1 and admins.first().user == self: + if admin_count == 1 and admins.first() and admins.first().user == self: return True # The user is the only admin # If there are other admins or the user is not the only one diff --git a/src/registrar/models/user_group.py b/src/registrar/models/user_group.py index 331e36605..9aa4345c9 100644 --- a/src/registrar/models/user_group.py +++ b/src/registrar/models/user_group.py @@ -90,6 +90,14 @@ class UserGroup(Group): "delete_userportfoliopermission", ], }, + { + "app_label": "registrar", + "model": "portfolioinvitation", + "permissions": [ + "add_portfolioinvitation", + "view_portfolioinvitation", + ], + }, ] # Avoid error: You can't execute queries until the end diff --git a/src/registrar/permissions.py b/src/registrar/permissions.py new file mode 100644 index 000000000..6847c16d5 --- /dev/null +++ b/src/registrar/permissions.py @@ -0,0 +1,229 @@ +""" +Centralized permissions management for the registrar. +""" + +from django.urls import URLResolver, get_resolver, URLPattern +from registrar.decorators import ( + HAS_PORTFOLIO_DOMAIN_REQUESTS_ANY_PERM, + IS_STAFF, + IS_DOMAIN_MANAGER, + IS_DOMAIN_MANAGER_AND_NOT_PORTFOLIO_MEMBER, + IS_PORTFOLIO_MEMBER_AND_DOMAIN_MANAGER, + IS_CISA_ANALYST, + IS_OMB_ANALYST, + IS_FULL_ACCESS, + IS_DOMAIN_REQUEST_CREATOR, + IS_STAFF_MANAGING_DOMAIN, + IS_PORTFOLIO_MEMBER, + HAS_PORTFOLIO_DOMAINS_ANY_PERM, + HAS_PORTFOLIO_DOMAINS_VIEW_ALL, + HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, + HAS_PORTFOLIO_DOMAIN_REQUESTS_VIEW_ALL, + HAS_PORTFOLIO_MEMBERS_EDIT, + HAS_PORTFOLIO_MEMBERS_ANY_PERM, + HAS_PORTFOLIO_MEMBERS_VIEW, + ALL, +) + +# Define permissions for each URL pattern by name +URL_PERMISSIONS = { + # Home & general pages + "home": [ALL], + "health": [ALL], # Intentionally no decorator + # Domain management + "domain": [HAS_PORTFOLIO_DOMAINS_VIEW_ALL, IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN], + "domain-dns": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN], + "domain-dns-nameservers": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN], + "domain-dns-dnssec": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN], + "domain-dns-dnssec-dsdata": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN], + "domain-org-name-address": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN], + "domain-suborganization": [IS_PORTFOLIO_MEMBER_AND_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN], + "domain-senior-official": [IS_DOMAIN_MANAGER_AND_NOT_PORTFOLIO_MEMBER, IS_STAFF_MANAGING_DOMAIN], + "domain-security-email": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN], + "domain-renewal": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN], + "domain-users": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN], + "domain-users-add": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN], + "domain-user-delete": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN], + # Portfolio management + "domains": [HAS_PORTFOLIO_DOMAINS_ANY_PERM], + "no-portfolio-domains": [IS_PORTFOLIO_MEMBER], + "no-organization-domains": [IS_PORTFOLIO_MEMBER], + "members": [HAS_PORTFOLIO_MEMBERS_ANY_PERM], + "member": [HAS_PORTFOLIO_MEMBERS_ANY_PERM], + "member-delete": [HAS_PORTFOLIO_MEMBERS_EDIT], + "member-permissions": [HAS_PORTFOLIO_MEMBERS_EDIT], + "member-domains": [HAS_PORTFOLIO_MEMBERS_ANY_PERM], + "member-domains-edit": [HAS_PORTFOLIO_MEMBERS_EDIT], + "invitedmember": [HAS_PORTFOLIO_MEMBERS_ANY_PERM], + "invitedmember-delete": [HAS_PORTFOLIO_MEMBERS_EDIT], + "invitedmember-permissions": [HAS_PORTFOLIO_MEMBERS_EDIT], + "invitedmember-domains": [HAS_PORTFOLIO_MEMBERS_ANY_PERM], + "invitedmember-domains-edit": [HAS_PORTFOLIO_MEMBERS_EDIT], + "new-member": [HAS_PORTFOLIO_MEMBERS_EDIT], + "domain-requests": [HAS_PORTFOLIO_DOMAIN_REQUESTS_ANY_PERM], + "no-portfolio-requests": [IS_PORTFOLIO_MEMBER], + "organization": [IS_PORTFOLIO_MEMBER], + "senior-official": [IS_PORTFOLIO_MEMBER], + # Domain requests + "domain-request-status": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "domain-request-status-viewonly": [HAS_PORTFOLIO_DOMAIN_REQUESTS_VIEW_ALL], + "domain-request-withdraw-confirmation": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "domain-request-withdrawn": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "domain-request-delete": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "edit-domain-request": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + # Admin functions + "analytics": [IS_CISA_ANALYST, IS_FULL_ACCESS], + "export_data_type": [IS_CISA_ANALYST, IS_FULL_ACCESS], + "export_data_full": [IS_CISA_ANALYST, IS_FULL_ACCESS], + "export_data_domain_requests_full": [IS_CISA_ANALYST, IS_FULL_ACCESS], + "export_data_federal": [IS_CISA_ANALYST, IS_FULL_ACCESS], + "export_domains_growth": [IS_CISA_ANALYST, IS_FULL_ACCESS], + "export_requests_growth": [IS_CISA_ANALYST, IS_FULL_ACCESS], + "export_managed_domains": [IS_CISA_ANALYST, IS_FULL_ACCESS], + "export_unmanaged_domains": [IS_CISA_ANALYST, IS_FULL_ACCESS], + "transfer_user": [IS_CISA_ANALYST, IS_FULL_ACCESS], + # Analytics + "all-domain-metadata": [IS_STAFF], + "current-full": [IS_STAFF], + "all-domain-requests-metadata": [IS_STAFF], + "domain-growth": [IS_STAFF], + "request-growth": [IS_STAFF], + "managed-domains": [IS_STAFF], + "unmanaged-domains": [IS_STAFF], + # Reports + "export-user-domains-as-csv": [IS_STAFF], + "export-portfolio-members-as-csv": [IS_STAFF], + "export_members_portfolio": [HAS_PORTFOLIO_MEMBERS_VIEW], + "export_data_type_user": [ALL], + # API endpoints + "get-senior-official-from-federal-agency-json": [IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST], + "get-portfolio-json": [IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST], + "get-suborganization-list-json": [IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST], + "get-federal-and-portfolio-types-from-federal-agency-json": [IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST], + "get-action-needed-email-for-user-json": [IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST], + "get-rejection-email-for-user-json": [IS_CISA_ANALYST, IS_FULL_ACCESS, IS_OMB_ANALYST], + "get_domains_json": [ALL], + "get_domain_requests_json": [ALL], + "get_portfolio_members_json": [HAS_PORTFOLIO_MEMBERS_ANY_PERM], + "get_member_domains_json": [HAS_PORTFOLIO_MEMBERS_ANY_PERM], + # User profile + "finish-user-profile-setup": [ALL], + "user-profile": [ALL], + # Invitation + "invitation-cancel": [IS_DOMAIN_MANAGER, IS_STAFF_MANAGING_DOMAIN], + # DNS Hosting + "prototype-domain-dns": [IS_STAFF], + # Domain request wizard + "start": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "finished": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "generic_org_type": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "tribal_government": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "organization_federal": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "organization_election": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "organization_contact": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "about_your_organization": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "senior_official": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "current_sites": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "dotgov_domain": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "purpose": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "other_contacts": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "additional_details": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "requirements": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "review": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "portfolio_requesting_entity": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], + "portfolio_additional_details": [HAS_PORTFOLIO_DOMAIN_REQUESTS_EDIT, IS_DOMAIN_REQUEST_CREATOR], +} + +UNCHECKED_URLS = [ + "health", + "openid/", + "get-current-federal", + "get-current-full", + "available", + "rdap", + "todo", + "logout", +] + + +def verify_all_urls_have_permissions(): + """ + Utility function to verify that all URLs in the application have defined permissions + in the permissions mapping. + """ + + resolver = get_resolver() + missing_permissions = [] + missing_names = [] + + # Collect all URL pattern names + for pattern in resolver.url_patterns: + # Skip URLResolver objects (like admin.site.urls) + if isinstance(pattern, URLResolver): + continue + + if hasattr(pattern, "name") and pattern.name: + if pattern.name not in URL_PERMISSIONS and pattern.name not in UNCHECKED_URLS: + missing_permissions.append(pattern.name) + else: + raise ValueError(f"URL pattern {pattern} has no name") + + if missing_names: + raise ValueError(f"The following URL patterns have no name: {missing_names}") + + return missing_permissions + + +def validate_permissions(): # noqa: C901 + """ + Validates that all URL patterns have consistent permission rules between + the centralized mapping and view decorators. + + Returns a dictionary of issues found. + """ + + resolver = get_resolver() + issues = { + "missing_in_mapping": [], # URLs with decorators but not in mapping + "missing_decorator": [], # URLs in mapping but missing decorators + "permission_mismatch": [], # URLs with different permissions + } + + def check_url_pattern(pattern, parent_path=""): + if isinstance(pattern, URLPattern): + view_func = pattern.callback + path = f"{parent_path}/{pattern.pattern}" + url_name = pattern.name + + if url_name: + # Skip check for endpoints that intentionally have no decorator + if url_name in UNCHECKED_URLS: + return + + # Check if view has decorator but missing from mapping + if getattr(view_func, "has_explicit_access", False) and url_name not in URL_PERMISSIONS: + issues["missing_in_mapping"].append((url_name, path)) + + # Check if view is in mapping but missing decorator + elif url_name in URL_PERMISSIONS and not getattr(view_func, "has_explicit_access", False): + issues["missing_decorator"].append((url_name, path)) + + # Check if permissions match (more complex, may need refinement) + elif getattr(view_func, "has_explicit_access", False) and url_name in URL_PERMISSIONS: + view_permissions = getattr(view_func, "_access_rules", set()) + mapping_permissions = set(URL_PERMISSIONS[url_name]) + + if view_permissions != mapping_permissions: + issues["permission_mismatch"].append((url_name, path, view_permissions, mapping_permissions)) + + elif isinstance(pattern, URLResolver): + # Handle included URL patterns (nested) + new_parent = f"{parent_path}/{pattern.pattern}" + for p in pattern.url_patterns: + check_url_pattern(p, new_parent) + + # Check all URL patterns + for pattern in resolver.url_patterns: + check_url_pattern(pattern) + + return issues diff --git a/src/registrar/registrar_middleware.py b/src/registrar/registrar_middleware.py index 8c9c79876..36be3cd39 100644 --- a/src/registrar/registrar_middleware.py +++ b/src/registrar/registrar_middleware.py @@ -222,3 +222,31 @@ class RestrictAccessMiddleware: raise PermissionDenied # Deny access if the view lacks explicit permission handling return self.get_response(request) + + +class RequestLoggingMiddleware: + """ + Middleware to log user email, remote address, and request path. + """ + + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + response = self.get_response(request) + + # Only log in production (stable) + if getattr(settings, "IS_PRODUCTION", False): + # Get user email (if authenticated), else "Anonymous" + user_email = request.user.email if request.user.is_authenticated else "Anonymous" + + # Get remote IP address + remote_ip = request.META.get("REMOTE_ADDR", "Unknown IP") + + # Get request path + request_path = request.path + + # Log user information + logger.info(f"Router log | User: {user_email} | IP: {remote_ip} | Path: {request_path}") + + return response diff --git a/src/registrar/templates/domain_detail.html b/src/registrar/templates/domain_detail.html index a0d477249..e4c3f703d 100644 --- a/src/registrar/templates/domain_detail.html +++ b/src/registrar/templates/domain_detail.html @@ -73,9 +73,16 @@ {% if is_portfolio_user and not is_domain_manager %}
+ {% if not is_portfolio_admin %}

You don't have access to manage {{domain.name}}. If you need to make updates, contact one of the listed domain managers.

+ {% else %} + {% url 'member' member_pk=user_portfolio_permission.id as edit_member_url %} +

+ You don't have access to manage {{domain.name}}. If you need to become a domain manager, edit the domain assignments in your member profile. +

+ {% endif %}
{% endif %} diff --git a/src/registrar/templates/domain_request_intro.html b/src/registrar/templates/domain_request_intro.html index 263201393..281e65127 100644 --- a/src/registrar/templates/domain_request_intro.html +++ b/src/registrar/templates/domain_request_intro.html @@ -19,7 +19,7 @@

We’ll use the information you provide to verify your organization’s eligibility for a .gov domain. We’ll also verify that the domain you request meets our guidelines.

{% endif %}

Time to complete the form

-

If you have all the information you need, +

If you have all the information you need, completing your domain request might take around 15 minutes.

How we’ll reach you

While reviewing your domain request, we may need to reach out with questions. We’ll also email you when we complete our review. If the contact information below is not correct, visit your profile to make updates.

diff --git a/src/registrar/templates/domain_request_requirements.html b/src/registrar/templates/domain_request_requirements.html index 4d49b235e..4625162ea 100644 --- a/src/registrar/templates/domain_request_requirements.html +++ b/src/registrar/templates/domain_request_requirements.html @@ -49,7 +49,7 @@

Domain renewal

-

.Gov domains are registered for a one-year period. To renew your domain, you'll be asked to verify your organization’s eligibility and your contact information.

+

.Gov domains are registered for a one-year period. To renew the domain, you’ll be asked to verify your contact information and some details about the domain.

Though a domain may expire, it will not automatically be put on hold or deleted. We’ll make extensive efforts to contact your organization before holding or deleting a domain.

{% endblock %} diff --git a/src/registrar/templates/includes/members_table.html b/src/registrar/templates/includes/members_table.html index f93b0d3c8..2731b20d7 100644 --- a/src/registrar/templates/includes/members_table.html +++ b/src/registrar/templates/includes/members_table.html @@ -8,7 +8,7 @@
- {% with label_text="Search by member name" item_name="members" aria_label_text="Members search component"%} + {% with label_text="Search by member email address" item_name="members" aria_label_text="Members search component"%} {% include "includes/search.html" %} {% endwith %} {% with export_aria="Members report component" export_url='export_members_portfolio' %} diff --git a/src/registrar/templates/portfolio_member.html b/src/registrar/templates/portfolio_member.html index 1b9ffd653..1afe96161 100644 --- a/src/registrar/templates/portfolio_member.html +++ b/src/registrar/templates/portfolio_member.html @@ -98,6 +98,18 @@ Organization member {% if portfolio_permission %} + {% if member and member.id == request.user.id and is_only_admin %} +
+
+

+ You're the only admin for this organization. + Organizations must have at least one admin. + To remove yourself or change your member access, + you'll need to add another admin. +

+
+
+ {% endif %} {% include "includes/summary_item.html" with title='Member access and permissions' permissions=True value=portfolio_permission edit_link=edit_url editable=has_edit_members_portfolio_permission %} {% elif portfolio_invitation %} {% include "includes/summary_item.html" with title='Member access and permissions' permissions=True value=portfolio_invitation edit_link=edit_url editable=has_edit_members_portfolio_permission %} diff --git a/src/registrar/templates/portfolio_member_permissions.html b/src/registrar/templates/portfolio_member_permissions.html index e5ae5864e..59a8b1fd8 100644 --- a/src/registrar/templates/portfolio_member_permissions.html +++ b/src/registrar/templates/portfolio_member_permissions.html @@ -75,11 +75,25 @@ +

Member access

+ + {% if member and member.id == request.user.id and is_only_admin %} +
+
+

+ You're the only admin for this organization. + Organizations must have at least one admin. + To remove yourself or change your member access, + you'll need to add another admin. +

+
+
+ {% endif %} Select the level of access for this member. * @@ -109,4 +123,22 @@
+ + {% comment %} If an admin is trying to edit themselves, show a modal {% endcomment %} + {% if member and member.id == request.user.id and not is_only_admin %} + + + {% endif %} {% endblock portfolio_content%} diff --git a/src/registrar/tests/common.py b/src/registrar/tests/common.py index 220dbb198..3d91bad24 100644 --- a/src/registrar/tests/common.py +++ b/src/registrar/tests/common.py @@ -1092,7 +1092,7 @@ def completed_domain_request( # noqa email="testy@town.com", phone="(555) 555 5555", ) - domain, _ = DraftDomain.objects.get_or_create(name=name) + domain = DraftDomain.objects.create(name=name) other, _ = Contact.objects.get_or_create( first_name="Testy", last_name="Tester", @@ -1931,7 +1931,14 @@ class MockEppLib(TestCase): return MagicMock(res_data=[mocked_result]) def mockCreateContactCommands(self, _request, cleaned): - if getattr(_request, "id", None) == "fail" and self.mockedSendFunction.call_count == 3: + ids_to_throw_already_exists = [ + "failAdmin1234567", + "failTech12345678", + "failSec123456789", + "failReg123456789", + "fail", + ] + if getattr(_request, "id", None) in ids_to_throw_already_exists and self.mockedSendFunction.call_count == 3: # use this for when a contact is being updated # sets the second send() to fail raise RegistryError(code=ErrorCode.OBJECT_EXISTS) @@ -1946,7 +1953,14 @@ class MockEppLib(TestCase): return MagicMock(res_data=[self.mockDataInfoHosts]) def mockDeleteContactCommands(self, _request, cleaned): - if getattr(_request, "id", None) == "fail": + ids_to_throw_already_exists = [ + "failAdmin1234567", + "failTech12345678", + "failSec123456789", + "failReg123456789", + "fail", + ] + if getattr(_request, "id", None) in ids_to_throw_already_exists: raise RegistryError(code=ErrorCode.OBJECT_EXISTS) else: return MagicMock( diff --git a/src/registrar/tests/test_admin_request.py b/src/registrar/tests/test_admin_request.py index e6ad2ef3e..68fbf26e7 100644 --- a/src/registrar/tests/test_admin_request.py +++ b/src/registrar/tests/test_admin_request.py @@ -50,6 +50,7 @@ from unittest.mock import ANY, patch from django.conf import settings import boto3_mocking # type: ignore import logging +from django.contrib.messages.storage.fallback import FallbackStorage logger = logging.getLogger(__name__) @@ -2315,7 +2316,6 @@ class TestDomainRequestAdmin(MockEppLib): Used to test errors when saving a change with an active domain, also used to test side effects when saving a change goes through.""" - with less_console_noise(): # Create an instance of the model domain_request = completed_domain_request(status=DomainRequest.DomainRequestStatus.APPROVED) @@ -2428,6 +2428,44 @@ class TestDomainRequestAdmin(MockEppLib): "Cannot approve. Requested domain is already in use.", ) + def test_error_when_approving_domain_in_pending_delete_state(self): + """If in pendingDelete state from in review -> approve not allowed.""" + + # 1. Create domain + to_be_in_pending_deleted = completed_domain_request( + status=DomainRequest.DomainRequestStatus.SUBMITTED, name="meoward1.gov" + ) + to_be_in_pending_deleted.creator = self.superuser + + # 2. Put domain into in-review state + to_be_in_pending_deleted.status = DomainRequest.DomainRequestStatus.IN_REVIEW + to_be_in_pending_deleted.save() + + # 3. Update request as a superuser + request = self.factory.post(f"/admin/registrar/domainrequest/{to_be_in_pending_deleted.pk}/change/") + request.user = self.superuser + request.session = {} + + setattr(request, "_messages", FallbackStorage(request)) + + # 4. Use ExitStack for combine patching like above + with boto3_mocking.clients.handler_for("sesv2", self.mock_client), ExitStack() as stack: + # Patch django.contrib.messages.error + stack.enter_context(patch.object(messages, "error")) + + with patch("registrar.models.domain.Domain.is_pending_delete", return_value=True): + # Attempt to approve to_be_in_pending_deleted + # (which should fail due is_pending_delete == True so nothing can get approved) + to_be_in_pending_deleted.status = DomainRequest.DomainRequestStatus.APPROVED + + # Save the model with the patched request + self.admin.save_model(request, to_be_in_pending_deleted, None, True) + + messages.error.assert_called_once_with( + request, + "Domain of same name is currently in pending delete state.", + ) + @less_console_noise def test_no_error_when_saving_to_approved_and_domain_exists(self): """The negative of the redundant admin check on model transition not allowed.""" diff --git a/src/registrar/tests/test_forms.py b/src/registrar/tests/test_forms.py index 1c8b127ae..6e4b6adae 100644 --- a/src/registrar/tests/test_forms.py +++ b/src/registrar/tests/test_forms.py @@ -477,8 +477,8 @@ class TestBasePortfolioMemberForms(TestCase): self.assertTrue(form.is_valid(), f"Form {form_class.__name__} failed validation with data: {data}") return form - def _assert_form_has_error(self, form_class, data, field_name): - form = form_class(data=data) + def _assert_form_has_error(self, form_class, data, field_name, instance=None): + form = form_class(data=data, instance=instance) self.assertFalse(form.is_valid()) self.assertIn(field_name, form.errors) @@ -504,17 +504,23 @@ class TestBasePortfolioMemberForms(TestCase): "domain_permissions": "", # Simulate missing field "member_permissions": "", # Simulate missing field } + user_portfolio_permission, _ = UserPortfolioPermission.objects.get_or_create( + portfolio=self.portfolio, user=self.user + ) + portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(portfolio=self.portfolio, email="hi@ho") # Check required fields for all forms - self._assert_form_has_error(PortfolioMemberForm, data, "domain_request_permissions") - self._assert_form_has_error(PortfolioMemberForm, data, "domain_permissions") - self._assert_form_has_error(PortfolioMemberForm, data, "member_permissions") - self._assert_form_has_error(PortfolioInvitedMemberForm, data, "domain_request_permissions") - self._assert_form_has_error(PortfolioInvitedMemberForm, data, "domain_permissions") - self._assert_form_has_error(PortfolioInvitedMemberForm, data, "member_permissions") - self._assert_form_has_error(PortfolioNewMemberForm, data, "domain_request_permissions") - self._assert_form_has_error(PortfolioNewMemberForm, data, "domain_permissions") - self._assert_form_has_error(PortfolioNewMemberForm, data, "member_permissions") + self._assert_form_has_error(PortfolioMemberForm, data, "domain_request_permissions", user_portfolio_permission) + self._assert_form_has_error(PortfolioMemberForm, data, "domain_permissions", user_portfolio_permission) + self._assert_form_has_error(PortfolioMemberForm, data, "member_permissions", user_portfolio_permission) + self._assert_form_has_error( + PortfolioInvitedMemberForm, data, "domain_request_permissions", portfolio_invitation + ) + self._assert_form_has_error(PortfolioInvitedMemberForm, data, "domain_permissions", portfolio_invitation) + self._assert_form_has_error(PortfolioInvitedMemberForm, data, "member_permissions", portfolio_invitation) + self._assert_form_has_error(PortfolioNewMemberForm, data, "domain_request_permissions", portfolio_invitation) + self._assert_form_has_error(PortfolioNewMemberForm, data, "domain_permissions", portfolio_invitation) + self._assert_form_has_error(PortfolioNewMemberForm, data, "member_permissions", portfolio_invitation) @less_console_noise_decorator def test_clean_validates_required_fields_for_admin_role(self): @@ -529,7 +535,6 @@ class TestBasePortfolioMemberForms(TestCase): portfolio=self.portfolio, user=self.user ) portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(portfolio=self.portfolio, email="hi@ho") - data = { "role": UserPortfolioRoleChoices.ORGANIZATION_ADMIN.value, } @@ -677,6 +682,7 @@ class TestBasePortfolioMemberForms(TestCase): @less_console_noise_decorator def test_invalid_data_for_member(self): """Test invalid form submission for a member role with missing permissions.""" + portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(portfolio=self.portfolio, email="hi@ho") data = { "email": "hi@ho.com", "portfolio": self.portfolio.id, @@ -685,5 +691,5 @@ class TestBasePortfolioMemberForms(TestCase): "member_permissions": "", # Missing field "domain_permissions": "", # Missing field } - self._assert_form_has_error(PortfolioMemberForm, data, "domain_request_permissions") - self._assert_form_has_error(PortfolioInvitedMemberForm, data, "member_permissions") + self._assert_form_has_error(PortfolioMemberForm, data, "domain_request_permissions", portfolio_invitation) + self._assert_form_has_error(PortfolioInvitedMemberForm, data, "member_permissions", portfolio_invitation) diff --git a/src/registrar/tests/test_management_scripts.py b/src/registrar/tests/test_management_scripts.py index 110feea85..65e052ae1 100644 --- a/src/registrar/tests/test_management_scripts.py +++ b/src/registrar/tests/test_management_scripts.py @@ -32,6 +32,7 @@ from registrar.models import ( Portfolio, Suborganization, ) +from registrar.utility.enums import DefaultEmail import tablib from unittest.mock import patch, call, MagicMock, mock_open from epplibwrapper import commands, common @@ -1473,6 +1474,7 @@ class TestCreateFederalPortfolio(TestCase): generic_org_type=DomainRequest.OrganizationChoices.CITY, federal_agency=self.federal_agency, user=self.user, + organization_name="Testorg", ) self.domain_request.approve() self.domain_info = DomainInformation.objects.filter(domain_request=self.domain_request).get() @@ -1529,13 +1531,10 @@ class TestCreateFederalPortfolio(TestCase): @less_console_noise_decorator def test_post_process_started_domain_requests_existing_portfolio(self): - """Ensures that federal agency is cleared when agency name matches portfolio name. - As the name implies, this implicitly tests the "post_process_started_domain_requests" function. - """ + """Ensures that federal agency is cleared when agency name matches portfolio name.""" federal_agency_2 = FederalAgency.objects.create(agency="Sugarcane", federal_type=BranchChoices.EXECUTIVE) # Test records with portfolios and no org names - # Create a portfolio. This script skips over "started" portfolio = Portfolio.objects.create(organization_name="Sugarcane", creator=self.user) # Create a domain request with matching org name matching_request = completed_domain_request( @@ -1822,12 +1821,12 @@ class TestCreateFederalPortfolio(TestCase): # We expect a error to be thrown when we dont pass parse requests or domains with self.assertRaisesRegex( - CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --add_managers." + CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --parse_managers." ): self.run_create_federal_portfolio(branch="executive") with self.assertRaisesRegex( - CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --add_managers." + CommandError, "You must specify at least one of --parse_requests, --parse_domains, or --parse_managers." ): self.run_create_federal_portfolio(agency_name="test") @@ -1877,7 +1876,9 @@ class TestCreateFederalPortfolio(TestCase): UserDomainRole.objects.create(user=manager2, domain=self.domain, role=UserDomainRole.Roles.MANAGER) # Run the management command - self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_domains=True, add_managers=True) + self.run_create_federal_portfolio( + agency_name=self.federal_agency.agency, parse_domains=True, parse_managers=True + ) # Check that the portfolio was created self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency) @@ -1900,7 +1901,9 @@ class TestCreateFederalPortfolio(TestCase): ) # Run the management command - self.run_create_federal_portfolio(agency_name=self.federal_agency.agency, parse_domains=True, add_managers=True) + self.run_create_federal_portfolio( + agency_name=self.federal_agency.agency, parse_domains=True, parse_managers=True + ) # Check that the portfolio was created self.portfolio = Portfolio.objects.get(federal_agency=self.federal_agency) @@ -1917,7 +1920,7 @@ class TestCreateFederalPortfolio(TestCase): # Verify that no duplicate invitations are created self.run_create_federal_portfolio( - agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True + agency_name=self.federal_agency.agency, parse_requests=True, parse_managers=True ) invitations = PortfolioInvitation.objects.filter(email="manager1@example.com", portfolio=self.portfolio) self.assertEqual( @@ -1945,7 +1948,7 @@ class TestCreateFederalPortfolio(TestCase): # Run the management command self.run_create_federal_portfolio( - agency_name=self.federal_agency.agency, parse_requests=True, add_managers=True + agency_name=self.federal_agency.agency, parse_requests=True, parse_managers=True ) # Ensure that the manager is not duplicated @@ -1993,13 +1996,13 @@ class TestCreateFederalPortfolio(TestCase): self.run_create_federal_portfolio( agency_name=self.federal_agency.agency, parse_requests=True, - add_managers=True, + parse_managers=True, skip_existing_portfolios=True, ) - # Check that managers were added to the portfolio + # Check that managers weren't added to the portfolio permissions = UserPortfolioPermission.objects.filter(portfolio=self.portfolio, user__in=[manager1, manager2]) - self.assertEqual(permissions.count(), 2) + self.assertEqual(permissions.count(), 0) for perm in permissions: self.assertIn(UserPortfolioRoleChoices.ORGANIZATION_MEMBER, perm.roles) @@ -2506,3 +2509,189 @@ class TestRemovePortfolios(TestCase): # Check that the portfolio was deleted self.assertFalse(Portfolio.objects.filter(organization_name="Test with suborg").exists()) + + +class TestUpdateDefaultPublicContacts(MockEppLib): + """Tests for the update_default_public_contacts management command.""" + + @less_console_noise_decorator + def setUp(self): + """Setup test data with PublicContact records.""" + super().setUp() + self.domain_request = completed_domain_request( + name="testdomain.gov", + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + ) + self.domain_request.approve() + self.domain = self.domain_request.approved_domain + + # 1. PublicContact with all old default values + self.old_default_contact = self.domain.get_default_administrative_contact() + self.old_default_contact.registry_id = "failAdmin1234567" + self.old_default_contact.name = "CSD/CB – ATTN: Cameron Dixon" + self.old_default_contact.street1 = "CISA – NGR STOP 0645" + self.old_default_contact.pc = "20598-0645" + self.old_default_contact.email = DefaultEmail.OLD_PUBLIC_CONTACT_DEFAULT + self.old_default_contact.save() + + # 2. PublicContact with current default email but old values for other fields + self.mixed_default_contact = self.domain.get_default_technical_contact() + self.mixed_default_contact.registry_id = "failTech12345678" + self.mixed_default_contact.save(skip_epp_save=True) + self.mixed_default_contact.name = "registry customer service" + self.mixed_default_contact.street1 = "4200 Wilson Blvd." + self.mixed_default_contact.pc = "22201" + self.mixed_default_contact.email = DefaultEmail.PUBLIC_CONTACT_DEFAULT + self.mixed_default_contact.save() + + # 3. PublicContact with non-default values + self.non_default_contact = self.domain.get_default_security_contact() + self.non_default_contact.registry_id = "failSec123456789" + self.non_default_contact.domain = self.domain + self.non_default_contact.save(skip_epp_save=True) + self.non_default_contact.name = "Hotdogs" + self.non_default_contact.street1 = "123 hotdog town" + self.non_default_contact.pc = "22111" + self.non_default_contact.email = "thehotdogman@igorville.gov" + self.non_default_contact.save() + + # 4. Create a default contact but with an old email + self.default_registrant_old_email = self.domain.get_default_registrant_contact() + self.default_registrant_old_email.registry_id = "failReg123456789" + self.default_registrant_old_email.email = DefaultEmail.LEGACY_DEFAULT + self.default_registrant_old_email.save() + DF = common.DiscloseField + excluded_disclose_fields = {DF.NOTIFY_EMAIL, DF.VAT, DF.IDENT} + self.all_disclose_fields = {field for field in DF} - excluded_disclose_fields + + def tearDown(self): + """Clean up test data.""" + super().tearDown() + PublicContact.objects.all().delete() + Domain.objects.all().delete() + DomainRequest.objects.all().delete() + DomainInformation.objects.all().delete() + User.objects.all().delete() + + @patch("registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", return_value=True) + @less_console_noise_decorator + def run_update_default_public_contacts(self, mock_prompt, **kwargs): + """Execute the update_default_public_contacts command with options.""" + call_command("update_default_public_contacts", **kwargs) + + # @less_console_noise_decorator + def test_updates_old_default_contact(self): + """ + Test that contacts with old default values are updated to new default values. + Also tests for string normalization. + """ + self.run_update_default_public_contacts() + self.old_default_contact.refresh_from_db() + + # Verify updates occurred + self.assertEqual(self.old_default_contact.name, "CSD/CB – Attn: .gov TLD") + self.assertEqual(self.old_default_contact.street1, "1110 N. Glebe Rd") + self.assertEqual(self.old_default_contact.pc, "22201") + self.assertEqual(self.old_default_contact.email, DefaultEmail.PUBLIC_CONTACT_DEFAULT) + + # Verify EPP create/update calls were made + expected_update = self._convertPublicContactToEpp( + self.old_default_contact, + disclose=False, + disclose_fields=self.all_disclose_fields - {"name", "email", "voice", "addr"}, + ) + self.mockedSendFunction.assert_any_call(expected_update, cleaned=True) + + @less_console_noise_decorator + def test_updates_with_default_contact_values(self): + """ + Test that contacts created from the default helper function with old email are updated. + """ + self.run_update_default_public_contacts() + self.default_registrant_old_email.refresh_from_db() + + # Verify updates occurred + self.assertEqual(self.default_registrant_old_email.name, "CSD/CB – Attn: .gov TLD") + self.assertEqual(self.default_registrant_old_email.street1, "1110 N. Glebe Rd") + self.assertEqual(self.default_registrant_old_email.pc, "22201") + self.assertEqual(self.default_registrant_old_email.email, DefaultEmail.PUBLIC_CONTACT_DEFAULT) + + # Verify values match the default + default_reg = PublicContact.get_default_registrant() + self.assertEqual(self.default_registrant_old_email.name, default_reg.name) + self.assertEqual(self.default_registrant_old_email.street1, default_reg.street1) + self.assertEqual(self.default_registrant_old_email.pc, default_reg.pc) + self.assertEqual(self.default_registrant_old_email.email, default_reg.email) + + # Verify EPP create/update calls were made + expected_update = self._convertPublicContactToEpp( + self.default_registrant_old_email, disclose=False, disclose_fields=self.all_disclose_fields + ) + self.mockedSendFunction.assert_any_call(expected_update, cleaned=True) + + @less_console_noise_decorator + def test_skips_non_default_contacts(self): + """ + Test that contacts with non-default values are skipped. + """ + original_name = self.non_default_contact.name + original_street1 = self.non_default_contact.street1 + original_pc = self.non_default_contact.pc + original_email = self.non_default_contact.email + + self.run_update_default_public_contacts() + self.non_default_contact.refresh_from_db() + + # Verify no updates occurred + self.assertEqual(self.non_default_contact.name, original_name) + self.assertEqual(self.non_default_contact.street1, original_street1) + self.assertEqual(self.non_default_contact.pc, original_pc) + self.assertEqual(self.non_default_contact.email, original_email) + + # Ensure that the update is still skipped even with the override flag + self.run_update_default_public_contacts(overwrite_updated_contacts=True) + self.non_default_contact.refresh_from_db() + + # Verify no updates occurred + self.assertEqual(self.non_default_contact.name, original_name) + self.assertEqual(self.non_default_contact.street1, original_street1) + self.assertEqual(self.non_default_contact.pc, original_pc) + self.assertEqual(self.non_default_contact.email, original_email) + + @less_console_noise_decorator + def test_skips_contacts_with_current_default_email_by_default(self): + """ + Test that contacts with the current default email are skipped when not using the override flag. + """ + # Get original values + original_name = self.mixed_default_contact.name + original_street1 = self.mixed_default_contact.street1 + + self.run_update_default_public_contacts() + self.mixed_default_contact.refresh_from_db() + + # Verify no updates occurred + self.assertEqual(self.mixed_default_contact.name, original_name) + self.assertEqual(self.mixed_default_contact.street1, original_street1) + self.assertEqual(self.mixed_default_contact.email, DefaultEmail.PUBLIC_CONTACT_DEFAULT) + + @less_console_noise_decorator + def test_updates_with_overwrite_flag(self): + """ + Test that contacts with the current default email are updated when using the override flag. + """ + # Run the command with the override flag + self.run_update_default_public_contacts(overwrite_updated_contacts=True) + self.mixed_default_contact.refresh_from_db() + + # Verify updates occurred + self.assertEqual(self.mixed_default_contact.name, "CSD/CB – Attn: .gov TLD") + self.assertEqual(self.mixed_default_contact.street1, "1110 N. Glebe Rd") + self.assertEqual(self.mixed_default_contact.pc, "22201") + self.assertEqual(self.mixed_default_contact.email, DefaultEmail.PUBLIC_CONTACT_DEFAULT) + + # Verify EPP create/update calls were made + expected_update = self._convertPublicContactToEpp( + self.mixed_default_contact, disclose=False, disclose_fields=self.all_disclose_fields + ) + self.mockedSendFunction.assert_any_call(expected_update, cleaned=True) diff --git a/src/registrar/tests/test_middleware_logging.py b/src/registrar/tests/test_middleware_logging.py new file mode 100644 index 000000000..d84c25af5 --- /dev/null +++ b/src/registrar/tests/test_middleware_logging.py @@ -0,0 +1,48 @@ +from django.test import TestCase, RequestFactory, override_settings +from unittest.mock import patch, MagicMock +from django.contrib.auth.models import AnonymousUser, User + +from registrar.registrar_middleware import RequestLoggingMiddleware + + +class RequestLoggingMiddlewareTest(TestCase): + """Test 'our' middleware logging.""" + + def setUp(self): + self.factory = RequestFactory() + self.get_response_mock = MagicMock() + self.middleware = RequestLoggingMiddleware(self.get_response_mock) + + @override_settings(IS_PRODUCTION=True) # Scopes change to this test only + @patch("logging.Logger.info") + def test_logging_enabled_in_production(self, mock_logger): + """Test that logging occurs when IS_PRODUCTION is True""" + request = self.factory.get("/test-path", **{"REMOTE_ADDR": "Unknown IP"}) # Override IP + request.user = User(username="testuser", email="testuser@example.com") + + self.middleware(request) # Call middleware + + mock_logger.assert_called_once_with( + "Router log | User: testuser@example.com | IP: Unknown IP | Path: /test-path" + ) + + @patch("logging.Logger.info") + def test_logging_disabled_in_non_production(self, mock_logger): + """Test that logging does not occur when IS_PRODUCTION is False""" + request = self.factory.get("/test-path") + request.user = User(username="testuser", email="testuser@example.com") + + self.middleware(request) # Call middleware + + mock_logger.assert_not_called() # Ensure no logs are generated + + @override_settings(IS_PRODUCTION=True) # Scopes change to this test only + @patch("logging.Logger.info") + def test_logging_anonymous_user(self, mock_logger): + """Test logging for an anonymous user""" + request = self.factory.get("/anonymous-path", **{"REMOTE_ADDR": "Unknown IP"}) # Override IP + request.user = AnonymousUser() # Simulate an anonymous user + + self.middleware(request) # Call middleware + + mock_logger.assert_called_once_with("Router log | User: Anonymous | IP: Unknown IP | Path: /anonymous-path") diff --git a/src/registrar/tests/test_migrations.py b/src/registrar/tests/test_migrations.py index 11daca870..d6d643d8b 100644 --- a/src/registrar/tests/test_migrations.py +++ b/src/registrar/tests/test_migrations.py @@ -43,6 +43,8 @@ class TestGroups(TestCase): "add_portfolio", "change_portfolio", "delete_portfolio", + "add_portfolioinvitation", + "view_portfolioinvitation", "add_seniorofficial", "change_seniorofficial", "delete_seniorofficial", diff --git a/src/registrar/tests/test_models_domain.py b/src/registrar/tests/test_models_domain.py index d3ec7b24e..53788aeff 100644 --- a/src/registrar/tests/test_models_domain.py +++ b/src/registrar/tests/test_models_domain.py @@ -666,6 +666,66 @@ class TestDomainAvailable(MockEppLib): self.assertFalse(available) patcher.stop() + def test_is_pending_delete(self): + """ + Scenario: Testing if a domain is in pendingDelete status from the registry + Should return True + + * Mock EPP response with pendingDelete status + * Validate InfoDomain command is called + * Validate response given mock + """ + + with patch("registrar.models.domain.registry.send") as mocked_send, patch( + "registrar.models.domain.Domain._extract_data_from_response" + ) as mocked_extract: + + # Mock the registry response + mock_response = MagicMock() + mock_response.res_data = [MagicMock(statuses=[MagicMock(state="pendingDelete")])] + mocked_send.return_value = mock_response + + # Mock JSONified response + mocked_extract.return_value = {"statuses": ["pendingDelete"]} + + result = Domain.is_pending_delete("is-pending-delete.gov") + + mocked_send.assert_called_once_with(commands.InfoDomain("is-pending-delete.gov"), cleaned=True) + mocked_extract.assert_called_once() + + self.assertTrue(result) + + def test_is_not_pending_delete(self): + """ + Scenario: Testing if a domain is NOT in pendingDelete status. + Should return False. + + * Mock EPP response without pendingDelete status (isserverTransferProhibited) + * Validate response given mock + """ + + with patch("registrar.models.domain.registry.send") as mocked_send, patch( + "registrar.models.domain.Domain._extract_data_from_response" + ) as mocked_extract: + + # Mock the registry response + mock_response = MagicMock() + mock_response.res_data = [ + MagicMock(statuses=[MagicMock(state="serverTransferProhibited"), MagicMock(state="ok")]) + ] + mocked_send.return_value = mock_response + + # Mock JSONified response + mocked_extract.return_value = {"statuses": ["serverTransferProhibited", "ok"]} + + result = Domain.is_pending_delete("is-not-pending.gov") + + # Assertions + mocked_send.assert_called_once_with(commands.InfoDomain("is-not-pending.gov"), cleaned=True) + mocked_extract.assert_called_once() + + self.assertFalse(result) + def test_domain_available_with_invalid_error(self): """ Scenario: Testing whether an invalid domain is available diff --git a/src/registrar/tests/test_permissions.py b/src/registrar/tests/test_permissions.py new file mode 100644 index 000000000..380518fdb --- /dev/null +++ b/src/registrar/tests/test_permissions.py @@ -0,0 +1,70 @@ +""" +Tests for validating the permissions system consistency. + +These tests ensure that: +1. All URLs have permissions defined in the centralized mapping +2. All views with permission decorators are in the mapping +3. The permissions in the decorators match those in the mapping +""" + +from django.test import TestCase +from registrar.permissions import verify_all_urls_have_permissions, validate_permissions + + +class TestPermissionsMapping(TestCase): + """Test the centralized permissions mapping for completeness and consistency.""" + + def test_all_urls_have_permissions(self): + """Verify that all URL patterns in the application have permissions defined in the mapping.""" + missing_urls = verify_all_urls_have_permissions() + + # Format URLs for better readability in case of failure + if missing_urls: + formatted_urls = "\n".join([f" - {url}" for url in missing_urls]) + self.fail( + f"The following URL patterns are missing from URL_PERMISSIONS mapping:\n{formatted_urls}\n" + f"Please add them to the URL_PERMISSIONS dictionary in registrar/permissions.py" + ) + + def test_permission_decorator_consistency(self): + """ + Test that all views have consistent permission rules between + the centralized mapping and view decorators. + """ + issues = validate_permissions() + + error_messages = [] + + if issues["missing_in_mapping"]: + urls = "\n".join([f" - {name} (at {path})" for name, path in issues["missing_in_mapping"]]) + error_messages.append( + f"The following URLs have permission decorators but are missing from the mapping:\n{urls}\n" + "Add these URLs to the URL_PERMISSIONS dictionary in registrar/permissions.py" + ) + + if issues["missing_decorator"]: + urls = "\n".join([f" - {name} (at {path})" for name, path in issues["missing_decorator"]]) + error_messages.append( + f"The following URLs are in the mapping but missing @grant_access decorators:\n{urls}\n" + "Add appropriate @grant_access decorators to these views" + ) + + if issues["permission_mismatch"]: + mismatches = [] + for name, path, view_perms, mapping_perms in issues["permission_mismatch"]: + view_perms_str = ", ".join(sorted(str(p) for p in view_perms)) + mapping_perms_str = ", ".join(sorted(str(p) for p in mapping_perms)) + mismatches.append( + f" - {name} (at {path}):\n" + f" Decorator: [{view_perms_str}]\n" + f" Mapping: [{mapping_perms_str}]" + ) + + error_messages.append( + f"The following URLs have mismatched permissions between decorators and mapping:\n" + f"{chr(10).join(mismatches)}\n" + "Update either the decorator or the mapping to ensure consistency" + ) + + if error_messages: + self.fail("\n\n".join(error_messages)) diff --git a/src/registrar/tests/test_reports.py b/src/registrar/tests/test_reports.py index 236e810cf..e8322db61 100644 --- a/src/registrar/tests/test_reports.py +++ b/src/registrar/tests/test_reports.py @@ -438,12 +438,6 @@ class ExportDataTest(MockDbForIndividualTests, MockEppLib): self.assertIn(dd_3.name, csv_content) self.assertNotIn(dd_2.name, csv_content) - # Get the csv content - csv_content = self._run_domain_request_data_type_user_export(request) - self.assertIn(dd_1.name, csv_content) - self.assertIn(dd_3.name, csv_content) - self.assertNotIn(dd_2.name, csv_content) - portfolio_permission.roles = [UserPortfolioRoleChoices.ORGANIZATION_MEMBER] portfolio_permission.save() portfolio_permission.refresh_from_db() diff --git a/src/registrar/tests/test_views_domain.py b/src/registrar/tests/test_views_domain.py index 0e76406f8..7de065d4b 100644 --- a/src/registrar/tests/test_views_domain.py +++ b/src/registrar/tests/test_views_domain.py @@ -441,10 +441,11 @@ class TestDomainDetail(TestDomainOverview): user.refresh_from_db() self.client.force_login(user) detail_page = self.client.get(f"/domain/{domain.id}") - # Check that alert message displays properly + # Check that alert message displays properly. + # This message is different for one user on the portfolio vs multiple. self.assertContains( detail_page, - "If you need to make updates, contact one of the listed domain managers.", + "If you need to become a domain manager, edit the domain assignments", ) # Check that user does not have option to Edit domain self.assertNotContains(detail_page, "Edit") diff --git a/src/registrar/tests/test_views_portfolio.py b/src/registrar/tests/test_views_portfolio.py index f490f51ad..81be0d3ad 100644 --- a/src/registrar/tests/test_views_portfolio.py +++ b/src/registrar/tests/test_views_portfolio.py @@ -1826,10 +1826,7 @@ class TestPortfolioMemberDeleteView(WebTest): ) self.assertEqual(response.status_code, 400) - expected_error_message = ( - "There must be at least one admin in your organization. Give another member admin " - "permissions, make sure they log into the registrar, and then remove this member." - ) + expected_error_message = "the only admin for this organization" self.assertContains(response, expected_error_message, status_code=400) # assert that send_portfolio_admin_removal_emails is not called @@ -2155,17 +2152,14 @@ class TestPortfolioMemberDeleteView(WebTest): self.assertEqual(response.status_code, 302) - expected_error_message = ( - "There must be at least one admin in your organization. Give another member admin " - "permissions, make sure they log into the registrar, and then remove this member." - ) + expected_error_message = "the only admin for this organization." args, kwargs = mock_error.call_args # Check if first arg is a WSGIRequest, confirms request object passed correctly # WSGIRequest protocol is basically the HTTPRequest but in Django form (ie POST '/member/1/delete') self.assertIsInstance(args[0], WSGIRequest) # Check that the error message matches the expected error message - self.assertEqual(args[1], expected_error_message) + self.assertIn(expected_error_message, args[1]) # Location is used for a 3xx HTTP status code to indicate that the URL was redirected # and then confirm that we're still on the Manage Members page @@ -4639,6 +4633,17 @@ class TestPortfolioMemberEditView(WebTest): # Get the user's admin permission admin_permission = UserPortfolioPermission.objects.get(user=self.user, portfolio=self.portfolio) + # Create a second permission so the user isn't just deleting themselves + member = create_test_user() + UserPortfolioPermission.objects.create( + user=member, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] + ) + + # First, verify that the change modal is on the page + response = self.client.get(reverse("member-permissions", kwargs={"member_pk": admin_permission.id})) + self.assertEqual(response.status_code, 200) + self.assertContains(response, "Yes, change my access") + response = self.client.post( reverse("member-permissions", kwargs={"member_pk": admin_permission.id}), { @@ -4652,6 +4657,39 @@ class TestPortfolioMemberEditView(WebTest): self.assertEqual(response.status_code, 302) self.assertEqual(response["Location"], reverse("home")) + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + def test_admin_removing_own_admin_role_only_admin(self): + """Tests that admin removing their own admin role when they are the only admin + throws a validation error. + """ + + self.client.force_login(self.user) + + # Get the user's admin permission + admin_permission = UserPortfolioPermission.objects.get(user=self.user, portfolio=self.portfolio) + + # First, verify that the info alert is present on the page + response = self.client.get(reverse("member-permissions", kwargs={"member_pk": admin_permission.id})) + self.assertEqual(response.status_code, 200) + self.assertContains(response, "To remove yourself or change your member access") + + # Then, verify that the right form error is shown + response = self.client.post( + reverse("member-permissions", kwargs={"member_pk": admin_permission.id}), + { + "role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER, + "domain_permissions": UserPortfolioPermissionChoices.VIEW_MANAGED_DOMAINS, + "member_permissions": "no_access", + "domain_request_permissions": "no_access", + }, + ) + + self.assertEqual(response.status_code, 200) + error_message = "the only admin for this organization" + self.assertIn(error_message, str(response.context["form"].errors)) + class TestPortfolioInvitedMemberEditView(WebTest): """Tests for the edit invited member page on portfolios""" diff --git a/src/registrar/utility/csv_export.py b/src/registrar/utility/csv_export.py index 6d6a7325d..0406c84e8 100644 --- a/src/registrar/utility/csv_export.py +++ b/src/registrar/utility/csv_export.py @@ -37,7 +37,6 @@ from django.contrib.postgres.aggregates import ArrayAgg, StringAgg from django.contrib.admin.models import LogEntry, ADDITION from django.contrib.contenttypes.models import ContentType from registrar.models.utility.generic_helper import convert_queryset_to_dict -from registrar.models.utility.orm_helper import ArrayRemoveNull from registrar.templatetags.custom_filters import get_region from registrar.utility.constants import BranchChoices from registrar.utility.enums import DefaultEmail, DefaultUserValues @@ -392,10 +391,17 @@ class MemberExport(BaseExport): ) # Invitations - domain_invitations = DomainInvitation.objects.filter( - email=OuterRef("email"), # Check if email matches the OuterRef("email") - domain__domain_info__portfolio=portfolio, # Check if the domain's portfolio matches the given portfolio - ).annotate(domain_info=F("domain__name")) + domain_invitations = Subquery( + DomainInvitation.objects.filter( + email=OuterRef("email"), + domain__domain_info__portfolio=portfolio, + status=DomainInvitation.DomainInvitationStatus.INVITED, + ) + .values("email") # Select a stable field + .annotate(domain_list=ArrayAgg("domain__name", distinct=True)) # Aggregate within subquery + .values("domain_list") # Ensure only one value is returned + ) + invitations = ( PortfolioInvitation.objects.exclude(status=PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED) .filter(portfolio=portfolio) @@ -407,23 +413,19 @@ class MemberExport(BaseExport): additional_permissions_display=F("additional_permissions"), member_display=F("email"), # Use ArrayRemove to return an empty list when no domain invitations are found - domain_info=ArrayRemoveNull( - ArrayAgg( - Subquery(domain_invitations.values("domain_info")), - distinct=True, - ) - ), + domain_info=domain_invitations, type=Value("invitedmember", output_field=CharField()), joined_date=Value("Unretrieved", output_field=CharField()), invited_by=cls.get_invited_by_query(object_id_query=Cast(OuterRef("id"), output_field=CharField())), ) .values(*shared_columns) ) + # Adding a order_by increases output predictability. # Doesn't matter as much for normal use, but makes tests easier. # We should also just be ordering by default anyway. members = permissions.union(invitations).order_by("email_display", "member_display", "first_name", "last_name") - return convert_queryset_to_dict(members, is_model=False) + return convert_queryset_to_dict(members, is_model=False, key="email_display") @classmethod def get_invited_by_query(cls, object_id_query): diff --git a/src/registrar/utility/errors.py b/src/registrar/utility/errors.py index 1e5e9562a..68ac12eb7 100644 --- a/src/registrar/utility/errors.py +++ b/src/registrar/utility/errors.py @@ -129,6 +129,7 @@ class FSMErrorCodes(IntEnum): - 3 INVESTIGATOR_NOT_STAFF Investigator is a non-staff user - 4 NO_REJECTION_REASON No rejection reason is specified - 5 NO_ACTION_NEEDED_REASON No action needed reason is specified + - 6 DOMAIN_IS_PENDING_DELETE Domain is in pending delete state """ APPROVE_DOMAIN_IN_USE = 1 @@ -136,6 +137,7 @@ class FSMErrorCodes(IntEnum): INVESTIGATOR_NOT_STAFF = 3 NO_REJECTION_REASON = 4 NO_ACTION_NEEDED_REASON = 5 + DOMAIN_IS_PENDING_DELETE = 6 class FSMDomainRequestError(Exception): @@ -150,6 +152,7 @@ class FSMDomainRequestError(Exception): FSMErrorCodes.INVESTIGATOR_NOT_STAFF: ("Investigator is not a staff user."), FSMErrorCodes.NO_REJECTION_REASON: ("A reason is required for this status."), FSMErrorCodes.NO_ACTION_NEEDED_REASON: ("A reason is required for this status."), + FSMErrorCodes.DOMAIN_IS_PENDING_DELETE: ("Domain of same name is currently in pending delete state."), } def __init__(self, *args, code=None, **kwargs): diff --git a/src/registrar/views/domain.py b/src/registrar/views/domain.py index c38a23385..0f7876d25 100644 --- a/src/registrar/views/domain.py +++ b/src/registrar/views/domain.py @@ -15,6 +15,7 @@ from registrar.decorators import ( IS_DOMAIN_MANAGER, IS_DOMAIN_MANAGER_AND_NOT_PORTFOLIO_MEMBER, IS_PORTFOLIO_MEMBER_AND_DOMAIN_MANAGER, + IS_STAFF, IS_STAFF_MANAGING_DOMAIN, grant_access, ) @@ -405,6 +406,9 @@ class DomainView(DomainBaseView): default_emails = DefaultEmail.get_all_emails() context["hidden_security_emails"] = default_emails + context["user_portfolio_permission"] = UserPortfolioPermission.objects.filter( + user=self.request.user, portfolio=self.request.session.get("portfolio") + ).first() security_email = self.object.get_security_email() if security_email is None or security_email in default_emails: @@ -707,6 +711,7 @@ class PrototypeDomainDNSRecordForm(forms.Form): ) +@grant_access(IS_STAFF) class PrototypeDomainDNSRecordView(DomainFormBaseView): template_name = "prototype_domain_dns.html" form_class = PrototypeDomainDNSRecordForm diff --git a/src/registrar/views/portfolio_members_json.py b/src/registrar/views/portfolio_members_json.py index 46b295025..aea4aa801 100644 --- a/src/registrar/views/portfolio_members_json.py +++ b/src/registrar/views/portfolio_members_json.py @@ -205,12 +205,7 @@ class PortfolioMembersJson(View): return queryset def serialize_members(self, request, portfolio, item, user): - # Check if the user can edit other users - user_can_edit_other_users = any( - user.has_perm(perm) for perm in ["registrar.full_access_permission", "registrar.change_user"] - ) - - view_only = not user.has_edit_members_portfolio_permission(portfolio) or not user_can_edit_other_users + view_only = not user.has_edit_members_portfolio_permission(portfolio) is_admin = UserPortfolioRoleChoices.ORGANIZATION_ADMIN in (item.get("roles") or []) diff --git a/src/registrar/views/portfolios.py b/src/registrar/views/portfolios.py index 952793084..2a9b9684d 100644 --- a/src/registrar/views/portfolios.py +++ b/src/registrar/views/portfolios.py @@ -114,6 +114,7 @@ class PortfolioMemberView(DetailView, View): "member_has_view_members_portfolio_permission": member_has_view_members_portfolio_permission, "member_has_edit_members_portfolio_permission": member_has_edit_members_portfolio_permission, "member_has_view_all_domains_portfolio_permission": member_has_view_all_domains_portfolio_permission, + "is_only_admin": request.user.is_only_admin_of_portfolio(portfolio_permission.portfolio), }, ) @@ -160,8 +161,8 @@ class PortfolioMemberDeleteView(View): ) if member.is_only_admin_of_portfolio(portfolio): return ( - "There must be at least one admin in your organization. Give another member admin " - "permissions, make sure they log into the registrar, and then remove this member." + "You can't remove yourself because you're the only admin for this organization. " + "To remove yourself, you'll need to add another admin." ) return None @@ -257,9 +258,7 @@ class PortfolioMemberEditView(DetailView, View): def get(self, request, member_pk): portfolio_permission = get_object_or_404(UserPortfolioPermission, pk=member_pk) user = portfolio_permission.user - form = self.form_class(instance=portfolio_permission) - return render( request, self.template_name, @@ -267,6 +266,7 @@ class PortfolioMemberEditView(DetailView, View): "form": form, "member": user, "portfolio_permission": portfolio_permission, + "is_only_admin": request.user.is_only_admin_of_portfolio(portfolio_permission.portfolio), }, ) @@ -307,15 +307,17 @@ class PortfolioMemberEditView(DetailView, View): form.save() messages.success(self.request, "The member access and permission changes have been saved.") return redirect("member", member_pk=member_pk) if not removing_admin_role_on_self else redirect("home") - - return render( - request, - self.template_name, - { - "form": form, - "member": user, # Pass the user object again to the template - }, - ) + else: + return render( + request, + self.template_name, + { + "form": form, + "member": user, + "portfolio_permission": portfolio_permission, + "is_only_admin": request.user.is_only_admin_of_portfolio(portfolio_permission.portfolio), + }, + ) def _handle_exceptions(self, exception): """Handle exceptions raised during the process."""