diff --git a/docs/operations/data_migration.md b/docs/operations/data_migration.md index 0863aa0b7..499c0840f 100644 --- a/docs/operations/data_migration.md +++ b/docs/operations/data_migration.md @@ -918,3 +918,38 @@ Example (only requests): `./manage.py create_federal_portfolio --branch "executi - Parameters #1-#2: Either `--agency_name` or `--branch` must be specified. Not both. - Parameters #2-#3, you cannot use `--both` while using these. You must specify either `--parse_requests` or `--parse_domains` seperately. While all of these parameters are optional in that you do not need to specify all of them, you must specify at least one to run this script. + + +## Patch suborganizations +This script deletes some duplicate suborganization data that exists in our database (one-time use). +It works in two ways: +1. If the only name difference between two suborg records is extra spaces or a capitalization difference, +then we delete all duplicate records of this type. +2. If the suborg name is one we manually specify to delete via the script. + +Before it deletes records, it goes through each DomainInformation and DomainRequest object and updates the reference to "sub_organization" to match the non-duplicative record. + +### Running on sandboxes + +#### Step 1: Login to CloudFoundry +```cf login -a api.fr.cloud.gov --sso``` + +#### Step 2: SSH into your environment +```cf ssh getgov-{space}``` + +Example: `cf ssh getgov-za` + +#### Step 3: Create a shell instance +```/tmp/lifecycle/shell``` + +#### Step 4: Upload your csv to the desired sandbox +[Follow these steps](#use-scp-to-transfer-data-to-sandboxes) to upload the federal_cio csv to a sandbox of your choice. + +#### Step 5: Running the script +To create a specific portfolio: +```./manage.py patch_suborganizations``` + +### Running locally + +#### Step 1: Running the script +```docker-compose exec app ./manage.py patch_suborganizations``` diff --git a/src/registrar/admin.py b/src/registrar/admin.py index 849cb6100..1b6e2de6a 100644 --- a/src/registrar/admin.py +++ b/src/registrar/admin.py @@ -14,6 +14,7 @@ from django.db.models import ( from django.db.models.functions import Concat, Coalesce from django.http import HttpResponseRedirect from registrar.models.federal_agency import FederalAgency +from registrar.models.portfolio_invitation import PortfolioInvitation from registrar.utility.admin_helpers import ( AutocompleteSelectWithPlaceholder, get_action_needed_reason_default_email, @@ -27,8 +28,12 @@ from django.shortcuts import redirect from django_fsm import get_available_FIELD_transitions, FSMField from registrar.models import DomainInformation, Portfolio, UserPortfolioPermission, DomainInvitation from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices -from registrar.utility.email import EmailSendingError -from registrar.utility.email_invitations import send_portfolio_invitation_email +from registrar.utility.email_invitations import send_domain_invitation_email, send_portfolio_invitation_email +from registrar.views.utility.invitation_helper import ( + get_org_membership, + get_requested_user, + handle_invitation_exceptions, +) from waffle.decorators import flag_is_active from django.contrib import admin, messages from django.contrib.auth.admin import UserAdmin as BaseUserAdmin @@ -41,7 +46,7 @@ from waffle.admin import FlagAdmin from waffle.models import Sample, Switch from registrar.models import Contact, Domain, DomainRequest, DraftDomain, User, Website, SeniorOfficial from registrar.utility.constants import BranchChoices -from registrar.utility.errors import FSMDomainRequestError, FSMErrorCodes, MissingEmailError +from registrar.utility.errors import FSMDomainRequestError, FSMErrorCodes from registrar.utility.waffle import flag_is_active_for_user from registrar.views.utility.mixins import OrderableFieldsMixin from django.contrib.admin.views.main import ORDER_VAR @@ -1389,7 +1394,78 @@ class UserDomainRoleAdmin(ListHeaderAdmin, ImportExportModelAdmin): return super().changeform_view(request, object_id, form_url, extra_context=extra_context) -class DomainInvitationAdmin(ListHeaderAdmin): +class BaseInvitationAdmin(ListHeaderAdmin): + """Base class for admin classes which will customize save_model and send email invitations + on model adds, and require custom handling of forms and form errors.""" + + def response_add(self, request, obj, post_url_continue=None): + """ + Override response_add to handle rendering when exceptions are raised during add model. + + Normal flow on successful save_model on add is to redirect to changelist_view. + If there are errors, flow is modified to instead render change form. + """ + # store current messages from request so that they are preserved throughout the method + storage = get_messages(request) + # Check if there are any error or warning messages in the `messages` framework + has_errors = any(message.level_tag in ["error", "warning"] for message in storage) + + if has_errors: + # Re-render the change form if there are errors or warnings + # Prepare context for rendering the change form + + # Get the model form + ModelForm = self.get_form(request, obj=obj) + form = ModelForm(instance=obj) + + # Create an AdminForm instance + admin_form = AdminForm( + form, + list(self.get_fieldsets(request, obj)), + self.get_prepopulated_fields(request, obj), + self.get_readonly_fields(request, obj), + model_admin=self, + ) + media = self.media + form.media + + opts = obj._meta + change_form_context = { + **self.admin_site.each_context(request), # Add admin context + "title": f"Add {opts.verbose_name}", + "opts": opts, + "original": obj, + "save_as": self.save_as, + "has_change_permission": self.has_change_permission(request, obj), + "add": True, # Indicate this is an "Add" form + "change": False, # Indicate this is not a "Change" form + "is_popup": False, + "inline_admin_formsets": [], + "save_on_top": self.save_on_top, + "show_delete": self.has_delete_permission(request, obj), + "obj": obj, + "adminform": admin_form, # Pass the AdminForm instance + "media": media, + "errors": None, + } + return self.render_change_form( + request, + context=change_form_context, + add=True, + change=False, + obj=obj, + ) + + response = super().response_add(request, obj, post_url_continue) + + # Re-add all messages from storage after `super().response_add` + # as super().response_add resets the success messages in request + for message in storage: + messages.add_message(request, message.level, message.message) + + return response + + +class DomainInvitationAdmin(BaseInvitationAdmin): """Custom domain invitation admin class.""" class Meta: @@ -1442,14 +1518,60 @@ class DomainInvitationAdmin(ListHeaderAdmin): which will be successful if a single User exists for that email; otherwise, will just continue to create the invitation. """ - if not change and User.objects.filter(email=obj.email).count() == 1: - # Domain Invitation creation for an existing User - obj.retrieve() - # Call the parent save method to save the object - super().save_model(request, obj, form, change) + if not change: + domain = obj.domain + domain_org = getattr(domain.domain_info, "portfolio", None) + requested_email = obj.email + # Look up a user with that email + requested_user = get_requested_user(requested_email) + requestor = request.user + + member_of_a_different_org, member_of_this_org = get_org_membership( + domain_org, requested_email, requested_user + ) + + try: + if ( + flag_is_active(request, "organization_feature") + and not flag_is_active(request, "multiple_portfolios") + and domain_org is not None + and not member_of_this_org + and not member_of_a_different_org + ): + send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=domain_org) + portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create( + email=requested_email, + portfolio=domain_org, + roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], + ) + # if user exists for email, immediately retrieve portfolio invitation upon creation + if requested_user is not None: + portfolio_invitation.retrieve() + portfolio_invitation.save() + messages.success(request, f"{requested_email} has been invited to the organization: {domain_org}") + + send_domain_invitation_email( + email=requested_email, + requestor=requestor, + domains=domain, + is_member_of_different_org=member_of_a_different_org, + requested_user=requested_user, + ) + if requested_user is not None: + # Domain Invitation creation for an existing User + obj.retrieve() + # Call the parent save method to save the object + super().save_model(request, obj, form, change) + messages.success(request, f"{requested_email} has been invited to the domain: {domain}") + except Exception as e: + handle_invitation_exceptions(request, e, requested_email) + return + else: + # Call the parent save method to save the object + super().save_model(request, obj, form, change) -class PortfolioInvitationAdmin(ListHeaderAdmin): +class PortfolioInvitationAdmin(BaseInvitationAdmin): """Custom portfolio invitation admin class.""" form = PortfolioInvitationAdminForm @@ -1472,7 +1594,7 @@ class PortfolioInvitationAdmin(ListHeaderAdmin): # Search search_fields = [ "email", - "portfolio__name", + "portfolio__organization_name", ] # Filters @@ -1510,6 +1632,8 @@ class PortfolioInvitationAdmin(ListHeaderAdmin): portfolio = obj.portfolio requested_email = obj.email requestor = request.user + # Look up a user with that email + requested_user = get_requested_user(requested_email) permission_exists = UserPortfolioPermission.objects.filter( user__email=requested_email, portfolio=portfolio, user__email__isnull=False @@ -1518,98 +1642,19 @@ class PortfolioInvitationAdmin(ListHeaderAdmin): if not permission_exists: # if permission does not exist for a user with requested_email, send email send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=portfolio) + # if user exists for email, immediately retrieve portfolio invitation upon creation + if requested_user is not None: + obj.retrieve() messages.success(request, f"{requested_email} has been invited.") else: messages.warning(request, "User is already a member of this portfolio.") except Exception as e: # when exception is raised, handle and do not save the model - self._handle_exceptions(e, request, obj) + handle_invitation_exceptions(request, e, requested_email) return # Call the parent save method to save the object super().save_model(request, obj, form, change) - def _handle_exceptions(self, exception, request, obj): - """Handle exceptions raised during the process. - - Log warnings / errors, and message errors to the user. - """ - if isinstance(exception, EmailSendingError): - logger.warning( - "Could not sent email invitation to %s for portfolio %s (EmailSendingError)", - obj.email, - obj.portfolio, - exc_info=True, - ) - messages.error(request, "Could not send email invitation. Portfolio invitation not saved.") - elif isinstance(exception, MissingEmailError): - messages.error(request, str(exception)) - logger.error( - f"Can't send email to '{obj.email}' for portfolio '{obj.portfolio}'. " - f"No email exists for the requestor.", - exc_info=True, - ) - - else: - logger.warning("Could not send email invitation (Other Exception)", exc_info=True) - messages.error(request, "Could not send email invitation. Portfolio invitation not saved.") - - def response_add(self, request, obj, post_url_continue=None): - """ - Override response_add to handle rendering when exceptions are raised during add model. - - Normal flow on successful save_model on add is to redirect to changelist_view. - If there are errors, flow is modified to instead render change form. - """ - # Check if there are any error or warning messages in the `messages` framework - storage = get_messages(request) - has_errors = any(message.level_tag in ["error", "warning"] for message in storage) - - if has_errors: - # Re-render the change form if there are errors or warnings - # Prepare context for rendering the change form - - # Get the model form - ModelForm = self.get_form(request, obj=obj) - form = ModelForm(instance=obj) - - # Create an AdminForm instance - admin_form = AdminForm( - form, - list(self.get_fieldsets(request, obj)), - self.get_prepopulated_fields(request, obj), - self.get_readonly_fields(request, obj), - model_admin=self, - ) - media = self.media + form.media - - opts = obj._meta - change_form_context = { - **self.admin_site.each_context(request), # Add admin context - "title": f"Add {opts.verbose_name}", - "opts": opts, - "original": obj, - "save_as": self.save_as, - "has_change_permission": self.has_change_permission(request, obj), - "add": True, # Indicate this is an "Add" form - "change": False, # Indicate this is not a "Change" form - "is_popup": False, - "inline_admin_formsets": [], - "save_on_top": self.save_on_top, - "show_delete": self.has_delete_permission(request, obj), - "obj": obj, - "adminform": admin_form, # Pass the AdminForm instance - "media": media, - "errors": None, - } - return self.render_change_form( - request, - context=change_form_context, - add=True, - change=False, - obj=obj, - ) - return super().response_add(request, obj, post_url_continue) - class DomainInformationResource(resources.ModelResource): """defines how each field in the referenced model should be mapped to the corresponding fields in the @@ -2782,7 +2827,9 @@ class DomainRequestAdmin(ListHeaderAdmin, ImportExportModelAdmin): try: # Retrieve and order audit log entries by timestamp in descending order - audit_log_entries = LogEntry.objects.filter(object_id=object_id).order_by("-timestamp") + audit_log_entries = LogEntry.objects.filter( + object_id=object_id, content_type__model="domainrequest" + ).order_by("-timestamp") # Process each log entry to filter based on the change criteria for log_entry in audit_log_entries: diff --git a/src/registrar/assets/src/sass/_theme/_tooltips.scss b/src/registrar/assets/src/sass/_theme/_tooltips.scss index 58beb8ae6..65bfbb483 100644 --- a/src/registrar/assets/src/sass/_theme/_tooltips.scss +++ b/src/registrar/assets/src/sass/_theme/_tooltips.scss @@ -66,9 +66,9 @@ text-align: center; font-size: inherit; //inherit tooltip fontsize of .93rem max-width: fit-content; + display: block; @include at-media('desktop') { width: 70vw; } - display: block; } } \ No newline at end of file diff --git a/src/registrar/management/commands/create_federal_portfolio.py b/src/registrar/management/commands/create_federal_portfolio.py index 389662ead..c56b4ff6b 100644 --- a/src/registrar/management/commands/create_federal_portfolio.py +++ b/src/registrar/management/commands/create_federal_portfolio.py @@ -6,6 +6,7 @@ from django.core.management import BaseCommand, CommandError from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper from registrar.models import DomainInformation, DomainRequest, FederalAgency, Suborganization, Portfolio, User from registrar.models.utility.generic_helper import normalize_string +from django.db.models import F, Q logger = logging.getLogger(__name__) @@ -104,12 +105,17 @@ class Command(BaseCommand): message = f"Failed to create portfolio '{federal_agency.agency}'" TerminalHelper.colorful_logger(logger.info, TerminalColors.FAIL, message) + # POST PROCESS STEP: Add additional suborg info where applicable. + updated_suborg_count = self.post_process_all_suborganization_fields(agencies) + message = f"Added city and state_territory information to {updated_suborg_count} suborgs." + TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message) + TerminalHelper.log_script_run_summary( self.updated_portfolios, self.failed_portfolios, self.skipped_portfolios, debug=False, - skipped_header="----- SOME PORTFOLIOS WERE SKIPPED -----", + skipped_header="----- SOME PORTFOLIOS WERENT CREATED -----", display_as_str=True, ) @@ -169,14 +175,11 @@ class Command(BaseCommand): def handle_populate_portfolio(self, federal_agency, parse_domains, parse_requests, both): """Attempts to create a portfolio. If successful, this function will - also create new suborganizations. - Returns the portfolio for the given federal_agency. - """ - portfolio, created = self.create_portfolio(federal_agency) - if created: - self.create_suborganizations(portfolio, federal_agency) - if parse_domains or both: - self.handle_portfolio_domains(portfolio, federal_agency) + also create new suborganizations""" + portfolio, _ = self.create_portfolio(federal_agency) + self.create_suborganizations(portfolio, federal_agency) + if parse_domains or both: + self.handle_portfolio_domains(portfolio, federal_agency) if parse_requests or both: self.handle_portfolio_requests(portfolio, federal_agency) @@ -233,7 +236,6 @@ class Command(BaseCommand): federal_agency=federal_agency, organization_name__isnull=False ) org_names = set(valid_agencies.values_list("organization_name", flat=True)) - if not org_names: message = ( "Could not add any suborganizations." @@ -352,3 +354,141 @@ class Command(BaseCommand): DomainInformation.objects.bulk_update(domain_infos, ["portfolio", "sub_organization"]) message = f"Added portfolio '{portfolio}' to {len(domain_infos)} domains." TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message) + + def post_process_all_suborganization_fields(self, agencies): + """Batch updates suborganization locations from domain and request data. + + Args: + agencies: List of FederalAgency objects to process + + Returns: + int: Number of suborganizations updated + + Priority for location data: + 1. Domain information + 2. Domain request suborganization fields + 3. Domain request standard fields + """ + # Common filter between domaininformation / domain request. + # Filter by only the agencies we've updated thus far. + # Then, only process records without null portfolio, org name, or suborg name. + base_filter = Q( + federal_agency__in=agencies, + portfolio__isnull=False, + organization_name__isnull=False, + sub_organization__isnull=False, + ) & ~Q(organization_name__iexact=F("portfolio__organization_name")) + + # First: Remove null city / state_territory values on domain info / domain requests. + # We want to add city data if there is data to add to begin with! + domains = DomainInformation.objects.filter( + base_filter, + Q(city__isnull=False, state_territory__isnull=False), + ) + requests = DomainRequest.objects.filter( + base_filter, + ( + Q(city__isnull=False, state_territory__isnull=False) + | Q(suborganization_city__isnull=False, suborganization_state_territory__isnull=False) + ), + ) + + # Second: Group domains and requests by normalized organization name. + # This means that later down the line we have to account for "duplicate" org names. + domains_dict = {} + requests_dict = {} + for domain in domains: + normalized_name = normalize_string(domain.organization_name) + domains_dict.setdefault(normalized_name, []).append(domain) + + for request in requests: + normalized_name = normalize_string(request.organization_name) + requests_dict.setdefault(normalized_name, []).append(request) + + # Third: Get suborganizations to update + suborgs_to_edit = Suborganization.objects.filter( + Q(id__in=domains.values_list("sub_organization", flat=True)) + | Q(id__in=requests.values_list("sub_organization", flat=True)) + ) + + # Fourth: Process each suborg to add city / state territory info + for suborg in suborgs_to_edit: + self.post_process_suborganization_fields(suborg, domains_dict, requests_dict) + + # Fifth: Perform a bulk update + return Suborganization.objects.bulk_update(suborgs_to_edit, ["city", "state_territory"]) + + def post_process_suborganization_fields(self, suborg, domains_dict, requests_dict): + """Updates a single suborganization's location data if valid. + + Args: + suborg: Suborganization to update + domains_dict: Dict of domain info records grouped by org name + requests_dict: Dict of domain requests grouped by org name + + Priority matches parent method. Updates are skipped if location data conflicts + between multiple records of the same type. + """ + normalized_suborg_name = normalize_string(suborg.name) + domains = domains_dict.get(normalized_suborg_name, []) + requests = requests_dict.get(normalized_suborg_name, []) + + # Try to get matching domain info + domain = None + if domains: + reference = domains[0] + use_location_for_domain = all( + d.city == reference.city and d.state_territory == reference.state_territory for d in domains + ) + if use_location_for_domain: + domain = reference + + # Try to get matching request info + # Uses consensus: if all city / state_territory info matches, then we can assume the data is "good". + # If not, take the safe route and just skip updating this particular record. + request = None + use_suborg_location_for_request = True + use_location_for_request = True + if requests: + reference = requests[0] + use_suborg_location_for_request = all( + r.suborganization_city + and r.suborganization_state_territory + and r.suborganization_city == reference.suborganization_city + and r.suborganization_state_territory == reference.suborganization_state_territory + for r in requests + ) + use_location_for_request = all( + r.city + and r.state_territory + and r.city == reference.city + and r.state_territory == reference.state_territory + for r in requests + ) + if use_suborg_location_for_request or use_location_for_request: + request = reference + + if not domain and not request: + message = f"Skipping adding city / state_territory information to suborg: {suborg}. Bad data." + TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, message) + return + + # PRIORITY: + # 1. Domain info + # 2. Domain request requested suborg fields + # 3. Domain request normal fields + if domain: + suborg.city = normalize_string(domain.city, lowercase=False) + suborg.state_territory = domain.state_territory + elif request and use_suborg_location_for_request: + suborg.city = normalize_string(request.suborganization_city, lowercase=False) + suborg.state_territory = request.suborganization_state_territory + elif request and use_location_for_request: + suborg.city = normalize_string(request.city, lowercase=False) + suborg.state_territory = request.state_territory + + message = ( + f"Added city/state_territory to suborg: {suborg}. " + f"city - {suborg.city}, state - {suborg.state_territory}" + ) + TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message) diff --git a/src/registrar/management/commands/patch_suborganizations.py b/src/registrar/management/commands/patch_suborganizations.py new file mode 100644 index 000000000..98ff1e36f --- /dev/null +++ b/src/registrar/management/commands/patch_suborganizations.py @@ -0,0 +1,133 @@ +import logging +from django.core.management import BaseCommand +from registrar.models import Suborganization, DomainRequest, DomainInformation +from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper +from registrar.models.utility.generic_helper import count_capitals, normalize_string + + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Clean up duplicate suborganizations that differ only by spaces and capitalization" + + def handle(self, **kwargs): + """Process manual deletions and find/remove duplicates. Shows preview + and updates DomainInformation / DomainRequest sub_organization references before deletion.""" + + # First: get a preset list of records we want to delete. + # For extra_records_to_prune: the key gets deleted, the value gets kept. + extra_records_to_prune = { + normalize_string("Assistant Secretary for Preparedness and Response Office of the Secretary"): { + "replace_with": "Assistant Secretary for Preparedness and Response, Office of the Secretary" + }, + normalize_string("US Geological Survey"): {"replace_with": "U.S. Geological Survey"}, + normalize_string("USDA/OC"): {"replace_with": "USDA, Office of Communications"}, + normalize_string("GSA, IC, OGP WebPortfolio"): {"replace_with": "GSA, IC, OGP Web Portfolio"}, + normalize_string("USDA/ARS/NAL"): {"replace_with": "USDA, ARS, NAL"}, + } + + # Second: loop through every Suborganization and return a dict of what to keep, and what to delete + # for each duplicate or "incorrect" record. We do this by pruning records with extra spaces or bad caps + # Note that "extra_records_to_prune" is just a manual mapping. + records_to_prune = self.get_records_to_prune(extra_records_to_prune) + if len(records_to_prune) == 0: + TerminalHelper.colorful_logger(logger.error, TerminalColors.FAIL, "No suborganizations to delete.") + return + + # Third: Build a preview of the changes + total_records_to_remove = 0 + preview_lines = ["The following records will be removed:"] + for data in records_to_prune.values(): + keep = data.get("keep") + delete = data.get("delete") + if keep: + preview_lines.append(f"Keeping: '{keep.name}' (id: {keep.id})") + + for duplicate in delete: + preview_lines.append(f"Removing: '{duplicate.name}' (id: {duplicate.id})") + total_records_to_remove += 1 + preview_lines.append("") + preview = "\n".join(preview_lines) + + # Fourth: Get user confirmation and delete + if TerminalHelper.prompt_for_execution( + system_exit_on_terminate=True, + prompt_message=preview, + prompt_title=f"Remove {total_records_to_remove} suborganizations?", + verify_message="*** WARNING: This will replace the record on DomainInformation and DomainRequest! ***", + ): + try: + # Update all references to point to the right suborg before deletion + all_suborgs_to_remove = set() + for record in records_to_prune.values(): + best_record = record["keep"] + suborgs_to_remove = {dupe.id for dupe in record["delete"]} + DomainRequest.objects.filter(sub_organization_id__in=suborgs_to_remove).update( + sub_organization=best_record + ) + DomainInformation.objects.filter(sub_organization_id__in=suborgs_to_remove).update( + sub_organization=best_record + ) + all_suborgs_to_remove.update(suborgs_to_remove) + # Delete the suborgs + delete_count, _ = Suborganization.objects.filter(id__in=all_suborgs_to_remove).delete() + TerminalHelper.colorful_logger( + logger.info, TerminalColors.MAGENTA, f"Successfully deleted {delete_count} suborganizations." + ) + except Exception as e: + TerminalHelper.colorful_logger( + logger.error, TerminalColors.FAIL, f"Failed to delete suborganizations: {str(e)}" + ) + + def get_records_to_prune(self, extra_records_to_prune): + """Maps all suborgs into a dictionary with a record to keep, and an array of records to delete.""" + # First: Group all suborganization names by their "normalized" names (finding duplicates). + # Returns a dict that looks like this: + # { + # "amtrak": [, , ], + # "usda/oc": [], + # ...etc + # } + # + name_groups = {} + for suborg in Suborganization.objects.all(): + normalized_name = normalize_string(suborg.name) + name_groups.setdefault(normalized_name, []).append(suborg) + + # Second: find the record we should keep, and the records we should delete + # Returns a dict that looks like this: + # { + # "amtrak": { + # "keep": + # "delete": [, ] + # }, + # "usda/oc": { + # "keep": , + # "delete": [] + # }, + # ...etc + # } + records_to_prune = {} + for normalized_name, duplicate_suborgs in name_groups.items(): + # Delete data from our preset list + if normalized_name in extra_records_to_prune: + # The 'keep' field expects a Suborganization but we just pass in a string, so this is just a workaround. + # This assumes that there is only one item in the name_group array (see usda/oc example). + # But this should be fine, given our data. + hardcoded_record_name = extra_records_to_prune[normalized_name]["replace_with"] + name_group = name_groups.get(normalize_string(hardcoded_record_name)) + keep = name_group[0] if name_group else None + records_to_prune[normalized_name] = {"keep": keep, "delete": duplicate_suborgs} + # Delete duplicates (extra spaces or casing differences) + elif len(duplicate_suborgs) > 1: + # Pick the best record (fewest spaces, most leading capitals) + best_record = max( + duplicate_suborgs, + key=lambda suborg: (-suborg.name.count(" "), count_capitals(suborg.name, leading_only=True)), + ) + records_to_prune[normalized_name] = { + "keep": best_record, + "delete": [s for s in duplicate_suborgs if s != best_record], + } + return records_to_prune diff --git a/src/registrar/management/commands/utility/terminal_helper.py b/src/registrar/management/commands/utility/terminal_helper.py index eed1027f7..87d9f12e5 100644 --- a/src/registrar/management/commands/utility/terminal_helper.py +++ b/src/registrar/management/commands/utility/terminal_helper.py @@ -401,16 +401,15 @@ class TerminalHelper: # Allow the user to inspect the command string # and ask if they wish to proceed proceed_execution = TerminalHelper.query_yes_no_exit( - f"""{TerminalColors.OKCYAN} - ===================================================== - {prompt_title} - ===================================================== - {verify_message} - - {prompt_message} - {TerminalColors.FAIL} - Proceed? (Y = proceed, N = {action_description_for_selecting_no}) - {TerminalColors.ENDC}""" + f"\n{TerminalColors.OKCYAN}" + "=====================================================" + f"\n{prompt_title}\n" + "=====================================================" + f"\n{verify_message}\n" + f"\n{prompt_message}\n" + f"{TerminalColors.FAIL}" + f"Proceed? (Y = proceed, N = {action_description_for_selecting_no})" + f"{TerminalColors.ENDC}" ) # If the user decided to proceed return true. @@ -443,13 +442,14 @@ class TerminalHelper: f.write(file_contents) @staticmethod - def colorful_logger(log_level, color, message): + def colorful_logger(log_level, color, message, exc_info=True): """Adds some color to your log output. Args: log_level: str | Logger.method -> Desired log level. ex: logger.info or "INFO" color: str | TerminalColors -> Output color. ex: TerminalColors.YELLOW or "YELLOW" message: str -> Message to display. + exc_info: bool -> Whether the log should print exc_info or not """ if isinstance(log_level, str) and hasattr(logger, log_level.lower()): @@ -463,4 +463,4 @@ class TerminalHelper: terminal_color = color colored_message = f"{terminal_color}{message}{TerminalColors.ENDC}" - log_method(colored_message) + log_method(colored_message, exc_info=exc_info) diff --git a/src/registrar/models/utility/generic_helper.py b/src/registrar/models/utility/generic_helper.py index e8992acc2..c2ba6887f 100644 --- a/src/registrar/models/utility/generic_helper.py +++ b/src/registrar/models/utility/generic_helper.py @@ -353,3 +353,17 @@ def normalize_string(string_to_normalize, lowercase=True): new_string = " ".join(string_to_normalize.split()) return new_string.lower() if lowercase else new_string + + +def count_capitals(text: str, leading_only: bool): + """Counts capital letters in a string. + Args: + text (str): The string to analyze. + leading_only (bool): If False, counts all capital letters. + If True, only counts capitals at the start of words. + Returns: + int: Number of capital letters found. + """ + if leading_only: + return sum(word[0].isupper() for word in text.split() if word) + return sum(c.isupper() for c in text if c) diff --git a/src/registrar/models/utility/portfolio_helper.py b/src/registrar/models/utility/portfolio_helper.py index cde28e4bd..4ae282f21 100644 --- a/src/registrar/models/utility/portfolio_helper.py +++ b/src/registrar/models/utility/portfolio_helper.py @@ -153,7 +153,9 @@ def validate_user_portfolio_permission(user_portfolio_permission): "Based on current waffle flag settings, users cannot be assigned to multiple portfolios." ) - existing_invitations = PortfolioInvitation.objects.filter(email=user_portfolio_permission.user.email) + existing_invitations = PortfolioInvitation.objects.exclude( + portfolio=user_portfolio_permission.portfolio + ).filter(email=user_portfolio_permission.user.email) if existing_invitations.exists(): raise ValidationError( "This user is already assigned to a portfolio invitation. " diff --git a/src/registrar/templates/emails/domain_invitation.txt b/src/registrar/templates/emails/domain_invitation.txt index 068040205..a077bff26 100644 --- a/src/registrar/templates/emails/domain_invitation.txt +++ b/src/registrar/templates/emails/domain_invitation.txt @@ -1,36 +1,40 @@ {% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #} -Hi. +Hi,{% if requested_user and requested_user.first_name %} {{ requested_user.first_name }}.{% endif %} -{{ requestor_email }} has added you as a manager on {{ domain.name }}. - -You can manage this domain on the .gov registrar . +{{ requestor_email }} has invited you to manage: +{% for domain in domains %}{{ domain.name }} +{% endfor %} +To manage domain information, visit the .gov registrar . ---------------------------------------------------------------- +{% if not requested_user %} YOU NEED A LOGIN.GOV ACCOUNT -You’ll need a Login.gov account to manage your .gov domain. Login.gov provides -a simple and secure process for signing in to many government services with one -account. +You’ll need a Login.gov account to access the .gov registrar. That account needs to be +associated with the following email address: {{ invitee_email_address }} -If you don’t already have one, follow these steps to create your -Login.gov account . +Login.gov provides a simple and secure process for signing in to many government +services with one account. If you don’t already have one, follow these steps to create +your Login.gov account . +{% endif %} DOMAIN MANAGEMENT -As a .gov domain manager, you can add or update information about your domain. -You’ll also serve as a contact for your .gov domain. Please keep your contact -information updated. +As a .gov domain manager, you can add or update information like name servers. You’ll +also serve as a contact for the domains you manage. Please keep your contact +information updated. Learn more about domain management . SOMETHING WRONG? -If you’re not affiliated with {{ domain.name }} or think you received this -message in error, reply to this email. +If you’re not affiliated with the .gov domains mentioned in this invitation or think you +received this message in error, reply to this email. THANK YOU -.Gov helps the public identify official, trusted information. Thank you for using a .gov domain. +.Gov helps the public identify official, trusted information. Thank you for using a .gov +domain. ---------------------------------------------------------------- @@ -38,5 +42,6 @@ The .gov team Contact us: Learn about .gov -The .gov registry is a part of the Cybersecurity and Infrastructure Security Agency (CISA) +The .gov registry is a part of the Cybersecurity and Infrastructure Security Agency +(CISA) {% endautoescape %} diff --git a/src/registrar/templates/emails/domain_invitation_subject.txt b/src/registrar/templates/emails/domain_invitation_subject.txt index 319b80176..9663346d0 100644 --- a/src/registrar/templates/emails/domain_invitation_subject.txt +++ b/src/registrar/templates/emails/domain_invitation_subject.txt @@ -1 +1 @@ -You’ve been added to a .gov domain \ No newline at end of file +You've been invited to manage {% if domains|length > 1 %}.gov domains{% else %}{{ domains.0.name }}{% endif %} \ No newline at end of file diff --git a/src/registrar/tests/common.py b/src/registrar/tests/common.py index 1afc7c358..c0083068d 100644 --- a/src/registrar/tests/common.py +++ b/src/registrar/tests/common.py @@ -40,6 +40,7 @@ from epplibwrapper import ( ErrorCode, responses, ) +from registrar.models.suborganization import Suborganization from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices from registrar.models.user_domain_role import UserDomainRole @@ -911,6 +912,7 @@ class MockDb(TestCase): DomainInformation.objects.all().delete() DomainRequest.objects.all().delete() UserDomainRole.objects.all().delete() + Suborganization.objects.all().delete() Portfolio.objects.all().delete() UserPortfolioPermission.objects.all().delete() User.objects.all().delete() diff --git a/src/registrar/tests/test_admin.py b/src/registrar/tests/test_admin.py index 3195f8237..2a7a52a13 100644 --- a/src/registrar/tests/test_admin.py +++ b/src/registrar/tests/test_admin.py @@ -28,7 +28,6 @@ from registrar.admin import ( TransitionDomainAdmin, UserGroupAdmin, PortfolioAdmin, - UserPortfolioPermissionAdmin, ) from registrar.models import ( Domain, @@ -69,9 +68,7 @@ from django.contrib.sessions.backends.db import SessionStore from django.contrib.auth import get_user_model from django.contrib import messages -from unittest.mock import ANY, patch, Mock -from django.forms import ValidationError - +from unittest.mock import ANY, call, patch, Mock import logging @@ -136,12 +133,18 @@ class TestDomainInvitationAdmin(TestCase): self.admin = ListHeaderAdmin(model=DomainInvitationAdmin, admin_site=AdminSite()) self.superuser = create_superuser() self.domain = Domain.objects.create(name="example.com") + self.portfolio = Portfolio.objects.create(organization_name="new portfolio", creator=self.superuser) + DomainInformation.objects.create(domain=self.domain, portfolio=self.portfolio, creator=self.superuser) """Create a client object""" self.client = Client(HTTP_HOST="localhost:8080") def tearDown(self): """Delete all DomainInvitation objects""" + PortfolioInvitation.objects.all().delete() DomainInvitation.objects.all().delete() + DomainInformation.objects.all().delete() + Portfolio.objects.all().delete() + Domain.objects.all().delete() Contact.objects.all().delete() User.objects.all().delete() @@ -189,11 +192,273 @@ class TestDomainInvitationAdmin(TestCase): self.assertContains(response, retrieved_html, count=1) @less_console_noise_decorator - def test_save_model_user_exists(self): - """Test saving a domain invitation when the user exists. + @override_flag("organization_feature", active=True) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") + def test_add_domain_invitation_success_when_user_not_portfolio_member( + self, mock_messages_success, mock_send_portfolio_email, mock_send_domain_email + ): + """Test saving a domain invitation when the user exists and is not a portfolio member. + + Should send out domain and portfolio invites. + Should trigger success messages for both email sends. + Should attempt to retrieve the domain invitation. + Should attempt to retrieve the portfolio invitation.""" + + user = User.objects.create_user(email="test@example.com", username="username") + + # Create a domain invitation instance + invitation = DomainInvitation(email="test@example.com", domain=self.domain) + + admin_instance = DomainInvitationAdmin(DomainInvitation, admin_site=None) + + # Create a request object + request = self.factory.post("/admin/registrar/DomainInvitation/add/") + request.user = self.superuser + + admin_instance.save_model(request, invitation, form=None, change=False) + + # Assert sends appropriate emails - domain and portfolio invites + mock_send_domain_email.assert_called_once_with( + email="test@example.com", + requestor=self.superuser, + domains=self.domain, + is_member_of_different_org=None, + requested_user=user, + ) + mock_send_portfolio_email.assert_called_once_with( + email="test@example.com", + requestor=self.superuser, + portfolio=self.portfolio, + ) + + # Assert success message + mock_messages_success.assert_has_calls( + [ + call(request, "test@example.com has been invited to the organization: new portfolio"), + call(request, "test@example.com has been invited to the domain: example.com"), + ] + ) + + # Assert the invitations were saved + self.assertEqual(DomainInvitation.objects.count(), 1) + self.assertEqual(DomainInvitation.objects.first().email, "test@example.com") + self.assertEqual(PortfolioInvitation.objects.count(), 1) + self.assertEqual(PortfolioInvitation.objects.first().email, "test@example.com") + + # Assert invitations were retrieved + domain_invitation = DomainInvitation.objects.get(email=user.email, domain=self.domain) + portfolio_invitation = PortfolioInvitation.objects.get(email=user.email, portfolio=self.portfolio) + + self.assertEqual(domain_invitation.status, DomainInvitation.DomainInvitationStatus.RETRIEVED) + self.assertEqual(portfolio_invitation.status, PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED) + self.assertEqual(UserDomainRole.objects.count(), 1) + self.assertEqual(UserDomainRole.objects.first().user, user) + self.assertEqual(UserPortfolioPermission.objects.count(), 1) + self.assertEqual(UserPortfolioPermission.objects.first().user, user) + + @less_console_noise_decorator + @override_flag("organization_feature", active=False) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") + def test_add_domain_invitation_success_when_user_not_portfolio_member_and_organization_feature_off( + self, mock_messages_success, mock_send_portfolio_email, mock_send_domain_email + ): + """Test saving a domain invitation when the user exists and organization_feature flag is off. + + Should send out a domain invitation. + Should not send a out portfolio invitation. + Should trigger success message for the domain invitation. + Should retrieve the domain invitation. + Should not create a portfolio invitation.""" + + user = User.objects.create_user(email="test@example.com", username="username") + + # Create a domain invitation instance + invitation = DomainInvitation(email="test@example.com", domain=self.domain) + + admin_instance = DomainInvitationAdmin(DomainInvitation, admin_site=None) + + # Create a request object + request = self.factory.post("/admin/registrar/DomainInvitation/add/") + request.user = self.superuser + + admin_instance.save_model(request, invitation, form=None, change=False) + + # Assert sends appropriate emails - domain but not portfolio + mock_send_domain_email.assert_called_once_with( + email="test@example.com", + requestor=self.superuser, + domains=self.domain, + is_member_of_different_org=None, + requested_user=user, + ) + mock_send_portfolio_email.assert_not_called() + + # Assert correct invite was created + self.assertEqual(DomainInvitation.objects.count(), 1) + self.assertEqual(PortfolioInvitation.objects.count(), 0) + + # Assert success message + mock_messages_success.assert_called_once_with( + request, "test@example.com has been invited to the domain: example.com" + ) + + # Assert the domain invitation was saved + self.assertEqual(DomainInvitation.objects.count(), 1) + self.assertEqual(DomainInvitation.objects.first().email, "test@example.com") + self.assertEqual(PortfolioInvitation.objects.count(), 0) + + # Assert the domain invitation was retrieved + domain_invitation = DomainInvitation.objects.get(email=user.email, domain=self.domain) + + self.assertEqual(domain_invitation.status, DomainInvitation.DomainInvitationStatus.RETRIEVED) + self.assertEqual(UserDomainRole.objects.count(), 1) + self.assertEqual(UserDomainRole.objects.first().user, user) + self.assertEqual(UserPortfolioPermission.objects.count(), 0) + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("multiple_portfolios", active=True) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") + def test_add_domain_invitation_success_when_user_not_portfolio_member_and_multiple_portfolio_feature_on( + self, mock_messages_success, mock_send_portfolio_email, mock_send_domain_email + ): + """Test saving a domain invitation when the user exists and multiple_portfolio flag is on. + + Should send out a domain invitation. + Should not send a out portfolio invitation. + Should trigger success message for the domain invitation. + Should retrieve the domain invitation. + Should not create a portfolio invitation. + + NOTE: This test may need to be reworked when the multiple_portfolio flag is fully fleshed out. + """ + + user = User.objects.create_user(email="test@example.com", username="username") + + # Create a domain invitation instance + invitation = DomainInvitation(email="test@example.com", domain=self.domain) + + admin_instance = DomainInvitationAdmin(DomainInvitation, admin_site=None) + + # Create a request object + request = self.factory.post("/admin/registrar/DomainInvitation/add/") + request.user = self.superuser + + admin_instance.save_model(request, invitation, form=None, change=False) + + # Assert sends appropriate emails - domain but not portfolio + mock_send_domain_email.assert_called_once_with( + email="test@example.com", + requestor=self.superuser, + domains=self.domain, + is_member_of_different_org=None, + requested_user=user, + ) + mock_send_portfolio_email.assert_not_called() + + # Assert correct invite was created + self.assertEqual(DomainInvitation.objects.count(), 1) + self.assertEqual(PortfolioInvitation.objects.count(), 0) + + # Assert success message + mock_messages_success.assert_called_once_with( + request, "test@example.com has been invited to the domain: example.com" + ) + + # Assert the domain invitation was saved + self.assertEqual(DomainInvitation.objects.count(), 1) + self.assertEqual(DomainInvitation.objects.first().email, "test@example.com") + self.assertEqual(PortfolioInvitation.objects.count(), 0) + + # Assert the domain invitation was retrieved + domain_invitation = DomainInvitation.objects.get(email=user.email, domain=self.domain) + + self.assertEqual(domain_invitation.status, DomainInvitation.DomainInvitationStatus.RETRIEVED) + self.assertEqual(UserDomainRole.objects.count(), 1) + self.assertEqual(UserDomainRole.objects.first().user, user) + self.assertEqual(UserPortfolioPermission.objects.count(), 0) + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") + def test_add_domain_invitation_success_when_user_existing_portfolio_member( + self, mock_messages_success, mock_send_portfolio_email, mock_send_domain_email + ): + """Test saving a domain invitation when the user exists and a portfolio invitation exists. + + Should send out domain invitation only. + Should trigger success message for the domain invitation. + Should retrieve the domain invitation.""" + + user = User.objects.create_user(email="test@example.com", username="username") + + # Create a domain invitation instance + invitation = DomainInvitation(email="test@example.com", domain=self.domain) + + UserPortfolioPermission.objects.create( + user=user, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER] + ) + + admin_instance = DomainInvitationAdmin(DomainInvitation, admin_site=None) + + # Create a request object + request = self.factory.post("/admin/registrar/DomainInvitation/add/") + request.user = self.superuser + + # Patch the retrieve method to ensure it is not called + with patch.object(DomainInvitation, "retrieve") as domain_invitation_mock_retrieve: + with patch.object(PortfolioInvitation, "retrieve") as portfolio_invitation_mock_retrieve: + admin_instance.save_model(request, invitation, form=None, change=False) + + # Assert sends appropriate emails - domain and portfolio invites + mock_send_domain_email.assert_called_once_with( + email="test@example.com", + requestor=self.superuser, + domains=self.domain, + is_member_of_different_org=None, + requested_user=user, + ) + mock_send_portfolio_email.assert_not_called + + # Assert retrieve was not called + domain_invitation_mock_retrieve.assert_called_once() + portfolio_invitation_mock_retrieve.assert_not_called() + + # Assert success message + mock_messages_success.assert_called_once_with( + request, "test@example.com has been invited to the domain: example.com" + ) + + # Assert the invitations were saved + self.assertEqual(DomainInvitation.objects.count(), 1) + self.assertEqual(DomainInvitation.objects.first().email, "test@example.com") + self.assertEqual(PortfolioInvitation.objects.count(), 0) + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.error") + def test_add_domain_invitation_when_user_not_portfolio_member_raises_exception_sending_portfolio_email( + self, mock_messages_error, mock_send_portfolio_email, mock_send_domain_email + ): + """Test saving a domain invitation when the user exists and is not a portfolio member raises + sending portfolio email exception. + + Should only attempt to send the portfolio invitation. + Should trigger error message on portfolio invitation. + Should not attempt to retrieve the domain invitation.""" + + mock_send_portfolio_email.side_effect = MissingEmailError("craving a burger") - Should attempt to retrieve the domain invitation.""" - # Create a user with the same email User.objects.create_user(email="test@example.com", username="username") # Create a domain invitation instance @@ -205,22 +470,180 @@ class TestDomainInvitationAdmin(TestCase): request = self.factory.post("/admin/registrar/DomainInvitation/add/") request.user = self.superuser - # Patch the retrieve method - with patch.object(DomainInvitation, "retrieve") as mock_retrieve: - admin_instance.save_model(request, invitation, form=None, change=False) + # Patch the retrieve method to ensure it is not called + with patch.object(DomainInvitation, "retrieve") as domain_invitation_mock_retrieve: + with patch.object(PortfolioInvitation, "retrieve") as portfolio_invitation_mock_retrieve: + admin_instance.save_model(request, invitation, form=None, change=False) - # Assert retrieve was called - mock_retrieve.assert_called_once() + # Assert sends appropriate emails - domain and portfolio invites + mock_send_domain_email.assert_not_called() + mock_send_portfolio_email.assert_called_once_with( + email="test@example.com", + requestor=self.superuser, + portfolio=self.portfolio, + ) - # Assert the invitation was saved - self.assertEqual(DomainInvitation.objects.count(), 1) - self.assertEqual(DomainInvitation.objects.first().email, "test@example.com") + # Assert retrieve on domain invite only was called + domain_invitation_mock_retrieve.assert_not_called() + portfolio_invitation_mock_retrieve.assert_not_called() + + # Assert error message + mock_messages_error.assert_called_once_with( + request, "Can't send invitation email. No email is associated with your user account." + ) + + # Assert the invitations were saved + self.assertEqual(DomainInvitation.objects.count(), 0) + self.assertEqual(PortfolioInvitation.objects.count(), 0) @less_console_noise_decorator - def test_save_model_user_does_not_exist(self): + @override_flag("organization_feature", active=True) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") + @patch("django.contrib.messages.error") + def test_add_domain_invitation_when_user_not_portfolio_member_raises_exception_sending_domain_email( + self, mock_messages_error, mock_messages_success, mock_send_portfolio_email, mock_send_domain_email + ): + """Test saving a domain invitation when the user exists and is not a portfolio member raises + sending domain email exception. + + Should send out the portfolio invitation and attempt to send the domain invitation. + Should trigger portfolio invitation success message. + Should trigger domain invitation error message. + Should not attempt to retrieve the domain invitation. + Should attempt to retrieve the portfolio invitation.""" + + mock_send_domain_email.side_effect = MissingEmailError("craving a burger") + + user = User.objects.create_user(email="test@example.com", username="username") + + # Create a domain invitation instance + invitation = DomainInvitation(email="test@example.com", domain=self.domain) + + admin_instance = DomainInvitationAdmin(DomainInvitation, admin_site=None) + + # Create a request object + request = self.factory.post("/admin/registrar/DomainInvitation/add/") + request.user = self.superuser + + # Patch the retrieve method to ensure it is not called + with patch.object(DomainInvitation, "retrieve") as domain_invitation_mock_retrieve: + with patch.object(PortfolioInvitation, "retrieve") as portfolio_invitation_mock_retrieve: + admin_instance.save_model(request, invitation, form=None, change=False) + + # Assert sends appropriate emails - domain and portfolio invites + mock_send_domain_email.assert_called_once_with( + email="test@example.com", + requestor=self.superuser, + domains=self.domain, + is_member_of_different_org=None, + requested_user=user, + ) + mock_send_portfolio_email.assert_called_once_with( + email="test@example.com", + requestor=self.superuser, + portfolio=self.portfolio, + ) + + # Assert retrieve on domain invite only was called + domain_invitation_mock_retrieve.assert_not_called() + portfolio_invitation_mock_retrieve.assert_called_once() + + # Assert success message + mock_messages_success.assert_called_once_with( + request, "test@example.com has been invited to the organization: new portfolio" + ) + + # Assert error message + mock_messages_error.assert_called_once_with( + request, "Can't send invitation email. No email is associated with your user account." + ) + + # Assert the invitations were saved + self.assertEqual(DomainInvitation.objects.count(), 0) + self.assertEqual(PortfolioInvitation.objects.count(), 1) + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") + @patch("django.contrib.messages.error") + def test_add_domain_invitation_when_user_existing_portfolio_member_raises_exception_sending_domain_email( + self, mock_messages_error, mock_messages_success, mock_send_portfolio_email, mock_send_domain_email + ): + """Test saving a domain invitation when the user exists and is not a portfolio member raises + sending domain email exception. + + Should send out the portfolio invitation and attempt to send the domain invitation. + Should trigger portfolio invitation success message. + Should trigger domain invitation error message. + Should not attempt to retrieve the domain invitation. + Should attempt to retrieve the portfolio invitation.""" + + mock_send_domain_email.side_effect = MissingEmailError("craving a burger") + + user = User.objects.create_user(email="test@example.com", username="username") + + UserPortfolioPermission.objects.create( + user=user, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER] + ) + + # Create a domain invitation instance + invitation = DomainInvitation(email="test@example.com", domain=self.domain) + + admin_instance = DomainInvitationAdmin(DomainInvitation, admin_site=None) + + # Create a request object + request = self.factory.post("/admin/registrar/DomainInvitation/add/") + request.user = self.superuser + + # Patch the retrieve method to ensure it is not called + with patch.object(DomainInvitation, "retrieve") as domain_invitation_mock_retrieve: + with patch.object(PortfolioInvitation, "retrieve") as portfolio_invitation_mock_retrieve: + admin_instance.save_model(request, invitation, form=None, change=False) + + # Assert sends appropriate emails - domain and portfolio invites + mock_send_domain_email.assert_called_once_with( + email="test@example.com", + requestor=self.superuser, + domains=self.domain, + is_member_of_different_org=None, + requested_user=user, + ) + mock_send_portfolio_email.assert_not_called() + + # Assert retrieve on domain invite only was called + domain_invitation_mock_retrieve.assert_not_called() + portfolio_invitation_mock_retrieve.assert_not_called() + + # Assert success message + mock_messages_success.assert_not_called() + + # Assert error message + mock_messages_error.assert_called_once_with( + request, "Can't send invitation email. No email is associated with your user account." + ) + + # Assert the invitations were saved + self.assertEqual(DomainInvitation.objects.count(), 0) + self.assertEqual(PortfolioInvitation.objects.count(), 0) + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") + def test_add_domain_invitation_success_when_email_not_portfolio_member( + self, mock_messages_success, mock_send_portfolio_email, mock_send_domain_email + ): """Test saving a domain invitation when the user does not exist. - Should not attempt to retrieve the domain invitation.""" + Should send out domain and portfolio invitations. + Should trigger success messages. + Should not attempt to retrieve the domain invitation. + Should not attempt to retrieve the portfolio invitation.""" # Create a domain invitation instance invitation = DomainInvitation(email="nonexistent@example.com", domain=self.domain) @@ -231,15 +654,393 @@ class TestDomainInvitationAdmin(TestCase): request.user = self.superuser # Patch the retrieve method to ensure it is not called - with patch.object(DomainInvitation, "retrieve") as mock_retrieve: - admin_instance.save_model(request, invitation, form=None, change=False) + with patch.object(DomainInvitation, "retrieve") as domain_invitation_mock_retrieve: + with patch.object(PortfolioInvitation, "retrieve") as portfolio_invitation_mock_retrieve: + admin_instance.save_model(request, invitation, form=None, change=False) + + # Assert sends appropriate emails - domain and portfolio invites + mock_send_domain_email.assert_called_once_with( + email="nonexistent@example.com", + requestor=self.superuser, + domains=self.domain, + is_member_of_different_org=None, + requested_user=None, + ) + mock_send_portfolio_email.assert_called_once_with( + email="nonexistent@example.com", + requestor=self.superuser, + portfolio=self.portfolio, + ) # Assert retrieve was not called - mock_retrieve.assert_not_called() + domain_invitation_mock_retrieve.assert_not_called() + portfolio_invitation_mock_retrieve.assert_not_called() - # Assert the invitation was saved + # Assert success message + mock_messages_success.assert_has_calls( + [ + call(request, "nonexistent@example.com has been invited to the organization: new portfolio"), + call(request, "nonexistent@example.com has been invited to the domain: example.com"), + ] + ) + + # Assert the invitations were saved self.assertEqual(DomainInvitation.objects.count(), 1) self.assertEqual(DomainInvitation.objects.first().email, "nonexistent@example.com") + self.assertEqual(PortfolioInvitation.objects.count(), 1) + self.assertEqual(PortfolioInvitation.objects.first().email, "nonexistent@example.com") + + @less_console_noise_decorator + @override_flag("organization_feature", active=False) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") + def test_add_domain_invitation_success_when_email_not_portfolio_member_and_organization_feature_off( + self, mock_messages_success, mock_send_portfolio_email, mock_send_domain_email + ): + """Test saving a domain invitation when the user does not exist and organization_feature flag is off. + + Should send out a domain invitation. + Should not send a out portfolio invitation. + Should trigger success message for domain invitation. + Should not retrieve the domain invitation. + Should not create a portfolio invitation.""" + # Create a domain invitation instance + invitation = DomainInvitation(email="nonexistent@example.com", domain=self.domain) + + admin_instance = DomainInvitationAdmin(DomainInvitation, admin_site=None) + + # Create a request object + request = self.factory.post("/admin/registrar/DomainInvitation/add/") + request.user = self.superuser + + # Patch the retrieve method to ensure it is not called + with patch.object(DomainInvitation, "retrieve") as domain_invitation_mock_retrieve: + with patch.object(PortfolioInvitation, "retrieve") as portfolio_invitation_mock_retrieve: + admin_instance.save_model(request, invitation, form=None, change=False) + + # Assert sends appropriate emails - domain but not portfolio + mock_send_domain_email.assert_called_once_with( + email="nonexistent@example.com", + requestor=self.superuser, + domains=self.domain, + is_member_of_different_org=None, + requested_user=None, + ) + mock_send_portfolio_email.assert_not_called() + + # Assert retrieve on domain invite only was called + domain_invitation_mock_retrieve.assert_not_called() + portfolio_invitation_mock_retrieve.assert_not_called() + + # Assert success message + mock_messages_success.assert_called_once_with( + request, "nonexistent@example.com has been invited to the domain: example.com" + ) + + # Assert the domain invitation was saved + self.assertEqual(DomainInvitation.objects.count(), 1) + self.assertEqual(DomainInvitation.objects.first().email, "nonexistent@example.com") + self.assertEqual(PortfolioInvitation.objects.count(), 0) + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("multiple_portfolios", active=True) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") + def test_add_domain_invitation_success_when_email_not_portfolio_member_and_multiple_portfolio_feature_on( + self, mock_messages_success, mock_send_portfolio_email, mock_send_domain_email + ): + """Test saving a domain invitation when the user does not exist and multiple_portfolio flag is on. + + Should send out a domain invitation. + Should not send a out portfolio invitation. + Should trigger success message for domain invitation. + Should not retrieve the domain invitation. + Should not create a portfolio invitation.""" + # Create a domain invitation instance + invitation = DomainInvitation(email="nonexistent@example.com", domain=self.domain) + + admin_instance = DomainInvitationAdmin(DomainInvitation, admin_site=None) + + # Create a request object + request = self.factory.post("/admin/registrar/DomainInvitation/add/") + request.user = self.superuser + + # Patch the retrieve method to ensure it is not called + with patch.object(DomainInvitation, "retrieve") as domain_invitation_mock_retrieve: + with patch.object(PortfolioInvitation, "retrieve") as portfolio_invitation_mock_retrieve: + admin_instance.save_model(request, invitation, form=None, change=False) + + # Assert sends appropriate emails - domain but not portfolio + mock_send_domain_email.assert_called_once_with( + email="nonexistent@example.com", + requestor=self.superuser, + domains=self.domain, + is_member_of_different_org=None, + requested_user=None, + ) + mock_send_portfolio_email.assert_not_called() + + # Assert retrieve on domain invite only was called + domain_invitation_mock_retrieve.assert_not_called() + portfolio_invitation_mock_retrieve.assert_not_called() + + # Assert success message + mock_messages_success.assert_called_once_with( + request, "nonexistent@example.com has been invited to the domain: example.com" + ) + + # Assert the domain invitation was saved + self.assertEqual(DomainInvitation.objects.count(), 1) + self.assertEqual(DomainInvitation.objects.first().email, "nonexistent@example.com") + self.assertEqual(PortfolioInvitation.objects.count(), 0) + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") + def test_add_domain_invitation_success_when_email_existing_portfolio_invitation( + self, mock_messages_success, mock_send_portfolio_email, mock_send_domain_email + ): + """Test saving a domain invitation when the user does not exist and a portfolio invitation exists. + + Should send out domain invitation only. + Should trigger success message for the domain invitation. + Should not attempt to retrieve the domain invitation. + Should not attempt to retrieve the portfolio invitation.""" + + PortfolioInvitation.objects.create( + email="nonexistent@example.com", + portfolio=self.portfolio, + roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], + ) + + # Create a domain invitation instance + invitation = DomainInvitation(email="nonexistent@example.com", domain=self.domain) + + admin_instance = DomainInvitationAdmin(DomainInvitation, admin_site=None) + + # Create a request object + request = self.factory.post("/admin/registrar/DomainInvitation/add/") + request.user = self.superuser + + # Patch the retrieve method to ensure it is not called + with patch.object(DomainInvitation, "retrieve") as domain_invitation_mock_retrieve: + with patch.object(PortfolioInvitation, "retrieve") as portfolio_invitation_mock_retrieve: + admin_instance.save_model(request, invitation, form=None, change=False) + + # Assert sends appropriate emails - domain and portfolio invites + mock_send_domain_email.assert_called_once_with( + email="nonexistent@example.com", + requestor=self.superuser, + domains=self.domain, + is_member_of_different_org=False, + requested_user=None, + ) + mock_send_portfolio_email.assert_not_called + + # Assert retrieve was not called + domain_invitation_mock_retrieve.assert_not_called() + portfolio_invitation_mock_retrieve.assert_not_called() + + # Assert success message + mock_messages_success.assert_called_once_with( + request, "nonexistent@example.com has been invited to the domain: example.com" + ) + + # Assert the invitations were saved + self.assertEqual(DomainInvitation.objects.count(), 1) + self.assertEqual(DomainInvitation.objects.first().email, "nonexistent@example.com") + self.assertEqual(PortfolioInvitation.objects.count(), 1) + self.assertEqual(PortfolioInvitation.objects.first().email, "nonexistent@example.com") + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.error") + def test_add_domain_invitation_when_user_not_portfolio_email_raises_exception_sending_portfolio_email( + self, mock_messages_error, mock_send_portfolio_email, mock_send_domain_email + ): + """Test saving a domain invitation when the user exists and is not a portfolio member raises + sending portfolio email exception. + + Should only attempt to send the portfolio invitation. + Should trigger error message on portfolio invitation. + Should not attempt to retrieve the domain invitation. + Should not attempt to retrieve the portfolio invitation.""" + + mock_send_portfolio_email.side_effect = MissingEmailError("craving a burger") + + # Create a domain invitation instance + invitation = DomainInvitation(email="nonexistent@example.com", domain=self.domain) + + admin_instance = DomainInvitationAdmin(DomainInvitation, admin_site=None) + + # Create a request object + request = self.factory.post("/admin/registrar/DomainInvitation/add/") + request.user = self.superuser + + # Patch the retrieve method to ensure it is not called + with patch.object(DomainInvitation, "retrieve") as domain_invitation_mock_retrieve: + with patch.object(PortfolioInvitation, "retrieve") as portfolio_invitation_mock_retrieve: + admin_instance.save_model(request, invitation, form=None, change=False) + + # Assert sends appropriate emails - domain and portfolio invites + mock_send_domain_email.assert_not_called() + mock_send_portfolio_email.assert_called_once_with( + email="nonexistent@example.com", + requestor=self.superuser, + portfolio=self.portfolio, + ) + + # Assert retrieve on domain invite only was called + domain_invitation_mock_retrieve.assert_not_called() + portfolio_invitation_mock_retrieve.assert_not_called() + + # Assert error message + mock_messages_error.assert_called_once_with( + request, "Can't send invitation email. No email is associated with your user account." + ) + + # Assert the invitations were saved + self.assertEqual(DomainInvitation.objects.count(), 0) + self.assertEqual(PortfolioInvitation.objects.count(), 0) + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") + @patch("django.contrib.messages.error") + def test_add_domain_invitation_when_user_not_portfolio_email_raises_exception_sending_domain_email( + self, mock_messages_error, mock_messages_success, mock_send_portfolio_email, mock_send_domain_email + ): + """Test saving a domain invitation when the user exists and is not a portfolio member + raises sending domain email exception. + + Should send out the portfolio invitation and attempt to send the domain invitation. + Should trigger portfolio invitation success message. + Should trigger domain invitation error message. + Should not attempt to retrieve the domain invitation. + Should attempt to retrieve the portfolio invitation.""" + + mock_send_domain_email.side_effect = MissingEmailError("craving a burger") + + # Create a domain invitation instance + invitation = DomainInvitation(email="nonexistent@example.com", domain=self.domain) + + admin_instance = DomainInvitationAdmin(DomainInvitation, admin_site=None) + + # Create a request object + request = self.factory.post("/admin/registrar/DomainInvitation/add/") + request.user = self.superuser + + # Patch the retrieve method to ensure it is not called + with patch.object(DomainInvitation, "retrieve") as domain_invitation_mock_retrieve: + with patch.object(PortfolioInvitation, "retrieve") as portfolio_invitation_mock_retrieve: + admin_instance.save_model(request, invitation, form=None, change=False) + + # Assert sends appropriate emails - domain and portfolio invites + mock_send_domain_email.assert_called_once_with( + email="nonexistent@example.com", + requestor=self.superuser, + domains=self.domain, + is_member_of_different_org=None, + requested_user=None, + ) + mock_send_portfolio_email.assert_called_once_with( + email="nonexistent@example.com", + requestor=self.superuser, + portfolio=self.portfolio, + ) + + # Assert retrieve on domain invite only was called + domain_invitation_mock_retrieve.assert_not_called() + portfolio_invitation_mock_retrieve.assert_not_called() + + # Assert success message + mock_messages_success.assert_called_once_with( + request, "nonexistent@example.com has been invited to the organization: new portfolio" + ) + + # Assert error message + mock_messages_error.assert_called_once_with( + request, "Can't send invitation email. No email is associated with your user account." + ) + + # Assert the invitations were saved + self.assertEqual(DomainInvitation.objects.count(), 0) + self.assertEqual(PortfolioInvitation.objects.count(), 1) + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @patch("registrar.admin.send_domain_invitation_email") + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") + @patch("django.contrib.messages.error") + def test_add_domain_invitation_when_user_existing_portfolio_email_raises_exception_sending_domain_email( + self, mock_messages_error, mock_messages_success, mock_send_portfolio_email, mock_send_domain_email + ): + """Test saving a domain invitation when the user exists and is not a portfolio member + raises sending domain email exception. + + Should send out the portfolio invitation and attempt to send the domain invitation. + Should trigger portfolio invitation success message. + Should trigger domain invitation error message. + Should not attempt to retrieve the domain invitation. + Should attempt to retrieve the portfolio invitation.""" + + mock_send_domain_email.side_effect = MissingEmailError("craving a burger") + + PortfolioInvitation.objects.create( + email="nonexistent@example.com", + portfolio=self.portfolio, + roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], + ) + + # Create a domain invitation instance + invitation = DomainInvitation(email="nonexistent@example.com", domain=self.domain) + + admin_instance = DomainInvitationAdmin(DomainInvitation, admin_site=None) + + # Create a request object + request = self.factory.post("/admin/registrar/DomainInvitation/add/") + request.user = self.superuser + + # Patch the retrieve method to ensure it is not called + with patch.object(DomainInvitation, "retrieve") as domain_invitation_mock_retrieve: + with patch.object(PortfolioInvitation, "retrieve") as portfolio_invitation_mock_retrieve: + admin_instance.save_model(request, invitation, form=None, change=False) + + # Assert sends appropriate emails - domain and portfolio invites + mock_send_domain_email.assert_called_once_with( + email="nonexistent@example.com", + requestor=self.superuser, + domains=self.domain, + is_member_of_different_org=False, + requested_user=None, + ) + mock_send_portfolio_email.assert_not_called() + + # Assert retrieve on domain invite only was called + domain_invitation_mock_retrieve.assert_not_called() + portfolio_invitation_mock_retrieve.assert_not_called() + + # Assert success message + mock_messages_success.assert_not_called() + + # Assert error message + mock_messages_error.assert_called_once_with( + request, "Can't send invitation email. No email is associated with your user account." + ) + + # Assert the invitations were saved + self.assertEqual(DomainInvitation.objects.count(), 0) + self.assertEqual(PortfolioInvitation.objects.count(), 1) class TestUserPortfolioPermissionAdmin(TestCase): @@ -247,8 +1048,6 @@ class TestUserPortfolioPermissionAdmin(TestCase): def setUp(self): """Create a client object""" - self.factory = RequestFactory() - self.admin = ListHeaderAdmin(model=UserPortfolioPermissionAdmin, admin_site=AdminSite()) self.client = Client(HTTP_HOST="localhost:8080") self.superuser = create_superuser() self.portfolio = Portfolio.objects.create(organization_name="Test Portfolio", creator=self.superuser) @@ -256,77 +1055,9 @@ class TestUserPortfolioPermissionAdmin(TestCase): def tearDown(self): """Delete all DomainInvitation objects""" Portfolio.objects.all().delete() - PortfolioInvitation.objects.all().delete() Contact.objects.all().delete() User.objects.all().delete() - - @less_console_noise_decorator - def test_clean_user_portfolio_permission(self): - """Tests validation of user portfolio permission""" - - # Test validation fails when portfolio missing but permissions are present - permission = UserPortfolioPermission(user=self.superuser, roles=["organization_admin"], portfolio=None) - with self.assertRaises(ValidationError) as err: - permission.clean() - self.assertEqual( - str(err.exception), - "When portfolio roles or additional permissions are assigned, portfolio is required.", - ) - - # Test validation fails when portfolio present but no permissions are present - permission = UserPortfolioPermission(user=self.superuser, roles=None, portfolio=self.portfolio) - with self.assertRaises(ValidationError) as err: - permission.clean() - self.assertEqual( - str(err.exception), - "When portfolio is assigned, portfolio roles or additional permissions are required.", - ) - - # Test validation fails with forbidden permissions for single role - forbidden_member_roles = UserPortfolioPermission.FORBIDDEN_PORTFOLIO_ROLE_PERMISSIONS.get( - UserPortfolioRoleChoices.ORGANIZATION_MEMBER - ) - permission = UserPortfolioPermission( - user=self.superuser, - roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], - additional_permissions=forbidden_member_roles, - portfolio=self.portfolio, - ) - with self.assertRaises(ValidationError) as err: - permission.clean() - self.assertEqual( - str(err.exception), - "These permissions cannot be assigned to Member: " - "", - ) - - @less_console_noise_decorator - def test_get_forbidden_permissions_with_multiple_roles(self): - """Tests that forbidden permissions are properly handled when a user has multiple roles""" - # Get forbidden permissions for member role - member_forbidden = UserPortfolioPermission.FORBIDDEN_PORTFOLIO_ROLE_PERMISSIONS.get( - UserPortfolioRoleChoices.ORGANIZATION_MEMBER - ) - - # Test with both admin and member roles - roles = [UserPortfolioRoleChoices.ORGANIZATION_ADMIN, UserPortfolioRoleChoices.ORGANIZATION_MEMBER] - - # These permissions would be forbidden for member alone, but should be allowed - # when combined with admin role - permissions = UserPortfolioPermission.get_forbidden_permissions( - roles=roles, additional_permissions=member_forbidden - ) - - # Should return empty set since no permissions are commonly forbidden between admin and member - self.assertEqual(permissions, set()) - - # Verify the same permissions are forbidden when only member role is present - member_only_permissions = UserPortfolioPermission.get_forbidden_permissions( - roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], additional_permissions=member_forbidden - ) - - # Should return the forbidden permissions for member role - self.assertEqual(member_only_permissions, set(member_forbidden)) + UserPortfolioPermission.objects.all().delete() @less_console_noise_decorator def test_has_change_form_description(self): @@ -376,117 +1107,12 @@ class TestPortfolioInvitationAdmin(TestCase): Portfolio.objects.all().delete() PortfolioInvitation.objects.all().delete() Contact.objects.all().delete() + User.objects.all().delete() @classmethod def tearDownClass(self): User.objects.all().delete() - @less_console_noise_decorator - @override_flag("multiple_portfolios", active=False) - def test_clean_multiple_portfolios_inactive(self): - """Tests that users cannot have multiple portfolios or invitations when flag is inactive""" - # Create the first portfolio permission - UserPortfolioPermission.objects.create( - user=self.superuser, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] - ) - - # Test a second portfolio permission object (should fail) - second_portfolio = Portfolio.objects.create(organization_name="Second Portfolio", creator=self.superuser) - second_permission = UserPortfolioPermission( - user=self.superuser, portfolio=second_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] - ) - - with self.assertRaises(ValidationError) as err: - second_permission.clean() - self.assertIn("users cannot be assigned to multiple portfolios", str(err.exception)) - - # Test that adding a new portfolio invitation also fails - third_portfolio = Portfolio.objects.create(organization_name="Third Portfolio", creator=self.superuser) - invitation = PortfolioInvitation( - email=self.superuser.email, portfolio=third_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] - ) - - with self.assertRaises(ValidationError) as err: - invitation.clean() - self.assertIn("users cannot be assigned to multiple portfolios", str(err.exception)) - - @less_console_noise_decorator - @override_flag("multiple_portfolios", active=True) - def test_clean_multiple_portfolios_active(self): - """Tests that users can have multiple portfolios and invitations when flag is active""" - # Create first portfolio permission - UserPortfolioPermission.objects.create( - user=self.superuser, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] - ) - - # Second portfolio permission should succeed - second_portfolio = Portfolio.objects.create(organization_name="Second Portfolio", creator=self.superuser) - second_permission = UserPortfolioPermission( - user=self.superuser, portfolio=second_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] - ) - second_permission.clean() - second_permission.save() - - # Verify both permissions exist - user_permissions = UserPortfolioPermission.objects.filter(user=self.superuser) - self.assertEqual(user_permissions.count(), 2) - - # Portfolio invitation should also succeed - third_portfolio = Portfolio.objects.create(organization_name="Third Portfolio", creator=self.superuser) - invitation = PortfolioInvitation( - email=self.superuser.email, portfolio=third_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] - ) - invitation.clean() - invitation.save() - - # Verify invitation exists - self.assertTrue( - PortfolioInvitation.objects.filter( - email=self.superuser.email, - portfolio=third_portfolio, - ).exists() - ) - - @less_console_noise_decorator - def test_clean_portfolio_invitation(self): - """Tests validation of portfolio invitation permissions""" - - # Test validation fails when portfolio missing but permissions present - invitation = PortfolioInvitation(email="test@example.com", roles=["organization_admin"], portfolio=None) - with self.assertRaises(ValidationError) as err: - invitation.clean() - self.assertEqual( - str(err.exception), - "When portfolio roles or additional permissions are assigned, portfolio is required.", - ) - - # Test validation fails when portfolio present but no permissions - invitation = PortfolioInvitation(email="test@example.com", roles=None, portfolio=self.portfolio) - with self.assertRaises(ValidationError) as err: - invitation.clean() - self.assertEqual( - str(err.exception), - "When portfolio is assigned, portfolio roles or additional permissions are required.", - ) - - # Test validation fails with forbidden permissions - forbidden_member_roles = UserPortfolioPermission.FORBIDDEN_PORTFOLIO_ROLE_PERMISSIONS.get( - UserPortfolioRoleChoices.ORGANIZATION_MEMBER - ) - invitation = PortfolioInvitation( - email="test@example.com", - roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], - additional_permissions=forbidden_member_roles, - portfolio=self.portfolio, - ) - with self.assertRaises(ValidationError) as err: - invitation.clean() - self.assertEqual( - str(err.exception), - "These permissions cannot be assigned to Member: " - "", - ) - @less_console_noise_decorator def test_has_model_description(self): """Tests if this model has a model description on the table view""" @@ -532,34 +1158,32 @@ class TestPortfolioInvitationAdmin(TestCase): @less_console_noise_decorator def test_get_filters(self): """Ensures that our filters are displaying correctly""" - with less_console_noise(): - self.client.force_login(self.superuser) + self.client.force_login(self.superuser) - response = self.client.get( - "/admin/registrar/portfolioinvitation/", - {}, - follow=True, - ) + response = self.client.get( + "/admin/registrar/portfolioinvitation/", + {}, + follow=True, + ) - # Assert that the filters are added - self.assertContains(response, "invited", count=4) - self.assertContains(response, "Invited", count=2) - self.assertContains(response, "retrieved", count=2) - self.assertContains(response, "Retrieved", count=2) + # Assert that the filters are added + self.assertContains(response, "invited", count=4) + self.assertContains(response, "Invited", count=2) + self.assertContains(response, "retrieved", count=2) + self.assertContains(response, "Retrieved", count=2) - # Check for the HTML context specificially - invited_html = 'Invited' - retrieved_html = 'Retrieved' + # Check for the HTML context specificially + invited_html = 'Invited' + retrieved_html = 'Retrieved' - self.assertContains(response, invited_html, count=1) - self.assertContains(response, retrieved_html, count=1) + self.assertContains(response, invited_html, count=1) + self.assertContains(response, retrieved_html, count=1) @less_console_noise_decorator @patch("registrar.admin.send_portfolio_invitation_email") @patch("django.contrib.messages.success") # Mock the `messages.warning` call - def test_save_sends_email(self, mock_messages_warning, mock_send_email): - """On save_model, an email is NOT sent if an invitation already exists.""" - self.client.force_login(self.superuser) + def test_save_sends_email(self, mock_messages_success, mock_send_email): + """On save_model, an email is sent if an invitation already exists.""" # Create an instance of the admin class admin_instance = PortfolioInvitationAdmin(PortfolioInvitation, admin_site=None) @@ -578,7 +1202,7 @@ class TestPortfolioInvitationAdmin(TestCase): # Call the save_model method admin_instance.save_model(request, portfolio_invitation, None, None) - # Assert that send_portfolio_invitation_email is not called + # Assert that send_portfolio_invitation_email is called mock_send_email.assert_called() # Get the arguments passed to send_portfolio_invitation_email @@ -590,7 +1214,7 @@ class TestPortfolioInvitationAdmin(TestCase): self.assertEqual(called_kwargs["portfolio"], self.portfolio) # Assert that a warning message was triggered - mock_messages_warning.assert_called_once_with(request, "james.gordon@gotham.gov has been invited.") + mock_messages_success.assert_called_once_with(request, "james.gordon@gotham.gov has been invited.") @less_console_noise_decorator @patch("registrar.admin.send_portfolio_invitation_email") @@ -627,6 +1251,94 @@ class TestPortfolioInvitationAdmin(TestCase): # Assert that a warning message was triggered mock_messages_warning.assert_called_once_with(request, "User is already a member of this portfolio.") + @less_console_noise_decorator + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") # Mock the `messages.warning` call + def test_add_portfolio_invitation_auto_retrieves_invitation_when_user_exists( + self, mock_messages_success, mock_send_email + ): + """On save_model, we create and retrieve a portfolio invitation if the user exists.""" + + # Create an instance of the admin class + admin_instance = PortfolioInvitationAdmin(PortfolioInvitation, admin_site=None) + + User.objects.create_user(email="james.gordon@gotham.gov", username="username") + + # Create a PortfolioInvitation instance + portfolio_invitation = PortfolioInvitation( + email="james.gordon@gotham.gov", + portfolio=self.portfolio, + roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], + ) + + # Create a request object + request = self.factory.post("/admin/registrar/PortfolioInvitation/add/") + request.user = self.superuser + + # Call the save_model method + with patch.object(PortfolioInvitation, "retrieve") as portfolio_invitation_mock_retrieve: + admin_instance.save_model(request, portfolio_invitation, None, None) + + # Assert that send_portfolio_invitation_email is called + mock_send_email.assert_called() + + # Get the arguments passed to send_portfolio_invitation_email + _, called_kwargs = mock_send_email.call_args + + # Assert the email content + self.assertEqual(called_kwargs["email"], "james.gordon@gotham.gov") + self.assertEqual(called_kwargs["requestor"], self.superuser) + self.assertEqual(called_kwargs["portfolio"], self.portfolio) + + # Assert that a warning message was triggered + mock_messages_success.assert_called_once_with(request, "james.gordon@gotham.gov has been invited.") + + # The invitation is not retrieved + portfolio_invitation_mock_retrieve.assert_called_once() + + @less_console_noise_decorator + @patch("registrar.admin.send_portfolio_invitation_email") + @patch("django.contrib.messages.success") # Mock the `messages.warning` call + def test_add_portfolio_invitation_does_not_retrieve_invitation_when_no_user( + self, mock_messages_success, mock_send_email + ): + """On save_model, we create but do not retrieve a portfolio invitation if the user does not exist.""" + + # Create an instance of the admin class + admin_instance = PortfolioInvitationAdmin(PortfolioInvitation, admin_site=None) + + # Create a PortfolioInvitation instance + portfolio_invitation = PortfolioInvitation( + email="james.gordon@gotham.gov", + portfolio=self.portfolio, + roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], + ) + + # Create a request object + request = self.factory.post("/admin/registrar/PortfolioInvitation/add/") + request.user = self.superuser + + # Call the save_model method + with patch.object(PortfolioInvitation, "retrieve") as portfolio_invitation_mock_retrieve: + admin_instance.save_model(request, portfolio_invitation, None, None) + + # Assert that send_portfolio_invitation_email is called + mock_send_email.assert_called() + + # Get the arguments passed to send_portfolio_invitation_email + _, called_kwargs = mock_send_email.call_args + + # Assert the email content + self.assertEqual(called_kwargs["email"], "james.gordon@gotham.gov") + self.assertEqual(called_kwargs["requestor"], self.superuser) + self.assertEqual(called_kwargs["portfolio"], self.portfolio) + + # Assert that a warning message was triggered + mock_messages_success.assert_called_once_with(request, "james.gordon@gotham.gov has been invited.") + + # The invitation is not retrieved + portfolio_invitation_mock_retrieve.assert_not_called() + @less_console_noise_decorator @patch("registrar.admin.send_portfolio_invitation_email") @patch("django.contrib.messages.error") # Mock the `messages.error` call @@ -655,9 +1367,7 @@ class TestPortfolioInvitationAdmin(TestCase): admin_instance.save_model(request, portfolio_invitation, None, None) # Assert that messages.error was called with the correct message - mock_messages_error.assert_called_once_with( - request, "Could not send email invitation. Portfolio invitation not saved." - ) + mock_messages_error.assert_called_once_with(request, "Email service unavailable") @less_console_noise_decorator @patch("registrar.admin.send_portfolio_invitation_email") @@ -694,7 +1404,7 @@ class TestPortfolioInvitationAdmin(TestCase): @less_console_noise_decorator @patch("registrar.admin.send_portfolio_invitation_email") - @patch("django.contrib.messages.error") # Mock the `messages.error` call + @patch("django.contrib.messages.warning") # Mock the `messages.error` call def test_save_exception_generic_error(self, mock_messages_error, mock_send_email): """Handle generic exceptions correctly during portfolio invitation.""" self.client.force_login(self.superuser) @@ -720,9 +1430,7 @@ class TestPortfolioInvitationAdmin(TestCase): admin_instance.save_model(request, portfolio_invitation, None, None) # Assert that messages.error was called with the correct message - mock_messages_error.assert_called_once_with( - request, "Could not send email invitation. Portfolio invitation not saved." - ) + mock_messages_error.assert_called_once_with(request, "Could not send email invitation.") class TestHostAdmin(TestCase): diff --git a/src/registrar/tests/test_management_scripts.py b/src/registrar/tests/test_management_scripts.py index 8ecb7cbea..655068493 100644 --- a/src/registrar/tests/test_management_scripts.py +++ b/src/registrar/tests/test_management_scripts.py @@ -32,7 +32,7 @@ import tablib from unittest.mock import patch, call, MagicMock, mock_open from epplibwrapper import commands, common -from .common import MockEppLib, less_console_noise, completed_domain_request, MockSESClient +from .common import MockEppLib, less_console_noise, completed_domain_request, MockSESClient, MockDbForIndividualTests from api.tests.common import less_console_noise_decorator @@ -1844,3 +1844,318 @@ class TestCreateFederalPortfolio(TestCase): self.assertEqual(existing_portfolio.organization_name, self.federal_agency.agency) self.assertEqual(existing_portfolio.notes, "Old notes") self.assertEqual(existing_portfolio.creator, self.user) + + @less_console_noise_decorator + def test_post_process_suborganization_fields(self): + """Test suborganization field updates from domain and request data. + Also tests the priority order for updating city and state_territory: + 1. Domain information fields + 2. Domain request suborganization fields + 3. Domain request standard fields + """ + # Create test data with different field combinations + self.domain_info.organization_name = "super" + self.domain_info.city = "Domain City " + self.domain_info.state_territory = "NY" + self.domain_info.save() + + self.domain_request.organization_name = "super" + self.domain_request.suborganization_city = "Request Suborg City" + self.domain_request.suborganization_state_territory = "CA" + self.domain_request.city = "Request City" + self.domain_request.state_territory = "TX" + self.domain_request.save() + + # Create another request/info pair without domain info data + self.domain_info_2.organization_name = "creative" + self.domain_info_2.city = None + self.domain_info_2.state_territory = None + self.domain_info_2.save() + + self.domain_request_2.organization_name = "creative" + self.domain_request_2.suborganization_city = "Second Suborg City" + self.domain_request_2.suborganization_state_territory = "WA" + self.domain_request_2.city = "Second City" + self.domain_request_2.state_territory = "OR" + self.domain_request_2.save() + + # Create a third request/info pair without suborg data + self.domain_info_3.organization_name = "names" + self.domain_info_3.city = None + self.domain_info_3.state_territory = None + self.domain_info_3.save() + + self.domain_request_3.organization_name = "names" + self.domain_request_3.suborganization_city = None + self.domain_request_3.suborganization_state_territory = None + self.domain_request_3.city = "Third City" + self.domain_request_3.state_territory = "FL" + self.domain_request_3.save() + + # Test running the script with both, and just with parse_requests + self.run_create_federal_portfolio(agency_name="Test Federal Agency", parse_requests=True, parse_domains=True) + self.run_create_federal_portfolio( + agency_name="Executive Agency 1", + parse_requests=True, + ) + + self.domain_info.refresh_from_db() + self.domain_request.refresh_from_db() + self.domain_info_2.refresh_from_db() + self.domain_request_2.refresh_from_db() + self.domain_info_3.refresh_from_db() + self.domain_request_3.refresh_from_db() + + # Verify suborganizations were created with correct field values + # Should use domain info values + suborg_1 = Suborganization.objects.get(name=self.domain_info.organization_name) + self.assertEqual(suborg_1.city, "Domain City") + self.assertEqual(suborg_1.state_territory, "NY") + + # Should use domain request suborg values + suborg_2 = Suborganization.objects.get(name=self.domain_info_2.organization_name) + self.assertEqual(suborg_2.city, "Second Suborg City") + self.assertEqual(suborg_2.state_territory, "WA") + + # Should use domain request standard values + suborg_3 = Suborganization.objects.get(name=self.domain_info_3.organization_name) + self.assertEqual(suborg_3.city, "Third City") + self.assertEqual(suborg_3.state_territory, "FL") + + @less_console_noise_decorator + def test_post_process_suborganization_fields_duplicate_records(self): + """Test suborganization field updates when multiple domains/requests exist for the same org. + Tests that: + 1. City / state_territory us updated when all location info matches + 2. Updates are skipped when locations don't match + 3. Priority order is maintained across multiple records: + a. Domain information fields + b. Domain request suborganization fields + c. Domain request standard fields + """ + # Case 1: Multiple records with all fields matching + matching_request_1 = completed_domain_request( + name="matching1.gov", + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + organization_name="matching org", + city="Standard City", + state_territory=DomainRequest.StateTerritoryChoices.TEXAS, + suborganization_city="Suborg City", + suborganization_state_territory=DomainRequest.StateTerritoryChoices.CALIFORNIA, + federal_agency=self.federal_agency, + ) + matching_request_1.approve() + domain_info_1 = DomainInformation.objects.get(domain_request=matching_request_1) + domain_info_1.city = "Domain Info City" + domain_info_1.state_territory = DomainRequest.StateTerritoryChoices.NEW_YORK + domain_info_1.save() + + matching_request_2 = completed_domain_request( + name="matching2.gov", + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + organization_name="matching org", + city="Standard City", + state_territory=DomainRequest.StateTerritoryChoices.TEXAS, + suborganization_city="Suborg City", + suborganization_state_territory=DomainRequest.StateTerritoryChoices.CALIFORNIA, + federal_agency=self.federal_agency, + ) + matching_request_2.approve() + domain_info_2 = DomainInformation.objects.get(domain_request=matching_request_2) + domain_info_2.city = "Domain Info City" + domain_info_2.state_territory = DomainRequest.StateTerritoryChoices.NEW_YORK + domain_info_2.save() + + # Case 2: Multiple records with only request fields (no domain info) + request_only_1 = completed_domain_request( + name="request1.gov", + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + organization_name="request org", + city="Standard City", + state_territory=DomainRequest.StateTerritoryChoices.TEXAS, + suborganization_city="Suborg City", + suborganization_state_territory=DomainRequest.StateTerritoryChoices.CALIFORNIA, + federal_agency=self.federal_agency, + ) + request_only_1.approve() + domain_info_3 = DomainInformation.objects.get(domain_request=request_only_1) + domain_info_3.city = None + domain_info_3.state_territory = None + domain_info_3.save() + + request_only_2 = completed_domain_request( + name="request2.gov", + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + organization_name="request org", + city="Standard City", + state_territory=DomainRequest.StateTerritoryChoices.TEXAS, + suborganization_city="Suborg City", + suborganization_state_territory=DomainRequest.StateTerritoryChoices.CALIFORNIA, + federal_agency=self.federal_agency, + ) + request_only_2.approve() + domain_info_4 = DomainInformation.objects.get(domain_request=request_only_2) + domain_info_4.city = None + domain_info_4.state_territory = None + domain_info_4.save() + + # Case 3: Multiple records with only standard fields (no suborg) + standard_only_1 = completed_domain_request( + name="standard1.gov", + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + organization_name="standard org", + city="Standard City", + state_territory=DomainRequest.StateTerritoryChoices.TEXAS, + federal_agency=self.federal_agency, + ) + standard_only_1.approve() + domain_info_5 = DomainInformation.objects.get(domain_request=standard_only_1) + domain_info_5.city = None + domain_info_5.state_territory = None + domain_info_5.save() + + standard_only_2 = completed_domain_request( + name="standard2.gov", + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + organization_name="standard org", + city="Standard City", + state_territory=DomainRequest.StateTerritoryChoices.TEXAS, + federal_agency=self.federal_agency, + ) + standard_only_2.approve() + domain_info_6 = DomainInformation.objects.get(domain_request=standard_only_2) + domain_info_6.city = None + domain_info_6.state_territory = None + domain_info_6.save() + + # Case 4: Multiple records with mismatched locations + mismatch_request_1 = completed_domain_request( + name="mismatch1.gov", + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + organization_name="mismatch org", + city="City One", + state_territory=DomainRequest.StateTerritoryChoices.FLORIDA, + federal_agency=self.federal_agency, + ) + mismatch_request_1.approve() + domain_info_5 = DomainInformation.objects.get(domain_request=mismatch_request_1) + domain_info_5.city = "Different City" + domain_info_5.state_territory = DomainRequest.StateTerritoryChoices.ALASKA + domain_info_5.save() + + mismatch_request_2 = completed_domain_request( + name="mismatch2.gov", + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + organization_name="mismatch org", + city="City Two", + state_territory=DomainRequest.StateTerritoryChoices.HAWAII, + federal_agency=self.federal_agency, + ) + mismatch_request_2.approve() + domain_info_6 = DomainInformation.objects.get(domain_request=mismatch_request_2) + domain_info_6.city = "Another City" + domain_info_6.state_territory = DomainRequest.StateTerritoryChoices.CALIFORNIA + domain_info_6.save() + + # Run the portfolio creation script + self.run_create_federal_portfolio(agency_name="Test Federal Agency", parse_requests=True, parse_domains=True) + + # Case 1: Should use domain info values (highest priority) + matching_suborg = Suborganization.objects.get(name="matching org") + self.assertEqual(matching_suborg.city, "Domain Info City") + self.assertEqual(matching_suborg.state_territory, DomainRequest.StateTerritoryChoices.NEW_YORK) + + # Case 2: Should use suborg values (second priority) + request_suborg = Suborganization.objects.get(name="request org") + self.assertEqual(request_suborg.city, "Suborg City") + self.assertEqual(request_suborg.state_territory, DomainRequest.StateTerritoryChoices.CALIFORNIA) + + # Case 3: Should use standard values (lowest priority) + standard_suborg = Suborganization.objects.get(name="standard org") + self.assertEqual(standard_suborg.city, "Standard City") + self.assertEqual(standard_suborg.state_territory, DomainRequest.StateTerritoryChoices.TEXAS) + + # Case 4: Should skip update due to mismatched locations + mismatch_suborg = Suborganization.objects.get(name="mismatch org") + self.assertIsNone(mismatch_suborg.city) + self.assertIsNone(mismatch_suborg.state_territory) + + +class TestPatchSuborganizations(MockDbForIndividualTests): + """Tests for the patch_suborganizations management command.""" + + @less_console_noise_decorator + def run_patch_suborganizations(self): + """Helper method to run the patch_suborganizations command.""" + with patch( + "registrar.management.commands.utility.terminal_helper.TerminalHelper.prompt_for_execution", + return_value=True, + ): + call_command("patch_suborganizations") + + @less_console_noise_decorator + def test_space_and_case_duplicates(self): + """Test cleaning up duplicates that differ by spaces and case. + + Should keep the version with: + 1. Fewest spaces + 2. Most leading capitals + """ + Suborganization.objects.create(name="Test Organization ", portfolio=self.portfolio_1) + Suborganization.objects.create(name="test organization", portfolio=self.portfolio_1) + Suborganization.objects.create(name="Test Organization", portfolio=self.portfolio_1) + + # Create an unrelated record to test that it doesn't get deleted, too + Suborganization.objects.create(name="unrelated org", portfolio=self.portfolio_1) + self.run_patch_suborganizations() + self.assertEqual(Suborganization.objects.count(), 2) + self.assertEqual(Suborganization.objects.filter(name__in=["unrelated org", "Test Organization"]).count(), 2) + + @less_console_noise_decorator + def test_hardcoded_record(self): + """Tests that our hardcoded records update as we expect them to""" + # Create orgs with old and new name formats + old_name = "USDA/OC" + new_name = "USDA, Office of Communications" + + Suborganization.objects.create(name=old_name, portfolio=self.portfolio_1) + Suborganization.objects.create(name=new_name, portfolio=self.portfolio_1) + + self.run_patch_suborganizations() + + # Verify only the new one remains + self.assertEqual(Suborganization.objects.count(), 1) + remaining = Suborganization.objects.first() + self.assertEqual(remaining.name, new_name) + + @less_console_noise_decorator + def test_reference_updates(self): + """Test that references are updated on domain info and domain request before deletion.""" + # Create suborganizations + keep_org = Suborganization.objects.create(name="Test Organization", portfolio=self.portfolio_1) + delete_org = Suborganization.objects.create(name="test organization ", portfolio=self.portfolio_1) + unrelated_org = Suborganization.objects.create(name="awesome", portfolio=self.portfolio_1) + + # We expect these references to update + self.domain_request_1.sub_organization = delete_org + self.domain_information_1.sub_organization = delete_org + self.domain_request_1.save() + self.domain_information_1.save() + + # But not these ones + self.domain_request_2.sub_organization = unrelated_org + self.domain_information_2.sub_organization = unrelated_org + self.domain_request_2.save() + self.domain_information_2.save() + + self.run_patch_suborganizations() + + self.domain_request_1.refresh_from_db() + self.domain_information_1.refresh_from_db() + self.domain_request_2.refresh_from_db() + self.domain_information_2.refresh_from_db() + + self.assertEqual(self.domain_request_1.sub_organization, keep_org) + self.assertEqual(self.domain_information_1.sub_organization, keep_org) + self.assertEqual(self.domain_request_2.sub_organization, unrelated_org) + self.assertEqual(self.domain_information_2.sub_organization, unrelated_org) diff --git a/src/registrar/tests/test_models.py b/src/registrar/tests/test_models.py index 46604a44a..d8db0f043 100644 --- a/src/registrar/tests/test_models.py +++ b/src/registrar/tests/test_models.py @@ -28,6 +28,7 @@ from registrar.models.verified_by_staff import VerifiedByStaff # type: ignore from .common import ( MockSESClient, completed_domain_request, + create_superuser, create_test_user, ) from waffle.testutils import override_flag @@ -155,6 +156,7 @@ class TestPortfolioInvitations(TestCase): roles=[self.portfolio_role_base, self.portfolio_role_admin], additional_permissions=[self.portfolio_permission_1, self.portfolio_permission_2], ) + self.superuser = create_superuser() def tearDown(self): super().tearDown() @@ -294,10 +296,158 @@ class TestPortfolioInvitations(TestCase): # Verify self.assertEquals(self.invitation.get_portfolio_permissions(), perm_list) + @less_console_noise_decorator + @override_flag("multiple_portfolios", active=False) + def test_clean_multiple_portfolios_inactive(self): + """Tests that users cannot have multiple portfolios or invitations when flag is inactive""" + # Create the first portfolio permission + UserPortfolioPermission.objects.create( + user=self.superuser, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] + ) + + # Test a second portfolio permission object (should fail) + second_portfolio = Portfolio.objects.create(organization_name="Second Portfolio", creator=self.superuser) + second_permission = UserPortfolioPermission( + user=self.superuser, portfolio=second_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] + ) + + with self.assertRaises(ValidationError) as err: + second_permission.clean() + self.assertIn("users cannot be assigned to multiple portfolios", str(err.exception)) + + # Test that adding a new portfolio invitation also fails + third_portfolio = Portfolio.objects.create(organization_name="Third Portfolio", creator=self.superuser) + invitation = PortfolioInvitation( + email=self.superuser.email, portfolio=third_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] + ) + + with self.assertRaises(ValidationError) as err: + invitation.clean() + self.assertIn("users cannot be assigned to multiple portfolios", str(err.exception)) + + @less_console_noise_decorator + @override_flag("multiple_portfolios", active=True) + def test_clean_multiple_portfolios_active(self): + """Tests that users can have multiple portfolios and invitations when flag is active""" + # Create first portfolio permission + UserPortfolioPermission.objects.create( + user=self.superuser, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] + ) + + # Second portfolio permission should succeed + second_portfolio = Portfolio.objects.create(organization_name="Second Portfolio", creator=self.superuser) + second_permission = UserPortfolioPermission( + user=self.superuser, portfolio=second_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] + ) + second_permission.clean() + second_permission.save() + + # Verify both permissions exist + user_permissions = UserPortfolioPermission.objects.filter(user=self.superuser) + self.assertEqual(user_permissions.count(), 2) + + # Portfolio invitation should also succeed + third_portfolio = Portfolio.objects.create(organization_name="Third Portfolio", creator=self.superuser) + invitation = PortfolioInvitation( + email=self.superuser.email, portfolio=third_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] + ) + invitation.clean() + invitation.save() + + # Verify invitation exists + self.assertTrue( + PortfolioInvitation.objects.filter( + email=self.superuser.email, + portfolio=third_portfolio, + ).exists() + ) + + @less_console_noise_decorator + def test_clean_portfolio_invitation(self): + """Tests validation of portfolio invitation permissions""" + + # Test validation fails when portfolio missing but permissions present + invitation = PortfolioInvitation(email="test@example.com", roles=["organization_admin"], portfolio=None) + with self.assertRaises(ValidationError) as err: + invitation.clean() + self.assertEqual( + str(err.exception), + "When portfolio roles or additional permissions are assigned, portfolio is required.", + ) + + # Test validation fails when portfolio present but no permissions + invitation = PortfolioInvitation(email="test@example.com", roles=None, portfolio=self.portfolio) + with self.assertRaises(ValidationError) as err: + invitation.clean() + self.assertEqual( + str(err.exception), + "When portfolio is assigned, portfolio roles or additional permissions are required.", + ) + + # Test validation fails with forbidden permissions + forbidden_member_roles = UserPortfolioPermission.FORBIDDEN_PORTFOLIO_ROLE_PERMISSIONS.get( + UserPortfolioRoleChoices.ORGANIZATION_MEMBER + ) + invitation = PortfolioInvitation( + email="test@example.com", + roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], + additional_permissions=forbidden_member_roles, + portfolio=self.portfolio, + ) + with self.assertRaises(ValidationError) as err: + invitation.clean() + self.assertEqual( + str(err.exception), + "These permissions cannot be assigned to Member: " + "", + ) + + @less_console_noise_decorator + @override_flag("multiple_portfolios", active=False) + def test_clean_user_portfolio_permission_multiple_portfolios_flag_off_and_duplicate_permission(self): + """MISSING TEST: Test validation of multiple_portfolios flag. + Scenario 1: Flag is inactive, and the user has existing portfolio permissions + + NOTE: Refer to the same test under TestUserPortfolioPermission""" + + pass + + @less_console_noise_decorator + @override_flag("multiple_portfolios", active=False) + def test_clean_user_portfolio_permission_multiple_portfolios_flag_off_and_existing_invitation(self): + """MISSING TEST: Test validation of multiple_portfolios flag. + Scenario 2: Flag is inactive, and the user has existing portfolio invitation to another portfolio + + NOTE: Refer to the same test under TestUserPortfolioPermission""" + + pass + + @less_console_noise_decorator + @override_flag("multiple_portfolios", active=True) + def test_clean_user_portfolio_permission_multiple_portfolios_flag_on_and_duplicate_permission(self): + """MISSING TEST: Test validation of multiple_portfolios flag. + Scenario 3: Flag is active, and the user has existing portfolio invitation + + NOTE: Refer to the same test under TestUserPortfolioPermission""" + + pass + + @less_console_noise_decorator + @override_flag("multiple_portfolios", active=True) + def test_clean_user_portfolio_permission_multiple_portfolios_flag_on_and_existing_invitation(self): + """MISSING TEST: Test validation of multiple_portfolios flag. + Scenario 4: Flag is active, and the user has existing portfolio invitation to another portfolio + + NOTE: Refer to the same test under TestUserPortfolioPermission""" + + pass + class TestUserPortfolioPermission(TestCase): @less_console_noise_decorator def setUp(self): + self.superuser = create_superuser() + self.portfolio = Portfolio.objects.create(organization_name="Test Portfolio", creator=self.superuser) self.user, _ = User.objects.get_or_create(email="mayor@igorville.gov") self.user2, _ = User.objects.get_or_create(email="user2@igorville.gov", username="user2") super().setUp() @@ -311,6 +461,7 @@ class TestUserPortfolioPermission(TestCase): Portfolio.objects.all().delete() User.objects.all().delete() UserDomainRole.objects.all().delete() + PortfolioInvitation.objects.all().delete() @less_console_noise_decorator @override_flag("multiple_portfolios", active=True) @@ -427,6 +578,178 @@ class TestUserPortfolioPermission(TestCase): # Assert self.assertEqual(portfolio_permission.get_managed_domains_count(), 1) + @less_console_noise_decorator + def test_clean_user_portfolio_permission(self): + """Tests validation of user portfolio permission""" + + # Test validation fails when portfolio missing but permissions are present + permission = UserPortfolioPermission(user=self.superuser, roles=["organization_admin"], portfolio=None) + with self.assertRaises(ValidationError) as err: + permission.clean() + self.assertEqual( + str(err.exception), + "When portfolio roles or additional permissions are assigned, portfolio is required.", + ) + + # Test validation fails when portfolio present but no permissions are present + permission = UserPortfolioPermission(user=self.superuser, roles=None, portfolio=self.portfolio) + with self.assertRaises(ValidationError) as err: + permission.clean() + self.assertEqual( + str(err.exception), + "When portfolio is assigned, portfolio roles or additional permissions are required.", + ) + + # Test validation fails with forbidden permissions for single role + forbidden_member_roles = UserPortfolioPermission.FORBIDDEN_PORTFOLIO_ROLE_PERMISSIONS.get( + UserPortfolioRoleChoices.ORGANIZATION_MEMBER + ) + permission = UserPortfolioPermission( + user=self.superuser, + roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], + additional_permissions=forbidden_member_roles, + portfolio=self.portfolio, + ) + with self.assertRaises(ValidationError) as err: + permission.clean() + self.assertEqual( + str(err.exception), + "These permissions cannot be assigned to Member: " + "", + ) + + @less_console_noise_decorator + @override_flag("multiple_portfolios", active=False) + def test_clean_user_portfolio_permission_multiple_portfolios_flag_off_and_duplicate_permission(self): + """Test validation of multiple_portfolios flag. + Scenario 1: Flag is inactive, and the user has existing portfolio permissions""" + + # existing permission + UserPortfolioPermission.objects.create( + user=self.superuser, + roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], + portfolio=self.portfolio, + ) + + permission = UserPortfolioPermission( + user=self.superuser, + roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], + portfolio=self.portfolio, + ) + + with self.assertRaises(ValidationError) as err: + permission.clean() + + self.assertEqual( + str(err.exception.messages[0]), + "This user is already assigned to a portfolio. " + "Based on current waffle flag settings, users cannot be assigned to multiple portfolios.", + ) + + @less_console_noise_decorator + @override_flag("multiple_portfolios", active=False) + def test_clean_user_portfolio_permission_multiple_portfolios_flag_off_and_existing_invitation(self): + """Test validation of multiple_portfolios flag. + Scenario 2: Flag is inactive, and the user has existing portfolio invitation to another portfolio""" + + portfolio2 = Portfolio.objects.create(creator=self.superuser, organization_name="Joey go away") + + PortfolioInvitation.objects.create( + email=self.superuser.email, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], portfolio=portfolio2 + ) + + permission = UserPortfolioPermission( + user=self.superuser, + roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], + portfolio=self.portfolio, + ) + + with self.assertRaises(ValidationError) as err: + permission.clean() + + self.assertEqual( + str(err.exception.messages[0]), + "This user is already assigned to a portfolio invitation. " + "Based on current waffle flag settings, users cannot be assigned to multiple portfolios.", + ) + + @less_console_noise_decorator + @override_flag("multiple_portfolios", active=True) + def test_clean_user_portfolio_permission_multiple_portfolios_flag_on_and_duplicate_permission(self): + """Test validation of multiple_portfolios flag. + Scenario 3: Flag is active, and the user has existing portfolio invitation""" + + # existing permission + UserPortfolioPermission.objects.create( + user=self.superuser, + roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], + portfolio=self.portfolio, + ) + + permission = UserPortfolioPermission( + user=self.superuser, + roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], + portfolio=self.portfolio, + ) + + # Should not raise any exceptions + try: + permission.clean() + except ValidationError: + self.fail("ValidationError was raised unexpectedly when flag is active.") + + @less_console_noise_decorator + @override_flag("multiple_portfolios", active=True) + def test_clean_user_portfolio_permission_multiple_portfolios_flag_on_and_existing_invitation(self): + """Test validation of multiple_portfolios flag. + Scenario 4: Flag is active, and the user has existing portfolio invitation to another portfolio""" + + portfolio2 = Portfolio.objects.create(creator=self.superuser, organization_name="Joey go away") + + PortfolioInvitation.objects.create( + email=self.superuser.email, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], portfolio=portfolio2 + ) + + permission = UserPortfolioPermission( + user=self.superuser, + roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], + portfolio=self.portfolio, + ) + + # Should not raise any exceptions + try: + permission.clean() + except ValidationError: + self.fail("ValidationError was raised unexpectedly when flag is active.") + + @less_console_noise_decorator + def test_get_forbidden_permissions_with_multiple_roles(self): + """Tests that forbidden permissions are properly handled when a user has multiple roles""" + # Get forbidden permissions for member role + member_forbidden = UserPortfolioPermission.FORBIDDEN_PORTFOLIO_ROLE_PERMISSIONS.get( + UserPortfolioRoleChoices.ORGANIZATION_MEMBER + ) + + # Test with both admin and member roles + roles = [UserPortfolioRoleChoices.ORGANIZATION_ADMIN, UserPortfolioRoleChoices.ORGANIZATION_MEMBER] + + # These permissions would be forbidden for member alone, but should be allowed + # when combined with admin role + permissions = UserPortfolioPermission.get_forbidden_permissions( + roles=roles, additional_permissions=member_forbidden + ) + + # Should return empty set since no permissions are commonly forbidden between admin and member + self.assertEqual(permissions, set()) + + # Verify the same permissions are forbidden when only member role is present + member_only_permissions = UserPortfolioPermission.get_forbidden_permissions( + roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], additional_permissions=member_forbidden + ) + + # Should return the forbidden permissions for member role + self.assertEqual(member_only_permissions, set(member_forbidden)) + class TestUser(TestCase): """Test actions that occur on user login, diff --git a/src/registrar/tests/test_reports.py b/src/registrar/tests/test_reports.py index 4a41238c7..b11500ea9 100644 --- a/src/registrar/tests/test_reports.py +++ b/src/registrar/tests/test_reports.py @@ -900,6 +900,7 @@ class MemberExportTest(MockDbForIndividualTests, MockEppLib): # spaces and leading/trailing whitespace csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip() expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip() + self.maxDiff = None self.assertEqual(csv_content, expected_content) diff --git a/src/registrar/tests/test_views_domain.py b/src/registrar/tests/test_views_domain.py index d92da17dd..f13490312 100644 --- a/src/registrar/tests/test_views_domain.py +++ b/src/registrar/tests/test_views_domain.py @@ -720,6 +720,8 @@ class TestDomainManagers(TestDomainOverview): def tearDown(self): """Ensure that the user has its original permissions""" PortfolioInvitation.objects.all().delete() + UserPortfolioPermission.objects.all().delete() + User.objects.exclude(id=self.user.id).delete() super().tearDown() @less_console_noise_decorator @@ -807,21 +809,76 @@ class TestDomainManagers(TestDomainOverview): call_args = mock_send_domain_email.call_args.kwargs self.assertEqual(call_args["email"], "mayor@igorville.gov") self.assertEqual(call_args["requestor"], self.user) - self.assertEqual(call_args["domain"], self.domain) + self.assertEqual(call_args["domains"], self.domain) self.assertIsNone(call_args.get("is_member_of_different_org")) - # Assert that the PortfolioInvitation is created + # Assert that the PortfolioInvitation is created and retrieved portfolio_invitation = PortfolioInvitation.objects.filter( email="mayor@igorville.gov", portfolio=self.portfolio ).first() self.assertIsNotNone(portfolio_invitation, "Portfolio invitation should be created.") self.assertEqual(portfolio_invitation.email, "mayor@igorville.gov") self.assertEqual(portfolio_invitation.portfolio, self.portfolio) + self.assertEqual(portfolio_invitation.status, PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED) + + # Assert that the UserPortfolioPermission is created + user_portfolio_permission = UserPortfolioPermission.objects.filter( + user=self.user, portfolio=self.portfolio + ).first() + self.assertIsNotNone(user_portfolio_permission, "User portfolio permission should be created") self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) success_page = success_result.follow() self.assertContains(success_page, "mayor@igorville.gov") + @boto3_mocking.patching + @override_flag("organization_feature", active=True) + @less_console_noise_decorator + @patch("registrar.views.domain.send_portfolio_invitation_email") + @patch("registrar.views.domain.send_domain_invitation_email") + def test_domain_user_add_form_sends_portfolio_invitation_to_new_email( + self, mock_send_domain_email, mock_send_portfolio_email + ): + """Adding an email not associated with a user works and sends portfolio invitation.""" + add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id})) + session_id = self.app.cookies[settings.SESSION_COOKIE_NAME] + + add_page.form["email"] = "notauser@igorville.gov" + + self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) + + success_result = add_page.form.submit() + + self.assertEqual(success_result.status_code, 302) + self.assertEqual( + success_result["Location"], + reverse("domain-users", kwargs={"pk": self.domain.id}), + ) + + # Verify that the invitation emails were sent + mock_send_portfolio_email.assert_called_once_with( + email="notauser@igorville.gov", requestor=self.user, portfolio=self.portfolio + ) + mock_send_domain_email.assert_called_once() + call_args = mock_send_domain_email.call_args.kwargs + self.assertEqual(call_args["email"], "notauser@igorville.gov") + self.assertEqual(call_args["requestor"], self.user) + self.assertEqual(call_args["domains"], self.domain) + self.assertIsNone(call_args.get("is_member_of_different_org")) + + # Assert that the PortfolioInvitation is created + portfolio_invitation = PortfolioInvitation.objects.filter( + email="notauser@igorville.gov", portfolio=self.portfolio + ).first() + self.assertIsNotNone(portfolio_invitation, "Portfolio invitation should be created.") + self.assertEqual(portfolio_invitation.email, "notauser@igorville.gov") + self.assertEqual(portfolio_invitation.portfolio, self.portfolio) + self.assertEqual(portfolio_invitation.status, PortfolioInvitation.PortfolioInvitationStatus.INVITED) + + self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) + success_page = success_result.follow() + self.assertContains(success_page, "notauser@igorville.gov") + @boto3_mocking.patching @override_flag("organization_feature", active=True) @less_console_noise_decorator @@ -857,7 +914,7 @@ class TestDomainManagers(TestDomainOverview): call_args = mock_send_domain_email.call_args.kwargs self.assertEqual(call_args["email"], "mayor@igorville.gov") self.assertEqual(call_args["requestor"], self.user) - self.assertEqual(call_args["domain"], self.domain) + self.assertEqual(call_args["domains"], self.domain) self.assertIsNone(call_args.get("is_member_of_different_org")) # Assert that no PortfolioInvitation is created @@ -915,7 +972,7 @@ class TestDomainManagers(TestDomainOverview): self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) success_page = success_result.follow() - self.assertContains(success_page, "Could not send email invitation.") + self.assertContains(success_page, "Failed to send email.") @boto3_mocking.patching @less_console_noise_decorator diff --git a/src/registrar/tests/test_views_portfolio.py b/src/registrar/tests/test_views_portfolio.py index 9bc97874d..78a4dae82 100644 --- a/src/registrar/tests/test_views_portfolio.py +++ b/src/registrar/tests/test_views_portfolio.py @@ -2106,25 +2106,75 @@ class TestPortfolioInvitedMemberDomainsView(TestWithUser, WebTest): self.assertEqual(response.status_code, 404) -class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView): +class TestPortfolioMemberDomainsEditView(TestWithUser, WebTest): @classmethod def setUpClass(cls): super().setUpClass() - cls.url = reverse("member-domains-edit", kwargs={"pk": cls.portfolio_permission.pk}) + # Create Portfolio + cls.portfolio = Portfolio.objects.create(creator=cls.user, organization_name="Test Portfolio") + # Create domains for testing + cls.domain1 = Domain.objects.create(name="1.gov") + cls.domain2 = Domain.objects.create(name="2.gov") + cls.domain3 = Domain.objects.create(name="3.gov") @classmethod def tearDownClass(cls): super().tearDownClass() + Portfolio.objects.all().delete() + User.objects.all().delete() + Domain.objects.all().delete() def setUp(self): super().setUp() - names = ["1.gov", "2.gov", "3.gov"] - Domain.objects.bulk_create([Domain(name=name) for name in names]) + # Create test member + self.user_member = User.objects.create( + username="test_member", + first_name="Second", + last_name="User", + email="second@example.com", + phone="8003112345", + title="Member", + ) + # Create test user with no perms + self.user_no_perms = User.objects.create( + username="test_user_no_perms", + first_name="No", + last_name="Permissions", + email="user_no_perms@example.com", + phone="8003112345", + title="No Permissions", + ) + # Assign permissions to the user making requests + self.portfolio_permission = UserPortfolioPermission.objects.create( + user=self.user, + portfolio=self.portfolio, + roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], + additional_permissions=[ + UserPortfolioPermissionChoices.VIEW_MEMBERS, + UserPortfolioPermissionChoices.EDIT_MEMBERS, + ], + ) + # Assign permissions to test member + self.permission = UserPortfolioPermission.objects.create( + user=self.user_member, + portfolio=self.portfolio, + roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], + additional_permissions=[ + UserPortfolioPermissionChoices.VIEW_MEMBERS, + UserPortfolioPermissionChoices.EDIT_MEMBERS, + ], + ) + # Create url to be used in all tests + self.url = reverse("member-domains-edit", kwargs={"pk": self.portfolio_permission.pk}) def tearDown(self): super().tearDown() UserDomainRole.objects.all().delete() - Domain.objects.all().delete() + DomainInvitation.objects.all().delete() + UserPortfolioPermission.objects.all().delete() + PortfolioInvitation.objects.all().delete() + Portfolio.objects.exclude(id=self.portfolio.id).delete() + User.objects.exclude(id=self.user.id).delete() @less_console_noise_decorator @override_flag("organization_feature", active=True) @@ -2180,12 +2230,13 @@ class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView): @less_console_noise_decorator @override_flag("organization_feature", active=True) @override_flag("organization_members", active=True) - def test_post_with_valid_added_domains(self): + @patch("registrar.views.portfolios.send_domain_invitation_email") + def test_post_with_valid_added_domains(self, mock_send_domain_email): """Test that domains can be successfully added.""" self.client.force_login(self.user) data = { - "added_domains": json.dumps([1, 2, 3]), # Mock domain IDs + "added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]), # Mock domain IDs } response = self.client.post(self.url, data) @@ -2198,31 +2249,43 @@ class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView): self.assertEqual(len(messages), 1) self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.") + expected_domains = [self.domain1, self.domain2, self.domain3] + # Verify that the invitation email was sent + mock_send_domain_email.assert_called_once() + call_args = mock_send_domain_email.call_args.kwargs + self.assertEqual(call_args["email"], "info@example.com") + self.assertEqual(call_args["requestor"], self.user) + self.assertEqual(list(call_args["domains"]), list(expected_domains)) + self.assertIsNone(call_args.get("is_member_of_different_org")) + @less_console_noise_decorator @override_flag("organization_feature", active=True) @override_flag("organization_members", active=True) - def test_post_with_valid_removed_domains(self): + @patch("registrar.views.portfolios.send_domain_invitation_email") + def test_post_with_valid_removed_domains(self, mock_send_domain_email): """Test that domains can be successfully removed.""" self.client.force_login(self.user) # Create some UserDomainRole objects - domains = [1, 2, 3] - UserDomainRole.objects.bulk_create([UserDomainRole(domain_id=domain, user=self.user) for domain in domains]) + domains = [self.domain1, self.domain2, self.domain3] + UserDomainRole.objects.bulk_create([UserDomainRole(domain=domain, user=self.user) for domain in domains]) data = { - "removed_domains": json.dumps([1, 2]), + "removed_domains": json.dumps([self.domain1.id, self.domain2.id]), } response = self.client.post(self.url, data) # Check that the UserDomainRole objects were deleted self.assertEqual(UserDomainRole.objects.filter(user=self.user).count(), 1) - self.assertEqual(UserDomainRole.objects.filter(domain_id=3, user=self.user).count(), 1) + self.assertEqual(UserDomainRole.objects.filter(domain=self.domain3, user=self.user).count(), 1) # Check for a success message and a redirect self.assertRedirects(response, reverse("member-domains", kwargs={"pk": self.portfolio_permission.pk})) messages = list(response.wsgi_request._messages) self.assertEqual(len(messages), 1) self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.") + # assert that send_domain_invitation_email is not called + mock_send_domain_email.assert_not_called() UserDomainRole.objects.all().delete() @@ -2290,26 +2353,93 @@ class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView): self.assertEqual(len(messages), 1) self.assertEqual(str(messages[0]), "No changes detected.") + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + @patch("registrar.views.portfolios.send_domain_invitation_email") + def test_post_when_send_domain_email_raises_exception(self, mock_send_domain_email): + """Test attempt to add new domains when an EmailSendingError raised.""" + self.client.force_login(self.user) -class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomainsView): + data = { + "added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]), # Mock domain IDs + } + mock_send_domain_email.side_effect = EmailSendingError("Failed to send email") + response = self.client.post(self.url, data) + + # Check that the UserDomainRole objects were not created + self.assertEqual(UserDomainRole.objects.filter(user=self.user, role=UserDomainRole.Roles.MANAGER).count(), 0) + + # Check for an error message and a redirect to edit form + self.assertRedirects(response, reverse("member-domains-edit", kwargs={"pk": self.portfolio_permission.pk})) + messages = list(response.wsgi_request._messages) + self.assertEqual(len(messages), 1) + self.assertEqual( + str(messages[0]), + "An unexpected error occurred: Failed to send email. If the issue persists, please contact help@get.gov.", + ) + + +class TestPortfolioInvitedMemberEditDomainsView(TestWithUser, WebTest): @classmethod def setUpClass(cls): super().setUpClass() - cls.url = reverse("invitedmember-domains-edit", kwargs={"pk": cls.invitation.pk}) + # Create Portfolio + cls.portfolio = Portfolio.objects.create(creator=cls.user, organization_name="Test Portfolio") + # Create domains for testing + cls.domain1 = Domain.objects.create(name="1.gov") + cls.domain2 = Domain.objects.create(name="2.gov") + cls.domain3 = Domain.objects.create(name="3.gov") @classmethod def tearDownClass(cls): super().tearDownClass() + Portfolio.objects.all().delete() + User.objects.all().delete() + Domain.objects.all().delete() def setUp(self): super().setUp() - names = ["1.gov", "2.gov", "3.gov"] - Domain.objects.bulk_create([Domain(name=name) for name in names]) + # Add a user with no permissions + self.user_no_perms = User.objects.create( + username="test_user_no_perms", + first_name="No", + last_name="Permissions", + email="user_no_perms@example.com", + phone="8003112345", + title="No Permissions", + ) + # Add an invited member who has been invited to manage domains + self.invited_member_email = "invited@example.com" + self.invitation = PortfolioInvitation.objects.create( + email=self.invited_member_email, + portfolio=self.portfolio, + roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], + additional_permissions=[ + UserPortfolioPermissionChoices.VIEW_MEMBERS, + ], + ) + + # Assign permissions to the user making requests + UserPortfolioPermission.objects.create( + user=self.user, + portfolio=self.portfolio, + roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], + additional_permissions=[ + UserPortfolioPermissionChoices.VIEW_MEMBERS, + UserPortfolioPermissionChoices.EDIT_MEMBERS, + ], + ) + self.url = reverse("invitedmember-domains-edit", kwargs={"pk": self.invitation.pk}) def tearDown(self): super().tearDown() Domain.objects.all().delete() DomainInvitation.objects.all().delete() + UserPortfolioPermission.objects.all().delete() + PortfolioInvitation.objects.all().delete() + Portfolio.objects.exclude(id=self.portfolio.id).delete() + User.objects.exclude(id=self.user.id).delete() @less_console_noise_decorator @override_flag("organization_feature", active=True) @@ -2364,12 +2494,13 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain @less_console_noise_decorator @override_flag("organization_feature", active=True) @override_flag("organization_members", active=True) - def test_post_with_valid_added_domains(self): + @patch("registrar.views.portfolios.send_domain_invitation_email") + def test_post_with_valid_added_domains(self, mock_send_domain_email): """Test adding new domains successfully.""" self.client.force_login(self.user) data = { - "added_domains": json.dumps([1, 2, 3]), # Mock domain IDs + "added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]), } response = self.client.post(self.url, data) @@ -2387,10 +2518,20 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain self.assertEqual(len(messages), 1) self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.") + expected_domains = [self.domain1, self.domain2, self.domain3] + # Verify that the invitation email was sent + mock_send_domain_email.assert_called_once() + call_args = mock_send_domain_email.call_args.kwargs + self.assertEqual(call_args["email"], "invited@example.com") + self.assertEqual(call_args["requestor"], self.user) + self.assertEqual(list(call_args["domains"]), list(expected_domains)) + self.assertFalse(call_args.get("is_member_of_different_org")) + @less_console_noise_decorator @override_flag("organization_feature", active=True) @override_flag("organization_members", active=True) - def test_post_with_existing_and_new_added_domains(self): + @patch("registrar.views.portfolios.send_domain_invitation_email") + def test_post_with_existing_and_new_added_domains(self, _): """Test updating existing and adding new invitations.""" self.client.force_login(self.user) @@ -2398,29 +2539,33 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain DomainInvitation.objects.bulk_create( [ DomainInvitation( - domain_id=1, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.CANCELED + domain=self.domain1, + email="invited@example.com", + status=DomainInvitation.DomainInvitationStatus.CANCELED, ), DomainInvitation( - domain_id=2, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED + domain=self.domain2, + email="invited@example.com", + status=DomainInvitation.DomainInvitationStatus.INVITED, ), ] ) data = { - "added_domains": json.dumps([1, 2, 3]), + "added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]), } response = self.client.post(self.url, data) # Check that status for domain_id=1 was updated to INVITED self.assertEqual( - DomainInvitation.objects.get(domain_id=1, email="invited@example.com").status, + DomainInvitation.objects.get(domain=self.domain1, email="invited@example.com").status, DomainInvitation.DomainInvitationStatus.INVITED, ) # Check that domain_id=3 was created as INVITED self.assertTrue( DomainInvitation.objects.filter( - domain_id=3, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED + domain=self.domain3, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED ).exists() ) @@ -2430,7 +2575,8 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain @less_console_noise_decorator @override_flag("organization_feature", active=True) @override_flag("organization_members", active=True) - def test_post_with_valid_removed_domains(self): + @patch("registrar.views.portfolios.send_domain_invitation_email") + def test_post_with_valid_removed_domains(self, mock_send_domain_email): """Test removing domains successfully.""" self.client.force_login(self.user) @@ -2438,33 +2584,39 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain DomainInvitation.objects.bulk_create( [ DomainInvitation( - domain_id=1, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED + domain=self.domain1, + email="invited@example.com", + status=DomainInvitation.DomainInvitationStatus.INVITED, ), DomainInvitation( - domain_id=2, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED + domain=self.domain2, + email="invited@example.com", + status=DomainInvitation.DomainInvitationStatus.INVITED, ), ] ) data = { - "removed_domains": json.dumps([1]), + "removed_domains": json.dumps([self.domain1.id]), } response = self.client.post(self.url, data) # Check that the status for domain_id=1 was updated to CANCELED self.assertEqual( - DomainInvitation.objects.get(domain_id=1, email="invited@example.com").status, + DomainInvitation.objects.get(domain=self.domain1, email="invited@example.com").status, DomainInvitation.DomainInvitationStatus.CANCELED, ) # Check that domain_id=2 remains INVITED self.assertEqual( - DomainInvitation.objects.get(domain_id=2, email="invited@example.com").status, + DomainInvitation.objects.get(domain=self.domain2, email="invited@example.com").status, DomainInvitation.DomainInvitationStatus.INVITED, ) # Check for a success message and a redirect self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk})) + # assert that send_domain_invitation_email is not called + mock_send_domain_email.assert_not_called() @less_console_noise_decorator @override_flag("organization_feature", active=True) @@ -2530,6 +2682,37 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain self.assertEqual(len(messages), 1) self.assertEqual(str(messages[0]), "No changes detected.") + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + @patch("registrar.views.portfolios.send_domain_invitation_email") + def test_post_when_send_domain_email_raises_exception(self, mock_send_domain_email): + """Test attempt to add new domains when an EmailSendingError raised.""" + self.client.force_login(self.user) + + data = { + "added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]), + } + mock_send_domain_email.side_effect = EmailSendingError("Failed to send email") + response = self.client.post(self.url, data) + + # Check that the DomainInvitation objects were not created + self.assertEqual( + DomainInvitation.objects.filter( + email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED + ).count(), + 0, + ) + + # Check for an error message and a redirect to edit form + self.assertRedirects(response, reverse("invitedmember-domains-edit", kwargs={"pk": self.invitation.pk})) + messages = list(response.wsgi_request._messages) + self.assertEqual(len(messages), 1) + self.assertEqual( + str(messages[0]), + "An unexpected error occurred: Failed to send email. If the issue persists, please contact help@get.gov.", + ) + class TestRequestingEntity(WebTest): """The requesting entity page is a domain request form that only exists @@ -2879,7 +3062,7 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest): ], ) - cls.new_member_email = "davekenn4242@gmail.com" + cls.new_member_email = "newmember@example.com" AllowedEmail.objects.get_or_create(email=cls.new_member_email) @@ -2933,11 +3116,13 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest): self.assertEqual(final_response.status_code, 302) # Redirects # Validate Database Changes + # Validate that portfolio invitation was created but not retrieved portfolio_invite = PortfolioInvitation.objects.filter( email=self.new_member_email, portfolio=self.portfolio ).first() self.assertIsNotNone(portfolio_invite) self.assertEqual(portfolio_invite.email, self.new_member_email) + self.assertEqual(portfolio_invite.status, PortfolioInvitation.PortfolioInvitationStatus.INVITED) # Check that an email was sent self.assertTrue(mock_client.send_email.called) @@ -3228,6 +3413,52 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest): # assert that send_portfolio_invitation_email is not called mock_send_email.assert_not_called() + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + @patch("registrar.views.portfolios.send_portfolio_invitation_email") + def test_member_invite_for_existing_user_who_is_not_a_member(self, mock_send_email): + """Tests the member invitation flow for existing user who is not a portfolio member.""" + self.client.force_login(self.user) + + # Simulate a session to ensure continuity + session_id = self.client.session.session_key + self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) + + new_user = User.objects.create(email="newuser@example.com") + + # Simulate submission of member invite for the newly created user + response = self.client.post( + reverse("new-member"), + { + "role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER.value, + "domain_request_permission_member": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS.value, + "email": "newuser@example.com", + }, + ) + self.assertEqual(response.status_code, 302) + + # Validate Database Changes + # Validate that portfolio invitation was created and retrieved + portfolio_invite = PortfolioInvitation.objects.filter( + email="newuser@example.com", portfolio=self.portfolio + ).first() + self.assertIsNotNone(portfolio_invite) + self.assertEqual(portfolio_invite.email, "newuser@example.com") + self.assertEqual(portfolio_invite.status, PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED) + # Validate UserPortfolioPermission + user_portfolio_permission = UserPortfolioPermission.objects.filter( + user=new_user, portfolio=self.portfolio + ).first() + self.assertIsNotNone(user_portfolio_permission) + + # assert that send_portfolio_invitation_email is called + mock_send_email.assert_called_once() + call_args = mock_send_email.call_args.kwargs + self.assertEqual(call_args["email"], "newuser@example.com") + self.assertEqual(call_args["requestor"], self.user) + self.assertIsNone(call_args.get("is_member_of_different_org")) + class TestEditPortfolioMemberView(WebTest): """Tests for the edit member page on portfolios""" diff --git a/src/registrar/utility/email_invitations.py b/src/registrar/utility/email_invitations.py index 7171b8902..48c796340 100644 --- a/src/registrar/utility/email_invitations.py +++ b/src/registrar/utility/email_invitations.py @@ -1,5 +1,6 @@ from django.conf import settings from registrar.models import DomainInvitation +from registrar.models.domain import Domain from registrar.utility.errors import ( AlreadyDomainInvitedError, AlreadyDomainManagerError, @@ -7,23 +8,24 @@ from registrar.utility.errors import ( OutsideOrgMemberError, ) from registrar.utility.waffle import flag_is_active_for_user -from registrar.utility.email import send_templated_email +from registrar.utility.email import EmailSendingError, send_templated_email import logging logger = logging.getLogger(__name__) -def send_domain_invitation_email(email: str, requestor, domain, is_member_of_different_org): +def send_domain_invitation_email( + email: str, requestor, domains: Domain | list[Domain], is_member_of_different_org, requested_user=None +): """ Sends a domain invitation email to the specified address. - Raises exceptions for validation or email-sending issues. - Args: email (str): Email address of the recipient. requestor (User): The user initiating the invitation. - domain (Domain): The domain object for which the invitation is being sent. + domains (Domain or list of Domain): The domain objects for which the invitation is being sent. is_member_of_different_org (bool): if an email belongs to a different org + requested_user (User | None): The recipient if the email belongs to a user in the registrar Raises: MissingEmailError: If the requestor has no email associated with their account. @@ -32,26 +34,54 @@ def send_domain_invitation_email(email: str, requestor, domain, is_member_of_dif OutsideOrgMemberError: If the requested_user is part of a different organization. EmailSendingError: If there is an error while sending the email. """ - # Default email address for staff - requestor_email = settings.DEFAULT_FROM_EMAIL + domains = normalize_domains(domains) + requestor_email = get_requestor_email(requestor, domains) - # Check if the requestor is staff and has an email - if not requestor.is_staff: - if not requestor.email or requestor.email.strip() == "": - raise MissingEmailError - else: - requestor_email = requestor.email + validate_invitation(email, domains, requestor, is_member_of_different_org) - # Check if the recipient is part of a different organization - # COMMENT: this does not account for multiple_portfolios flag being active + send_invitation_email(email, requestor_email, domains, requested_user) + + +def normalize_domains(domains): + """Ensures domains is always a list.""" + return [domains] if isinstance(domains, Domain) else domains + + +def get_requestor_email(requestor, domains): + """Get the requestor's email or raise an error if it's missing. + + If the requestor is staff, default email is returned. + """ + if requestor.is_staff: + return settings.DEFAULT_FROM_EMAIL + + if not requestor.email or requestor.email.strip() == "": + domain_names = ", ".join([domain.name for domain in domains]) + raise MissingEmailError(email=requestor.email, domain=domain_names) + + return requestor.email + + +def validate_invitation(email, domains, requestor, is_member_of_different_org): + """Validate the invitation conditions.""" + check_outside_org_membership(email, requestor, is_member_of_different_org) + + for domain in domains: + validate_existing_invitation(email, domain) + + +def check_outside_org_membership(email, requestor, is_member_of_different_org): + """Raise an error if the email belongs to a different organization.""" if ( flag_is_active_for_user(requestor, "organization_feature") and not flag_is_active_for_user(requestor, "multiple_portfolios") and is_member_of_different_org ): - raise OutsideOrgMemberError + raise OutsideOrgMemberError(email=email) - # Check for an existing invitation + +def validate_existing_invitation(email, domain): + """Check for existing invitations and handle their status.""" try: invite = DomainInvitation.objects.get(email=email, domain=domain) if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED: @@ -64,16 +94,24 @@ def send_domain_invitation_email(email: str, requestor, domain, is_member_of_dif except DomainInvitation.DoesNotExist: pass - # Send the email - send_templated_email( - "emails/domain_invitation.txt", - "emails/domain_invitation_subject.txt", - to_address=email, - context={ - "domain": domain, - "requestor_email": requestor_email, - }, - ) + +def send_invitation_email(email, requestor_email, domains, requested_user): + """Send the invitation email.""" + try: + send_templated_email( + "emails/domain_invitation.txt", + "emails/domain_invitation_subject.txt", + to_address=email, + context={ + "domains": domains, + "requestor_email": requestor_email, + "invitee_email_address": email, + "requested_user": requested_user, + }, + ) + except EmailSendingError as err: + domain_names = ", ".join([domain.name for domain in domains]) + raise EmailSendingError(f"Could not send email invitation to {email} for domains: {domain_names}") from err def send_portfolio_invitation_email(email: str, requestor, portfolio): @@ -98,17 +136,22 @@ def send_portfolio_invitation_email(email: str, requestor, portfolio): # Check if the requestor is staff and has an email if not requestor.is_staff: if not requestor.email or requestor.email.strip() == "": - raise MissingEmailError + raise MissingEmailError(email=email, portfolio=portfolio) else: requestor_email = requestor.email - send_templated_email( - "emails/portfolio_invitation.txt", - "emails/portfolio_invitation_subject.txt", - to_address=email, - context={ - "portfolio": portfolio, - "requestor_email": requestor_email, - "email": email, - }, - ) + try: + send_templated_email( + "emails/portfolio_invitation.txt", + "emails/portfolio_invitation_subject.txt", + to_address=email, + context={ + "portfolio": portfolio, + "requestor_email": requestor_email, + "email": email, + }, + ) + except EmailSendingError as err: + raise EmailSendingError( + f"Could not sent email invitation to {email} for portfolio {portfolio}. Portfolio invitation not saved." + ) from err diff --git a/src/registrar/utility/errors.py b/src/registrar/utility/errors.py index 039fb3696..cc18d7269 100644 --- a/src/registrar/utility/errors.py +++ b/src/registrar/utility/errors.py @@ -46,8 +46,17 @@ class AlreadyDomainInvitedError(InvitationError): class MissingEmailError(InvitationError): """Raised when the requestor has no email associated with their account.""" - def __init__(self): - super().__init__("Can't send invitation email. No email is associated with your user account.") + def __init__(self, email=None, domain=None, portfolio=None): + # Default message if no additional info is provided + message = "Can't send invitation email. No email is associated with your user account." + + # Customize message based on provided arguments + if email and domain: + message = f"Can't send email to '{email}' on domain '{domain}'. No email exists for the requestor." + elif email and portfolio: + message = f"Can't send email to '{email}' for portfolio '{portfolio}'. No email exists for the requestor." + + super().__init__(message) class OutsideOrgMemberError(ValueError): diff --git a/src/registrar/views/domain.py b/src/registrar/views/domain.py index 4b2edba06..f82d7005d 100644 --- a/src/registrar/views/domain.py +++ b/src/registrar/views/domain.py @@ -10,7 +10,6 @@ import logging import requests from django.contrib import messages from django.contrib.messages.views import SuccessMessageMixin -from django.db import IntegrityError from django.http import HttpResponseRedirect from django.shortcuts import redirect, render, get_object_or_404 from django.urls import reverse @@ -31,22 +30,23 @@ from registrar.models.user_portfolio_permission import UserPortfolioPermission from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices from registrar.utility.enums import DefaultEmail from registrar.utility.errors import ( - AlreadyDomainInvitedError, - AlreadyDomainManagerError, GenericError, GenericErrorCodes, - MissingEmailError, NameserverError, NameserverErrorCodes as nsErrorCodes, DsDataError, DsDataErrorCodes, SecurityEmailError, SecurityEmailErrorCodes, - OutsideOrgMemberError, ) from registrar.models.utility.contact_error import ContactError from registrar.views.utility.permission_views import UserDomainRolePermissionDeleteView from registrar.utility.waffle import flag_is_active_for_user +from registrar.views.utility.invitation_helper import ( + get_org_membership, + get_requested_user, + handle_invitation_exceptions, +) from ..forms import ( SeniorOfficialContactForm, @@ -1190,43 +1190,13 @@ class DomainAddUserView(DomainFormBaseView): def get_success_url(self): return reverse("domain-users", kwargs={"pk": self.object.pk}) - def _get_org_membership(self, requestor_org, requested_email, requested_user): - """ - Verifies if an email belongs to a different organization as a member or invited member. - Verifies if an email belongs to this organization as a member or invited member. - User does not belong to any org can be deduced from the tuple returned. - - Returns a tuple (member_of_a_different_org, member_of_this_org). - """ - - # COMMENT: this code does not take into account multiple portfolios flag - - # COMMENT: shouldn't this code be based on the organization of the domain, not the org - # of the requestor? requestor could have multiple portfolios - - # Check for existing permissions or invitations for the requested user - existing_org_permission = UserPortfolioPermission.objects.filter(user=requested_user).first() - existing_org_invitation = PortfolioInvitation.objects.filter(email=requested_email).first() - - # Determine membership in a different organization - member_of_a_different_org = ( - existing_org_permission and existing_org_permission.portfolio != requestor_org - ) or (existing_org_invitation and existing_org_invitation.portfolio != requestor_org) - - # Determine membership in the same organization - member_of_this_org = (existing_org_permission and existing_org_permission.portfolio == requestor_org) or ( - existing_org_invitation and existing_org_invitation.portfolio == requestor_org - ) - - return member_of_a_different_org, member_of_this_org - def form_valid(self, form): """Add the specified user to this domain.""" requested_email = form.cleaned_data["email"] requestor = self.request.user # Look up a user with that email - requested_user = self._get_requested_user(requested_email) + requested_user = get_requested_user(requested_email) # NOTE: This does not account for multiple portfolios flag being set to True domain_org = self.object.domain_info.portfolio @@ -1237,55 +1207,47 @@ class DomainAddUserView(DomainFormBaseView): or requestor.is_staff ) - member_of_a_different_org, member_of_this_org = self._get_org_membership( - domain_org, requested_email, requested_user - ) - - # determine portfolio of the domain (code currently is looking at requestor's portfolio) - # if requested_email/user is not member or invited member of this portfolio - # COMMENT: this code does not take into account multiple portfolios flag - # send portfolio invitation email - # create portfolio invitation - # create message to view - if ( - flag_is_active_for_user(requestor, "organization_feature") - and not flag_is_active_for_user(requestor, "multiple_portfolios") - and domain_org is not None - and requestor_can_update_portfolio - and not member_of_this_org - ): - try: - send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=domain_org) - PortfolioInvitation.objects.get_or_create(email=requested_email, portfolio=domain_org) - messages.success(self.request, f"{requested_email} has been invited to the organization: {domain_org}") - except Exception as e: - self._handle_portfolio_exceptions(e, requested_email, domain_org) - # If that first invite does not succeed take an early exit - return redirect(self.get_success_url()) - + member_of_a_different_org, member_of_this_org = get_org_membership(domain_org, requested_email, requested_user) try: + # COMMENT: this code does not take into account multiple portfolios flag being set to TRUE + + # determine portfolio of the domain (code currently is looking at requestor's portfolio) + # if requested_email/user is not member or invited member of this portfolio + # send portfolio invitation email + # create portfolio invitation + # create message to view + if ( + flag_is_active_for_user(requestor, "organization_feature") + and not flag_is_active_for_user(requestor, "multiple_portfolios") + and domain_org is not None + and requestor_can_update_portfolio + and not member_of_this_org + ): + send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=domain_org) + portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create( + email=requested_email, portfolio=domain_org, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER] + ) + # if user exists for email, immediately retrieve portfolio invitation upon creation + if requested_user is not None: + portfolio_invitation.retrieve() + portfolio_invitation.save() + messages.success(self.request, f"{requested_email} has been invited to the organization: {domain_org}") + if requested_user is None: self._handle_new_user_invitation(requested_email, requestor, member_of_a_different_org) else: self._handle_existing_user(requested_email, requestor, requested_user, member_of_a_different_org) except Exception as e: - self._handle_exceptions(e, requested_email) + handle_invitation_exceptions(self.request, e, requested_email) return redirect(self.get_success_url()) - def _get_requested_user(self, email): - """Retrieve a user by email or return None if the user doesn't exist.""" - try: - return User.objects.get(email=email) - except User.DoesNotExist: - return None - def _handle_new_user_invitation(self, email, requestor, member_of_different_org): """Handle invitation for a new user who does not exist in the system.""" send_domain_invitation_email( email=email, requestor=requestor, - domain=self.object, + domains=self.object, is_member_of_different_org=member_of_different_org, ) DomainInvitation.objects.get_or_create(email=email, domain=self.object) @@ -1296,8 +1258,9 @@ class DomainAddUserView(DomainFormBaseView): send_domain_invitation_email( email=email, requestor=requestor, - domain=self.object, + domains=self.object, is_member_of_different_org=member_of_different_org, + requested_user=requested_user, ) UserDomainRole.objects.create( user=requested_user, @@ -1306,57 +1269,6 @@ class DomainAddUserView(DomainFormBaseView): ) messages.success(self.request, f"Added user {email}.") - def _handle_exceptions(self, exception, email): - """Handle exceptions raised during the process.""" - if isinstance(exception, EmailSendingError): - logger.warning( - "Could not send email invitation to %s for domain %s (EmailSendingError)", - email, - self.object, - exc_info=True, - ) - messages.warning(self.request, "Could not send email invitation.") - elif isinstance(exception, OutsideOrgMemberError): - logger.warning( - "Could not send email. Can not invite member of a .gov organization to a different organization.", - self.object, - exc_info=True, - ) - messages.error( - self.request, - f"{email} is already a member of another .gov organization.", - ) - elif isinstance(exception, AlreadyDomainManagerError): - messages.warning(self.request, str(exception)) - elif isinstance(exception, AlreadyDomainInvitedError): - messages.warning(self.request, str(exception)) - elif isinstance(exception, MissingEmailError): - messages.error(self.request, str(exception)) - logger.error( - f"Can't send email to '{email}' on domain '{self.object}'. No email exists for the requestor.", - exc_info=True, - ) - elif isinstance(exception, IntegrityError): - messages.warning(self.request, f"{email} is already a manager for this domain") - else: - logger.warning("Could not send email invitation (Other Exception)", exc_info=True) - messages.warning(self.request, "Could not send email invitation.") - - def _handle_portfolio_exceptions(self, exception, email, portfolio): - """Handle exceptions raised during the process.""" - if isinstance(exception, EmailSendingError): - logger.warning("Could not send email invitation (EmailSendingError)", exc_info=True) - messages.warning(self.request, "Could not send email invitation.") - elif isinstance(exception, MissingEmailError): - messages.error(self.request, str(exception)) - logger.error( - f"Can't send email to '{email}' for portfolio '{portfolio}'. No email exists for the requestor.", - exc_info=True, - ) - else: - logger.warning("Could not send email invitation (Other Exception)", exc_info=True) - messages.warning(self.request, "Could not send email invitation.") - class DomainInvitationCancelView(SuccessMessageMixin, DomainInvitationPermissionCancelView): object: DomainInvitation diff --git a/src/registrar/views/portfolios.py b/src/registrar/views/portfolios.py index 751e28d85..c4f60ca35 100644 --- a/src/registrar/views/portfolios.py +++ b/src/registrar/views/portfolios.py @@ -8,13 +8,14 @@ from django.utils.safestring import mark_safe from django.contrib import messages from registrar.forms import portfolio as portfolioForms from registrar.models import Portfolio, User +from registrar.models.domain import Domain from registrar.models.domain_invitation import DomainInvitation from registrar.models.portfolio_invitation import PortfolioInvitation from registrar.models.user_domain_role import UserDomainRole from registrar.models.user_portfolio_permission import UserPortfolioPermission from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices from registrar.utility.email import EmailSendingError -from registrar.utility.email_invitations import send_portfolio_invitation_email +from registrar.utility.email_invitations import send_domain_invitation_email, send_portfolio_invitation_email from registrar.utility.errors import MissingEmailError from registrar.utility.enums import DefaultUserValues from registrar.views.utility.mixins import PortfolioMemberPermission @@ -33,6 +34,8 @@ from django.views.generic import View from django.views.generic.edit import FormMixin from django.db import IntegrityError +from registrar.views.utility.invitation_helper import get_org_membership + logger = logging.getLogger(__name__) @@ -237,6 +240,7 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V removed_domains = request.POST.get("removed_domains") portfolio_permission = get_object_or_404(UserPortfolioPermission, pk=pk) member = portfolio_permission.user + portfolio = portfolio_permission.portfolio added_domain_ids = self._parse_domain_ids(added_domains, "added domains") if added_domain_ids is None: @@ -248,7 +252,7 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V if added_domain_ids or removed_domain_ids: try: - self._process_added_domains(added_domain_ids, member) + self._process_added_domains(added_domain_ids, member, request.user, portfolio) self._process_removed_domains(removed_domain_ids, member) messages.success(request, "The domain assignment changes have been saved.") return redirect(reverse("member-domains", kwargs={"pk": pk})) @@ -258,15 +262,15 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V "A database error occurred while saving changes. If the issue persists, " f"please contact {DefaultUserValues.HELP_EMAIL}.", ) - logger.error("A database error occurred while saving changes.") + logger.error("A database error occurred while saving changes.", exc_info=True) return redirect(reverse("member-domains-edit", kwargs={"pk": pk})) except Exception as e: messages.error( request, - "An unexpected error occurred: {str(e)}. If the issue persists, " + f"An unexpected error occurred: {str(e)}. If the issue persists, " f"please contact {DefaultUserValues.HELP_EMAIL}.", ) - logger.error(f"An unexpected error occurred: {str(e)}") + logger.error(f"An unexpected error occurred: {str(e)}", exc_info=True) return redirect(reverse("member-domains-edit", kwargs={"pk": pk})) else: messages.info(request, "No changes detected.") @@ -287,16 +291,26 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V logger.error(f"Invalid data for {domain_type}") return None - def _process_added_domains(self, added_domain_ids, member): + def _process_added_domains(self, added_domain_ids, member, requestor, portfolio): """ Processes added domains by bulk creating UserDomainRole instances. """ if added_domain_ids: + # get added_domains from ids to pass to send email method and bulk create + added_domains = Domain.objects.filter(id__in=added_domain_ids) + member_of_a_different_org, _ = get_org_membership(portfolio, member.email, member) + send_domain_invitation_email( + email=member.email, + requestor=requestor, + domains=added_domains, + is_member_of_different_org=member_of_a_different_org, + requested_user=member, + ) # Bulk create UserDomainRole instances for added domains UserDomainRole.objects.bulk_create( [ - UserDomainRole(domain_id=domain_id, user=member, role=UserDomainRole.Roles.MANAGER) - for domain_id in added_domain_ids + UserDomainRole(domain=domain, user=member, role=UserDomainRole.Roles.MANAGER) + for domain in added_domains ], ignore_conflicts=True, # Avoid duplicate entries ) @@ -443,6 +457,7 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission removed_domains = request.POST.get("removed_domains") portfolio_invitation = get_object_or_404(PortfolioInvitation, pk=pk) email = portfolio_invitation.email + portfolio = portfolio_invitation.portfolio added_domain_ids = self._parse_domain_ids(added_domains, "added domains") if added_domain_ids is None: @@ -454,7 +469,7 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission if added_domain_ids or removed_domain_ids: try: - self._process_added_domains(added_domain_ids, email) + self._process_added_domains(added_domain_ids, email, request.user, portfolio) self._process_removed_domains(removed_domain_ids, email) messages.success(request, "The domain assignment changes have been saved.") return redirect(reverse("invitedmember-domains", kwargs={"pk": pk})) @@ -464,15 +479,15 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission "A database error occurred while saving changes. If the issue persists, " f"please contact {DefaultUserValues.HELP_EMAIL}.", ) - logger.error("A database error occurred while saving changes.") + logger.error("A database error occurred while saving changes.", exc_info=True) return redirect(reverse("invitedmember-domains-edit", kwargs={"pk": pk})) except Exception as e: messages.error( request, - "An unexpected error occurred: {str(e)}. If the issue persists, " + f"An unexpected error occurred: {str(e)}. If the issue persists, " f"please contact {DefaultUserValues.HELP_EMAIL}.", ) - logger.error(f"An unexpected error occurred: {str(e)}.") + logger.error(f"An unexpected error occurred: {str(e)}.", exc_info=True) return redirect(reverse("invitedmember-domains-edit", kwargs={"pk": pk})) else: messages.info(request, "No changes detected.") @@ -493,33 +508,41 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission logger.error(f"Invalid data for {domain_type}.") return None - def _process_added_domains(self, added_domain_ids, email): + def _process_added_domains(self, added_domain_ids, email, requestor, portfolio): """ Processes added domain invitations by updating existing invitations or creating new ones. """ - if not added_domain_ids: - return + if added_domain_ids: + # get added_domains from ids to pass to send email method and bulk create + added_domains = Domain.objects.filter(id__in=added_domain_ids) + member_of_a_different_org, _ = get_org_membership(portfolio, email, None) + send_domain_invitation_email( + email=email, + requestor=requestor, + domains=added_domains, + is_member_of_different_org=member_of_a_different_org, + ) - # Update existing invitations from CANCELED to INVITED - existing_invitations = DomainInvitation.objects.filter(domain_id__in=added_domain_ids, email=email) - existing_invitations.update(status=DomainInvitation.DomainInvitationStatus.INVITED) + # Update existing invitations from CANCELED to INVITED + existing_invitations = DomainInvitation.objects.filter(domain__in=added_domains, email=email) + existing_invitations.update(status=DomainInvitation.DomainInvitationStatus.INVITED) - # Determine which domains need new invitations - existing_domain_ids = existing_invitations.values_list("domain_id", flat=True) - new_domain_ids = set(added_domain_ids) - set(existing_domain_ids) + # Determine which domains need new invitations + existing_domain_ids = existing_invitations.values_list("domain_id", flat=True) + new_domain_ids = set(added_domain_ids) - set(existing_domain_ids) - # Bulk create new invitations - DomainInvitation.objects.bulk_create( - [ - DomainInvitation( - domain_id=domain_id, - email=email, - status=DomainInvitation.DomainInvitationStatus.INVITED, - ) - for domain_id in new_domain_ids - ] - ) + # Bulk create new invitations + DomainInvitation.objects.bulk_create( + [ + DomainInvitation( + domain_id=domain_id, + email=email, + status=DomainInvitation.DomainInvitationStatus.INVITED, + ) + for domain_id in new_domain_ids + ] + ) def _process_removed_domains(self, removed_domain_ids, email): """ @@ -754,7 +777,11 @@ class PortfolioAddMemberView(PortfolioMembersPermissionView, FormMixin): try: if not requested_user or not permission_exists: send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=portfolio) - form.save() + portfolio_invitation = form.save() + # if user exists for email, immediately retrieve portfolio invitation upon creation + if requested_user is not None: + portfolio_invitation.retrieve() + portfolio_invitation.save() messages.success(self.request, f"{requested_email} has been invited.") else: if permission_exists: diff --git a/src/registrar/views/utility/invitation_helper.py b/src/registrar/views/utility/invitation_helper.py new file mode 100644 index 000000000..5c730d0c3 --- /dev/null +++ b/src/registrar/views/utility/invitation_helper.py @@ -0,0 +1,86 @@ +from django.contrib import messages +from django.db import IntegrityError +from registrar.models import PortfolioInvitation, User, UserPortfolioPermission +from registrar.utility.email import EmailSendingError +import logging + +from registrar.utility.errors import ( + AlreadyDomainInvitedError, + AlreadyDomainManagerError, + MissingEmailError, + OutsideOrgMemberError, +) + +logger = logging.getLogger(__name__) + +# These methods are used by multiple views which share similar logic and function +# when creating invitations and sending associated emails. These can be reused in +# any view, and were initially developed for domain.py, portfolios.py and admin.py + + +def get_org_membership(org, email, user): + """ + Determines if an email/user belongs to a different organization or this organization + as either a member or an invited member. + + This function returns a tuple (member_of_a_different_org, member_of_this_org), + which provides: + - member_of_a_different_org: True if the user/email is associated with an organization other than the given org. + - member_of_this_org: True if the user/email is associated with the given org. + + Note: This implementation assumes single portfolio ownership for a user. + If the "multiple portfolios" feature is enabled, this logic may not account for + situations where a user or email belongs to multiple organizations. + """ + + # Check for existing permissions or invitations for the user + existing_org_permission = UserPortfolioPermission.objects.filter(user=user).first() + existing_org_invitation = PortfolioInvitation.objects.filter(email=email).first() + + # Determine membership in a different organization + member_of_a_different_org = (existing_org_permission and existing_org_permission.portfolio != org) or ( + existing_org_invitation and existing_org_invitation.portfolio != org + ) + + # Determine membership in the same organization + member_of_this_org = (existing_org_permission and existing_org_permission.portfolio == org) or ( + existing_org_invitation and existing_org_invitation.portfolio == org + ) + + return member_of_a_different_org, member_of_this_org + + +def get_requested_user(email): + """Retrieve a user by email or return None if the user doesn't exist.""" + try: + return User.objects.get(email=email) + except User.DoesNotExist: + return None + + +def handle_invitation_exceptions(request, exception, email): + """Handle exceptions raised during the process.""" + if isinstance(exception, EmailSendingError): + logger.warning(str(exception), exc_info=True) + messages.error(request, str(exception)) + elif isinstance(exception, MissingEmailError): + messages.error(request, str(exception)) + logger.error(str(exception), exc_info=True) + elif isinstance(exception, OutsideOrgMemberError): + logger.warning( + "Could not send email. Can not invite member of a .gov organization to a different organization.", + exc_info=True, + ) + messages.error( + request, + f"{email} is already a member of another .gov organization.", + ) + elif isinstance(exception, AlreadyDomainManagerError): + messages.warning(request, str(exception)) + elif isinstance(exception, AlreadyDomainInvitedError): + messages.warning(request, str(exception)) + elif isinstance(exception, IntegrityError): + messages.warning(request, f"{email} is already a manager for this domain") + else: + logger.warning("Could not send email invitation (Other Exception)", exc_info=True) + messages.warning(request, "Could not send email invitation.")