Merge branch 'main' of https://github.com/cisagov/manage.get.gov into es/3175-email-updates

This commit is contained in:
Erin Song 2025-01-16 17:39:03 -08:00
commit a7d65e4a80
No known key found for this signature in database
22 changed files with 2686 additions and 596 deletions

View file

@ -918,3 +918,38 @@ Example (only requests): `./manage.py create_federal_portfolio --branch "executi
- Parameters #1-#2: Either `--agency_name` or `--branch` must be specified. Not both.
- Parameters #2-#3, you cannot use `--both` while using these. You must specify either `--parse_requests` or `--parse_domains` seperately. While all of these parameters are optional in that you do not need to specify all of them,
you must specify at least one to run this script.
## Patch suborganizations
This script deletes some duplicate suborganization data that exists in our database (one-time use).
It works in two ways:
1. If the only name difference between two suborg records is extra spaces or a capitalization difference,
then we delete all duplicate records of this type.
2. If the suborg name is one we manually specify to delete via the script.
Before it deletes records, it goes through each DomainInformation and DomainRequest object and updates the reference to "sub_organization" to match the non-duplicative record.
### Running on sandboxes
#### Step 1: Login to CloudFoundry
```cf login -a api.fr.cloud.gov --sso```
#### Step 2: SSH into your environment
```cf ssh getgov-{space}```
Example: `cf ssh getgov-za`
#### Step 3: Create a shell instance
```/tmp/lifecycle/shell```
#### Step 4: Upload your csv to the desired sandbox
[Follow these steps](#use-scp-to-transfer-data-to-sandboxes) to upload the federal_cio csv to a sandbox of your choice.
#### Step 5: Running the script
To create a specific portfolio:
```./manage.py patch_suborganizations```
### Running locally
#### Step 1: Running the script
```docker-compose exec app ./manage.py patch_suborganizations```

View file

@ -14,6 +14,7 @@ from django.db.models import (
from django.db.models.functions import Concat, Coalesce
from django.http import HttpResponseRedirect
from registrar.models.federal_agency import FederalAgency
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.utility.admin_helpers import (
AutocompleteSelectWithPlaceholder,
get_action_needed_reason_default_email,
@ -27,8 +28,12 @@ from django.shortcuts import redirect
from django_fsm import get_available_FIELD_transitions, FSMField
from registrar.models import DomainInformation, Portfolio, UserPortfolioPermission, DomainInvitation
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from registrar.utility.email import EmailSendingError
from registrar.utility.email_invitations import send_portfolio_invitation_email
from registrar.utility.email_invitations import send_domain_invitation_email, send_portfolio_invitation_email
from registrar.views.utility.invitation_helper import (
get_org_membership,
get_requested_user,
handle_invitation_exceptions,
)
from waffle.decorators import flag_is_active
from django.contrib import admin, messages
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
@ -41,7 +46,7 @@ from waffle.admin import FlagAdmin
from waffle.models import Sample, Switch
from registrar.models import Contact, Domain, DomainRequest, DraftDomain, User, Website, SeniorOfficial
from registrar.utility.constants import BranchChoices
from registrar.utility.errors import FSMDomainRequestError, FSMErrorCodes, MissingEmailError
from registrar.utility.errors import FSMDomainRequestError, FSMErrorCodes
from registrar.utility.waffle import flag_is_active_for_user
from registrar.views.utility.mixins import OrderableFieldsMixin
from django.contrib.admin.views.main import ORDER_VAR
@ -1389,7 +1394,78 @@ class UserDomainRoleAdmin(ListHeaderAdmin, ImportExportModelAdmin):
return super().changeform_view(request, object_id, form_url, extra_context=extra_context)
class DomainInvitationAdmin(ListHeaderAdmin):
class BaseInvitationAdmin(ListHeaderAdmin):
"""Base class for admin classes which will customize save_model and send email invitations
on model adds, and require custom handling of forms and form errors."""
def response_add(self, request, obj, post_url_continue=None):
"""
Override response_add to handle rendering when exceptions are raised during add model.
Normal flow on successful save_model on add is to redirect to changelist_view.
If there are errors, flow is modified to instead render change form.
"""
# store current messages from request so that they are preserved throughout the method
storage = get_messages(request)
# Check if there are any error or warning messages in the `messages` framework
has_errors = any(message.level_tag in ["error", "warning"] for message in storage)
if has_errors:
# Re-render the change form if there are errors or warnings
# Prepare context for rendering the change form
# Get the model form
ModelForm = self.get_form(request, obj=obj)
form = ModelForm(instance=obj)
# Create an AdminForm instance
admin_form = AdminForm(
form,
list(self.get_fieldsets(request, obj)),
self.get_prepopulated_fields(request, obj),
self.get_readonly_fields(request, obj),
model_admin=self,
)
media = self.media + form.media
opts = obj._meta
change_form_context = {
**self.admin_site.each_context(request), # Add admin context
"title": f"Add {opts.verbose_name}",
"opts": opts,
"original": obj,
"save_as": self.save_as,
"has_change_permission": self.has_change_permission(request, obj),
"add": True, # Indicate this is an "Add" form
"change": False, # Indicate this is not a "Change" form
"is_popup": False,
"inline_admin_formsets": [],
"save_on_top": self.save_on_top,
"show_delete": self.has_delete_permission(request, obj),
"obj": obj,
"adminform": admin_form, # Pass the AdminForm instance
"media": media,
"errors": None,
}
return self.render_change_form(
request,
context=change_form_context,
add=True,
change=False,
obj=obj,
)
response = super().response_add(request, obj, post_url_continue)
# Re-add all messages from storage after `super().response_add`
# as super().response_add resets the success messages in request
for message in storage:
messages.add_message(request, message.level, message.message)
return response
class DomainInvitationAdmin(BaseInvitationAdmin):
"""Custom domain invitation admin class."""
class Meta:
@ -1442,14 +1518,60 @@ class DomainInvitationAdmin(ListHeaderAdmin):
which will be successful if a single User exists for that email; otherwise, will
just continue to create the invitation.
"""
if not change and User.objects.filter(email=obj.email).count() == 1:
# Domain Invitation creation for an existing User
obj.retrieve()
# Call the parent save method to save the object
super().save_model(request, obj, form, change)
if not change:
domain = obj.domain
domain_org = getattr(domain.domain_info, "portfolio", None)
requested_email = obj.email
# Look up a user with that email
requested_user = get_requested_user(requested_email)
requestor = request.user
member_of_a_different_org, member_of_this_org = get_org_membership(
domain_org, requested_email, requested_user
)
try:
if (
flag_is_active(request, "organization_feature")
and not flag_is_active(request, "multiple_portfolios")
and domain_org is not None
and not member_of_this_org
and not member_of_a_different_org
):
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=domain_org)
portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(
email=requested_email,
portfolio=domain_org,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
)
# if user exists for email, immediately retrieve portfolio invitation upon creation
if requested_user is not None:
portfolio_invitation.retrieve()
portfolio_invitation.save()
messages.success(request, f"{requested_email} has been invited to the organization: {domain_org}")
send_domain_invitation_email(
email=requested_email,
requestor=requestor,
domains=domain,
is_member_of_different_org=member_of_a_different_org,
requested_user=requested_user,
)
if requested_user is not None:
# Domain Invitation creation for an existing User
obj.retrieve()
# Call the parent save method to save the object
super().save_model(request, obj, form, change)
messages.success(request, f"{requested_email} has been invited to the domain: {domain}")
except Exception as e:
handle_invitation_exceptions(request, e, requested_email)
return
else:
# Call the parent save method to save the object
super().save_model(request, obj, form, change)
class PortfolioInvitationAdmin(ListHeaderAdmin):
class PortfolioInvitationAdmin(BaseInvitationAdmin):
"""Custom portfolio invitation admin class."""
form = PortfolioInvitationAdminForm
@ -1472,7 +1594,7 @@ class PortfolioInvitationAdmin(ListHeaderAdmin):
# Search
search_fields = [
"email",
"portfolio__name",
"portfolio__organization_name",
]
# Filters
@ -1510,6 +1632,8 @@ class PortfolioInvitationAdmin(ListHeaderAdmin):
portfolio = obj.portfolio
requested_email = obj.email
requestor = request.user
# Look up a user with that email
requested_user = get_requested_user(requested_email)
permission_exists = UserPortfolioPermission.objects.filter(
user__email=requested_email, portfolio=portfolio, user__email__isnull=False
@ -1518,98 +1642,19 @@ class PortfolioInvitationAdmin(ListHeaderAdmin):
if not permission_exists:
# if permission does not exist for a user with requested_email, send email
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=portfolio)
# if user exists for email, immediately retrieve portfolio invitation upon creation
if requested_user is not None:
obj.retrieve()
messages.success(request, f"{requested_email} has been invited.")
else:
messages.warning(request, "User is already a member of this portfolio.")
except Exception as e:
# when exception is raised, handle and do not save the model
self._handle_exceptions(e, request, obj)
handle_invitation_exceptions(request, e, requested_email)
return
# Call the parent save method to save the object
super().save_model(request, obj, form, change)
def _handle_exceptions(self, exception, request, obj):
"""Handle exceptions raised during the process.
Log warnings / errors, and message errors to the user.
"""
if isinstance(exception, EmailSendingError):
logger.warning(
"Could not sent email invitation to %s for portfolio %s (EmailSendingError)",
obj.email,
obj.portfolio,
exc_info=True,
)
messages.error(request, "Could not send email invitation. Portfolio invitation not saved.")
elif isinstance(exception, MissingEmailError):
messages.error(request, str(exception))
logger.error(
f"Can't send email to '{obj.email}' for portfolio '{obj.portfolio}'. "
f"No email exists for the requestor.",
exc_info=True,
)
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.error(request, "Could not send email invitation. Portfolio invitation not saved.")
def response_add(self, request, obj, post_url_continue=None):
"""
Override response_add to handle rendering when exceptions are raised during add model.
Normal flow on successful save_model on add is to redirect to changelist_view.
If there are errors, flow is modified to instead render change form.
"""
# Check if there are any error or warning messages in the `messages` framework
storage = get_messages(request)
has_errors = any(message.level_tag in ["error", "warning"] for message in storage)
if has_errors:
# Re-render the change form if there are errors or warnings
# Prepare context for rendering the change form
# Get the model form
ModelForm = self.get_form(request, obj=obj)
form = ModelForm(instance=obj)
# Create an AdminForm instance
admin_form = AdminForm(
form,
list(self.get_fieldsets(request, obj)),
self.get_prepopulated_fields(request, obj),
self.get_readonly_fields(request, obj),
model_admin=self,
)
media = self.media + form.media
opts = obj._meta
change_form_context = {
**self.admin_site.each_context(request), # Add admin context
"title": f"Add {opts.verbose_name}",
"opts": opts,
"original": obj,
"save_as": self.save_as,
"has_change_permission": self.has_change_permission(request, obj),
"add": True, # Indicate this is an "Add" form
"change": False, # Indicate this is not a "Change" form
"is_popup": False,
"inline_admin_formsets": [],
"save_on_top": self.save_on_top,
"show_delete": self.has_delete_permission(request, obj),
"obj": obj,
"adminform": admin_form, # Pass the AdminForm instance
"media": media,
"errors": None,
}
return self.render_change_form(
request,
context=change_form_context,
add=True,
change=False,
obj=obj,
)
return super().response_add(request, obj, post_url_continue)
class DomainInformationResource(resources.ModelResource):
"""defines how each field in the referenced model should be mapped to the corresponding fields in the
@ -2782,7 +2827,9 @@ class DomainRequestAdmin(ListHeaderAdmin, ImportExportModelAdmin):
try:
# Retrieve and order audit log entries by timestamp in descending order
audit_log_entries = LogEntry.objects.filter(object_id=object_id).order_by("-timestamp")
audit_log_entries = LogEntry.objects.filter(
object_id=object_id, content_type__model="domainrequest"
).order_by("-timestamp")
# Process each log entry to filter based on the change criteria
for log_entry in audit_log_entries:

View file

@ -66,9 +66,9 @@
text-align: center;
font-size: inherit; //inherit tooltip fontsize of .93rem
max-width: fit-content;
display: block;
@include at-media('desktop') {
width: 70vw;
}
display: block;
}
}

View file

@ -6,6 +6,7 @@ from django.core.management import BaseCommand, CommandError
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.models import DomainInformation, DomainRequest, FederalAgency, Suborganization, Portfolio, User
from registrar.models.utility.generic_helper import normalize_string
from django.db.models import F, Q
logger = logging.getLogger(__name__)
@ -104,12 +105,17 @@ class Command(BaseCommand):
message = f"Failed to create portfolio '{federal_agency.agency}'"
TerminalHelper.colorful_logger(logger.info, TerminalColors.FAIL, message)
# POST PROCESS STEP: Add additional suborg info where applicable.
updated_suborg_count = self.post_process_all_suborganization_fields(agencies)
message = f"Added city and state_territory information to {updated_suborg_count} suborgs."
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)
TerminalHelper.log_script_run_summary(
self.updated_portfolios,
self.failed_portfolios,
self.skipped_portfolios,
debug=False,
skipped_header="----- SOME PORTFOLIOS WERE SKIPPED -----",
skipped_header="----- SOME PORTFOLIOS WERENT CREATED -----",
display_as_str=True,
)
@ -169,14 +175,11 @@ class Command(BaseCommand):
def handle_populate_portfolio(self, federal_agency, parse_domains, parse_requests, both):
"""Attempts to create a portfolio. If successful, this function will
also create new suborganizations.
Returns the portfolio for the given federal_agency.
"""
portfolio, created = self.create_portfolio(federal_agency)
if created:
self.create_suborganizations(portfolio, federal_agency)
if parse_domains or both:
self.handle_portfolio_domains(portfolio, federal_agency)
also create new suborganizations"""
portfolio, _ = self.create_portfolio(federal_agency)
self.create_suborganizations(portfolio, federal_agency)
if parse_domains or both:
self.handle_portfolio_domains(portfolio, federal_agency)
if parse_requests or both:
self.handle_portfolio_requests(portfolio, federal_agency)
@ -233,7 +236,6 @@ class Command(BaseCommand):
federal_agency=federal_agency, organization_name__isnull=False
)
org_names = set(valid_agencies.values_list("organization_name", flat=True))
if not org_names:
message = (
"Could not add any suborganizations."
@ -352,3 +354,141 @@ class Command(BaseCommand):
DomainInformation.objects.bulk_update(domain_infos, ["portfolio", "sub_organization"])
message = f"Added portfolio '{portfolio}' to {len(domain_infos)} domains."
TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message)
def post_process_all_suborganization_fields(self, agencies):
"""Batch updates suborganization locations from domain and request data.
Args:
agencies: List of FederalAgency objects to process
Returns:
int: Number of suborganizations updated
Priority for location data:
1. Domain information
2. Domain request suborganization fields
3. Domain request standard fields
"""
# Common filter between domaininformation / domain request.
# Filter by only the agencies we've updated thus far.
# Then, only process records without null portfolio, org name, or suborg name.
base_filter = Q(
federal_agency__in=agencies,
portfolio__isnull=False,
organization_name__isnull=False,
sub_organization__isnull=False,
) & ~Q(organization_name__iexact=F("portfolio__organization_name"))
# First: Remove null city / state_territory values on domain info / domain requests.
# We want to add city data if there is data to add to begin with!
domains = DomainInformation.objects.filter(
base_filter,
Q(city__isnull=False, state_territory__isnull=False),
)
requests = DomainRequest.objects.filter(
base_filter,
(
Q(city__isnull=False, state_territory__isnull=False)
| Q(suborganization_city__isnull=False, suborganization_state_territory__isnull=False)
),
)
# Second: Group domains and requests by normalized organization name.
# This means that later down the line we have to account for "duplicate" org names.
domains_dict = {}
requests_dict = {}
for domain in domains:
normalized_name = normalize_string(domain.organization_name)
domains_dict.setdefault(normalized_name, []).append(domain)
for request in requests:
normalized_name = normalize_string(request.organization_name)
requests_dict.setdefault(normalized_name, []).append(request)
# Third: Get suborganizations to update
suborgs_to_edit = Suborganization.objects.filter(
Q(id__in=domains.values_list("sub_organization", flat=True))
| Q(id__in=requests.values_list("sub_organization", flat=True))
)
# Fourth: Process each suborg to add city / state territory info
for suborg in suborgs_to_edit:
self.post_process_suborganization_fields(suborg, domains_dict, requests_dict)
# Fifth: Perform a bulk update
return Suborganization.objects.bulk_update(suborgs_to_edit, ["city", "state_territory"])
def post_process_suborganization_fields(self, suborg, domains_dict, requests_dict):
"""Updates a single suborganization's location data if valid.
Args:
suborg: Suborganization to update
domains_dict: Dict of domain info records grouped by org name
requests_dict: Dict of domain requests grouped by org name
Priority matches parent method. Updates are skipped if location data conflicts
between multiple records of the same type.
"""
normalized_suborg_name = normalize_string(suborg.name)
domains = domains_dict.get(normalized_suborg_name, [])
requests = requests_dict.get(normalized_suborg_name, [])
# Try to get matching domain info
domain = None
if domains:
reference = domains[0]
use_location_for_domain = all(
d.city == reference.city and d.state_territory == reference.state_territory for d in domains
)
if use_location_for_domain:
domain = reference
# Try to get matching request info
# Uses consensus: if all city / state_territory info matches, then we can assume the data is "good".
# If not, take the safe route and just skip updating this particular record.
request = None
use_suborg_location_for_request = True
use_location_for_request = True
if requests:
reference = requests[0]
use_suborg_location_for_request = all(
r.suborganization_city
and r.suborganization_state_territory
and r.suborganization_city == reference.suborganization_city
and r.suborganization_state_territory == reference.suborganization_state_territory
for r in requests
)
use_location_for_request = all(
r.city
and r.state_territory
and r.city == reference.city
and r.state_territory == reference.state_territory
for r in requests
)
if use_suborg_location_for_request or use_location_for_request:
request = reference
if not domain and not request:
message = f"Skipping adding city / state_territory information to suborg: {suborg}. Bad data."
TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, message)
return
# PRIORITY:
# 1. Domain info
# 2. Domain request requested suborg fields
# 3. Domain request normal fields
if domain:
suborg.city = normalize_string(domain.city, lowercase=False)
suborg.state_territory = domain.state_territory
elif request and use_suborg_location_for_request:
suborg.city = normalize_string(request.suborganization_city, lowercase=False)
suborg.state_territory = request.suborganization_state_territory
elif request and use_location_for_request:
suborg.city = normalize_string(request.city, lowercase=False)
suborg.state_territory = request.state_territory
message = (
f"Added city/state_territory to suborg: {suborg}. "
f"city - {suborg.city}, state - {suborg.state_territory}"
)
TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message)

View file

@ -0,0 +1,133 @@
import logging
from django.core.management import BaseCommand
from registrar.models import Suborganization, DomainRequest, DomainInformation
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.models.utility.generic_helper import count_capitals, normalize_string
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Clean up duplicate suborganizations that differ only by spaces and capitalization"
def handle(self, **kwargs):
"""Process manual deletions and find/remove duplicates. Shows preview
and updates DomainInformation / DomainRequest sub_organization references before deletion."""
# First: get a preset list of records we want to delete.
# For extra_records_to_prune: the key gets deleted, the value gets kept.
extra_records_to_prune = {
normalize_string("Assistant Secretary for Preparedness and Response Office of the Secretary"): {
"replace_with": "Assistant Secretary for Preparedness and Response, Office of the Secretary"
},
normalize_string("US Geological Survey"): {"replace_with": "U.S. Geological Survey"},
normalize_string("USDA/OC"): {"replace_with": "USDA, Office of Communications"},
normalize_string("GSA, IC, OGP WebPortfolio"): {"replace_with": "GSA, IC, OGP Web Portfolio"},
normalize_string("USDA/ARS/NAL"): {"replace_with": "USDA, ARS, NAL"},
}
# Second: loop through every Suborganization and return a dict of what to keep, and what to delete
# for each duplicate or "incorrect" record. We do this by pruning records with extra spaces or bad caps
# Note that "extra_records_to_prune" is just a manual mapping.
records_to_prune = self.get_records_to_prune(extra_records_to_prune)
if len(records_to_prune) == 0:
TerminalHelper.colorful_logger(logger.error, TerminalColors.FAIL, "No suborganizations to delete.")
return
# Third: Build a preview of the changes
total_records_to_remove = 0
preview_lines = ["The following records will be removed:"]
for data in records_to_prune.values():
keep = data.get("keep")
delete = data.get("delete")
if keep:
preview_lines.append(f"Keeping: '{keep.name}' (id: {keep.id})")
for duplicate in delete:
preview_lines.append(f"Removing: '{duplicate.name}' (id: {duplicate.id})")
total_records_to_remove += 1
preview_lines.append("")
preview = "\n".join(preview_lines)
# Fourth: Get user confirmation and delete
if TerminalHelper.prompt_for_execution(
system_exit_on_terminate=True,
prompt_message=preview,
prompt_title=f"Remove {total_records_to_remove} suborganizations?",
verify_message="*** WARNING: This will replace the record on DomainInformation and DomainRequest! ***",
):
try:
# Update all references to point to the right suborg before deletion
all_suborgs_to_remove = set()
for record in records_to_prune.values():
best_record = record["keep"]
suborgs_to_remove = {dupe.id for dupe in record["delete"]}
DomainRequest.objects.filter(sub_organization_id__in=suborgs_to_remove).update(
sub_organization=best_record
)
DomainInformation.objects.filter(sub_organization_id__in=suborgs_to_remove).update(
sub_organization=best_record
)
all_suborgs_to_remove.update(suborgs_to_remove)
# Delete the suborgs
delete_count, _ = Suborganization.objects.filter(id__in=all_suborgs_to_remove).delete()
TerminalHelper.colorful_logger(
logger.info, TerminalColors.MAGENTA, f"Successfully deleted {delete_count} suborganizations."
)
except Exception as e:
TerminalHelper.colorful_logger(
logger.error, TerminalColors.FAIL, f"Failed to delete suborganizations: {str(e)}"
)
def get_records_to_prune(self, extra_records_to_prune):
"""Maps all suborgs into a dictionary with a record to keep, and an array of records to delete."""
# First: Group all suborganization names by their "normalized" names (finding duplicates).
# Returns a dict that looks like this:
# {
# "amtrak": [<Suborganization: AMTRAK>, <Suborganization: aMtRaK>, <Suborganization: AMTRAK >],
# "usda/oc": [<Suborganization: USDA/OC>],
# ...etc
# }
#
name_groups = {}
for suborg in Suborganization.objects.all():
normalized_name = normalize_string(suborg.name)
name_groups.setdefault(normalized_name, []).append(suborg)
# Second: find the record we should keep, and the records we should delete
# Returns a dict that looks like this:
# {
# "amtrak": {
# "keep": <Suborganization: AMTRAK>
# "delete": [<Suborganization: aMtRaK>, <Suborganization: AMTRAK >]
# },
# "usda/oc": {
# "keep": <Suborganization: USDA, Office of Communications>,
# "delete": [<Suborganization: USDA/OC>]
# },
# ...etc
# }
records_to_prune = {}
for normalized_name, duplicate_suborgs in name_groups.items():
# Delete data from our preset list
if normalized_name in extra_records_to_prune:
# The 'keep' field expects a Suborganization but we just pass in a string, so this is just a workaround.
# This assumes that there is only one item in the name_group array (see usda/oc example).
# But this should be fine, given our data.
hardcoded_record_name = extra_records_to_prune[normalized_name]["replace_with"]
name_group = name_groups.get(normalize_string(hardcoded_record_name))
keep = name_group[0] if name_group else None
records_to_prune[normalized_name] = {"keep": keep, "delete": duplicate_suborgs}
# Delete duplicates (extra spaces or casing differences)
elif len(duplicate_suborgs) > 1:
# Pick the best record (fewest spaces, most leading capitals)
best_record = max(
duplicate_suborgs,
key=lambda suborg: (-suborg.name.count(" "), count_capitals(suborg.name, leading_only=True)),
)
records_to_prune[normalized_name] = {
"keep": best_record,
"delete": [s for s in duplicate_suborgs if s != best_record],
}
return records_to_prune

View file

@ -401,16 +401,15 @@ class TerminalHelper:
# Allow the user to inspect the command string
# and ask if they wish to proceed
proceed_execution = TerminalHelper.query_yes_no_exit(
f"""{TerminalColors.OKCYAN}
=====================================================
{prompt_title}
=====================================================
{verify_message}
{prompt_message}
{TerminalColors.FAIL}
Proceed? (Y = proceed, N = {action_description_for_selecting_no})
{TerminalColors.ENDC}"""
f"\n{TerminalColors.OKCYAN}"
"====================================================="
f"\n{prompt_title}\n"
"====================================================="
f"\n{verify_message}\n"
f"\n{prompt_message}\n"
f"{TerminalColors.FAIL}"
f"Proceed? (Y = proceed, N = {action_description_for_selecting_no})"
f"{TerminalColors.ENDC}"
)
# If the user decided to proceed return true.
@ -443,13 +442,14 @@ class TerminalHelper:
f.write(file_contents)
@staticmethod
def colorful_logger(log_level, color, message):
def colorful_logger(log_level, color, message, exc_info=True):
"""Adds some color to your log output.
Args:
log_level: str | Logger.method -> Desired log level. ex: logger.info or "INFO"
color: str | TerminalColors -> Output color. ex: TerminalColors.YELLOW or "YELLOW"
message: str -> Message to display.
exc_info: bool -> Whether the log should print exc_info or not
"""
if isinstance(log_level, str) and hasattr(logger, log_level.lower()):
@ -463,4 +463,4 @@ class TerminalHelper:
terminal_color = color
colored_message = f"{terminal_color}{message}{TerminalColors.ENDC}"
log_method(colored_message)
log_method(colored_message, exc_info=exc_info)

View file

@ -353,3 +353,17 @@ def normalize_string(string_to_normalize, lowercase=True):
new_string = " ".join(string_to_normalize.split())
return new_string.lower() if lowercase else new_string
def count_capitals(text: str, leading_only: bool):
"""Counts capital letters in a string.
Args:
text (str): The string to analyze.
leading_only (bool): If False, counts all capital letters.
If True, only counts capitals at the start of words.
Returns:
int: Number of capital letters found.
"""
if leading_only:
return sum(word[0].isupper() for word in text.split() if word)
return sum(c.isupper() for c in text if c)

View file

@ -153,7 +153,9 @@ def validate_user_portfolio_permission(user_portfolio_permission):
"Based on current waffle flag settings, users cannot be assigned to multiple portfolios."
)
existing_invitations = PortfolioInvitation.objects.filter(email=user_portfolio_permission.user.email)
existing_invitations = PortfolioInvitation.objects.exclude(
portfolio=user_portfolio_permission.portfolio
).filter(email=user_portfolio_permission.user.email)
if existing_invitations.exists():
raise ValidationError(
"This user is already assigned to a portfolio invitation. "

View file

@ -1,36 +1,40 @@
{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
Hi.
Hi,{% if requested_user and requested_user.first_name %} {{ requested_user.first_name }}.{% endif %}
{{ requestor_email }} has added you as a manager on {{ domain.name }}.
You can manage this domain on the .gov registrar <https://manage.get.gov>.
{{ requestor_email }} has invited you to manage:
{% for domain in domains %}{{ domain.name }}
{% endfor %}
To manage domain information, visit the .gov registrar <https://manage.get.gov>.
----------------------------------------------------------------
{% if not requested_user %}
YOU NEED A LOGIN.GOV ACCOUNT
Youll need a Login.gov account to manage your .gov domain. Login.gov provides
a simple and secure process for signing in to many government services with one
account.
Youll need a Login.gov account to access the .gov registrar. That account needs to be
associated with the following email address: {{ invitee_email_address }}
If you dont already have one, follow these steps to create your
Login.gov account <https://login.gov/help/get-started/create-your-account/>.
Login.gov provides a simple and secure process for signing in to many government
services with one account. If you dont already have one, follow these steps to create
your Login.gov account <https://login.gov/help/get-started/create-your-account/>.
{% endif %}
DOMAIN MANAGEMENT
As a .gov domain manager, you can add or update information about your domain.
Youll also serve as a contact for your .gov domain. Please keep your contact
information updated.
As a .gov domain manager, you can add or update information like name servers. Youll
also serve as a contact for the domains you manage. Please keep your contact
information updated.
Learn more about domain management <https://get.gov/help/domain-management>.
SOMETHING WRONG?
If youre not affiliated with {{ domain.name }} or think you received this
message in error, reply to this email.
If youre not affiliated with the .gov domains mentioned in this invitation or think you
received this message in error, reply to this email.
THANK YOU
.Gov helps the public identify official, trusted information. Thank you for using a .gov domain.
.Gov helps the public identify official, trusted information. Thank you for using a .gov
domain.
----------------------------------------------------------------
@ -38,5 +42,6 @@ The .gov team
Contact us: <https://get.gov/contact/>
Learn about .gov <https://get.gov>
The .gov registry is a part of the Cybersecurity and Infrastructure Security Agency (CISA) <https://cisa.gov/>
The .gov registry is a part of the Cybersecurity and Infrastructure Security Agency
(CISA) <https://cisa.gov/>
{% endautoescape %}

View file

@ -1 +1 @@
Youve been added to a .gov domain
You've been invited to manage {% if domains|length > 1 %}.gov domains{% else %}{{ domains.0.name }}{% endif %}

View file

@ -40,6 +40,7 @@ from epplibwrapper import (
ErrorCode,
responses,
)
from registrar.models.suborganization import Suborganization
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from registrar.models.user_domain_role import UserDomainRole
@ -911,6 +912,7 @@ class MockDb(TestCase):
DomainInformation.objects.all().delete()
DomainRequest.objects.all().delete()
UserDomainRole.objects.all().delete()
Suborganization.objects.all().delete()
Portfolio.objects.all().delete()
UserPortfolioPermission.objects.all().delete()
User.objects.all().delete()

File diff suppressed because it is too large Load diff

View file

@ -32,7 +32,7 @@ import tablib
from unittest.mock import patch, call, MagicMock, mock_open
from epplibwrapper import commands, common
from .common import MockEppLib, less_console_noise, completed_domain_request, MockSESClient
from .common import MockEppLib, less_console_noise, completed_domain_request, MockSESClient, MockDbForIndividualTests
from api.tests.common import less_console_noise_decorator
@ -1844,3 +1844,318 @@ class TestCreateFederalPortfolio(TestCase):
self.assertEqual(existing_portfolio.organization_name, self.federal_agency.agency)
self.assertEqual(existing_portfolio.notes, "Old notes")
self.assertEqual(existing_portfolio.creator, self.user)
@less_console_noise_decorator
def test_post_process_suborganization_fields(self):
"""Test suborganization field updates from domain and request data.
Also tests the priority order for updating city and state_territory:
1. Domain information fields
2. Domain request suborganization fields
3. Domain request standard fields
"""
# Create test data with different field combinations
self.domain_info.organization_name = "super"
self.domain_info.city = "Domain City "
self.domain_info.state_territory = "NY"
self.domain_info.save()
self.domain_request.organization_name = "super"
self.domain_request.suborganization_city = "Request Suborg City"
self.domain_request.suborganization_state_territory = "CA"
self.domain_request.city = "Request City"
self.domain_request.state_territory = "TX"
self.domain_request.save()
# Create another request/info pair without domain info data
self.domain_info_2.organization_name = "creative"
self.domain_info_2.city = None
self.domain_info_2.state_territory = None
self.domain_info_2.save()
self.domain_request_2.organization_name = "creative"
self.domain_request_2.suborganization_city = "Second Suborg City"
self.domain_request_2.suborganization_state_territory = "WA"
self.domain_request_2.city = "Second City"
self.domain_request_2.state_territory = "OR"
self.domain_request_2.save()
# Create a third request/info pair without suborg data
self.domain_info_3.organization_name = "names"
self.domain_info_3.city = None
self.domain_info_3.state_territory = None
self.domain_info_3.save()
self.domain_request_3.organization_name = "names"
self.domain_request_3.suborganization_city = None
self.domain_request_3.suborganization_state_territory = None
self.domain_request_3.city = "Third City"
self.domain_request_3.state_territory = "FL"
self.domain_request_3.save()
# Test running the script with both, and just with parse_requests
self.run_create_federal_portfolio(agency_name="Test Federal Agency", parse_requests=True, parse_domains=True)
self.run_create_federal_portfolio(
agency_name="Executive Agency 1",
parse_requests=True,
)
self.domain_info.refresh_from_db()
self.domain_request.refresh_from_db()
self.domain_info_2.refresh_from_db()
self.domain_request_2.refresh_from_db()
self.domain_info_3.refresh_from_db()
self.domain_request_3.refresh_from_db()
# Verify suborganizations were created with correct field values
# Should use domain info values
suborg_1 = Suborganization.objects.get(name=self.domain_info.organization_name)
self.assertEqual(suborg_1.city, "Domain City")
self.assertEqual(suborg_1.state_territory, "NY")
# Should use domain request suborg values
suborg_2 = Suborganization.objects.get(name=self.domain_info_2.organization_name)
self.assertEqual(suborg_2.city, "Second Suborg City")
self.assertEqual(suborg_2.state_territory, "WA")
# Should use domain request standard values
suborg_3 = Suborganization.objects.get(name=self.domain_info_3.organization_name)
self.assertEqual(suborg_3.city, "Third City")
self.assertEqual(suborg_3.state_territory, "FL")
@less_console_noise_decorator
def test_post_process_suborganization_fields_duplicate_records(self):
"""Test suborganization field updates when multiple domains/requests exist for the same org.
Tests that:
1. City / state_territory us updated when all location info matches
2. Updates are skipped when locations don't match
3. Priority order is maintained across multiple records:
a. Domain information fields
b. Domain request suborganization fields
c. Domain request standard fields
"""
# Case 1: Multiple records with all fields matching
matching_request_1 = completed_domain_request(
name="matching1.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
organization_name="matching org",
city="Standard City",
state_territory=DomainRequest.StateTerritoryChoices.TEXAS,
suborganization_city="Suborg City",
suborganization_state_territory=DomainRequest.StateTerritoryChoices.CALIFORNIA,
federal_agency=self.federal_agency,
)
matching_request_1.approve()
domain_info_1 = DomainInformation.objects.get(domain_request=matching_request_1)
domain_info_1.city = "Domain Info City"
domain_info_1.state_territory = DomainRequest.StateTerritoryChoices.NEW_YORK
domain_info_1.save()
matching_request_2 = completed_domain_request(
name="matching2.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
organization_name="matching org",
city="Standard City",
state_territory=DomainRequest.StateTerritoryChoices.TEXAS,
suborganization_city="Suborg City",
suborganization_state_territory=DomainRequest.StateTerritoryChoices.CALIFORNIA,
federal_agency=self.federal_agency,
)
matching_request_2.approve()
domain_info_2 = DomainInformation.objects.get(domain_request=matching_request_2)
domain_info_2.city = "Domain Info City"
domain_info_2.state_territory = DomainRequest.StateTerritoryChoices.NEW_YORK
domain_info_2.save()
# Case 2: Multiple records with only request fields (no domain info)
request_only_1 = completed_domain_request(
name="request1.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
organization_name="request org",
city="Standard City",
state_territory=DomainRequest.StateTerritoryChoices.TEXAS,
suborganization_city="Suborg City",
suborganization_state_territory=DomainRequest.StateTerritoryChoices.CALIFORNIA,
federal_agency=self.federal_agency,
)
request_only_1.approve()
domain_info_3 = DomainInformation.objects.get(domain_request=request_only_1)
domain_info_3.city = None
domain_info_3.state_territory = None
domain_info_3.save()
request_only_2 = completed_domain_request(
name="request2.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
organization_name="request org",
city="Standard City",
state_territory=DomainRequest.StateTerritoryChoices.TEXAS,
suborganization_city="Suborg City",
suborganization_state_territory=DomainRequest.StateTerritoryChoices.CALIFORNIA,
federal_agency=self.federal_agency,
)
request_only_2.approve()
domain_info_4 = DomainInformation.objects.get(domain_request=request_only_2)
domain_info_4.city = None
domain_info_4.state_territory = None
domain_info_4.save()
# Case 3: Multiple records with only standard fields (no suborg)
standard_only_1 = completed_domain_request(
name="standard1.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
organization_name="standard org",
city="Standard City",
state_territory=DomainRequest.StateTerritoryChoices.TEXAS,
federal_agency=self.federal_agency,
)
standard_only_1.approve()
domain_info_5 = DomainInformation.objects.get(domain_request=standard_only_1)
domain_info_5.city = None
domain_info_5.state_territory = None
domain_info_5.save()
standard_only_2 = completed_domain_request(
name="standard2.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
organization_name="standard org",
city="Standard City",
state_territory=DomainRequest.StateTerritoryChoices.TEXAS,
federal_agency=self.federal_agency,
)
standard_only_2.approve()
domain_info_6 = DomainInformation.objects.get(domain_request=standard_only_2)
domain_info_6.city = None
domain_info_6.state_territory = None
domain_info_6.save()
# Case 4: Multiple records with mismatched locations
mismatch_request_1 = completed_domain_request(
name="mismatch1.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
organization_name="mismatch org",
city="City One",
state_territory=DomainRequest.StateTerritoryChoices.FLORIDA,
federal_agency=self.federal_agency,
)
mismatch_request_1.approve()
domain_info_5 = DomainInformation.objects.get(domain_request=mismatch_request_1)
domain_info_5.city = "Different City"
domain_info_5.state_territory = DomainRequest.StateTerritoryChoices.ALASKA
domain_info_5.save()
mismatch_request_2 = completed_domain_request(
name="mismatch2.gov",
status=DomainRequest.DomainRequestStatus.IN_REVIEW,
organization_name="mismatch org",
city="City Two",
state_territory=DomainRequest.StateTerritoryChoices.HAWAII,
federal_agency=self.federal_agency,
)
mismatch_request_2.approve()
domain_info_6 = DomainInformation.objects.get(domain_request=mismatch_request_2)
domain_info_6.city = "Another City"
domain_info_6.state_territory = DomainRequest.StateTerritoryChoices.CALIFORNIA
domain_info_6.save()
# Run the portfolio creation script
self.run_create_federal_portfolio(agency_name="Test Federal Agency", parse_requests=True, parse_domains=True)
# Case 1: Should use domain info values (highest priority)
matching_suborg = Suborganization.objects.get(name="matching org")
self.assertEqual(matching_suborg.city, "Domain Info City")
self.assertEqual(matching_suborg.state_territory, DomainRequest.StateTerritoryChoices.NEW_YORK)
# Case 2: Should use suborg values (second priority)
request_suborg = Suborganization.objects.get(name="request org")
self.assertEqual(request_suborg.city, "Suborg City")
self.assertEqual(request_suborg.state_territory, DomainRequest.StateTerritoryChoices.CALIFORNIA)
# Case 3: Should use standard values (lowest priority)
standard_suborg = Suborganization.objects.get(name="standard org")
self.assertEqual(standard_suborg.city, "Standard City")
self.assertEqual(standard_suborg.state_territory, DomainRequest.StateTerritoryChoices.TEXAS)
# Case 4: Should skip update due to mismatched locations
mismatch_suborg = Suborganization.objects.get(name="mismatch org")
self.assertIsNone(mismatch_suborg.city)
self.assertIsNone(mismatch_suborg.state_territory)
class TestPatchSuborganizations(MockDbForIndividualTests):
"""Tests for the patch_suborganizations management command."""
@less_console_noise_decorator
def run_patch_suborganizations(self):
"""Helper method to run the patch_suborganizations command."""
with patch(
"registrar.management.commands.utility.terminal_helper.TerminalHelper.prompt_for_execution",
return_value=True,
):
call_command("patch_suborganizations")
@less_console_noise_decorator
def test_space_and_case_duplicates(self):
"""Test cleaning up duplicates that differ by spaces and case.
Should keep the version with:
1. Fewest spaces
2. Most leading capitals
"""
Suborganization.objects.create(name="Test Organization ", portfolio=self.portfolio_1)
Suborganization.objects.create(name="test organization", portfolio=self.portfolio_1)
Suborganization.objects.create(name="Test Organization", portfolio=self.portfolio_1)
# Create an unrelated record to test that it doesn't get deleted, too
Suborganization.objects.create(name="unrelated org", portfolio=self.portfolio_1)
self.run_patch_suborganizations()
self.assertEqual(Suborganization.objects.count(), 2)
self.assertEqual(Suborganization.objects.filter(name__in=["unrelated org", "Test Organization"]).count(), 2)
@less_console_noise_decorator
def test_hardcoded_record(self):
"""Tests that our hardcoded records update as we expect them to"""
# Create orgs with old and new name formats
old_name = "USDA/OC"
new_name = "USDA, Office of Communications"
Suborganization.objects.create(name=old_name, portfolio=self.portfolio_1)
Suborganization.objects.create(name=new_name, portfolio=self.portfolio_1)
self.run_patch_suborganizations()
# Verify only the new one remains
self.assertEqual(Suborganization.objects.count(), 1)
remaining = Suborganization.objects.first()
self.assertEqual(remaining.name, new_name)
@less_console_noise_decorator
def test_reference_updates(self):
"""Test that references are updated on domain info and domain request before deletion."""
# Create suborganizations
keep_org = Suborganization.objects.create(name="Test Organization", portfolio=self.portfolio_1)
delete_org = Suborganization.objects.create(name="test organization ", portfolio=self.portfolio_1)
unrelated_org = Suborganization.objects.create(name="awesome", portfolio=self.portfolio_1)
# We expect these references to update
self.domain_request_1.sub_organization = delete_org
self.domain_information_1.sub_organization = delete_org
self.domain_request_1.save()
self.domain_information_1.save()
# But not these ones
self.domain_request_2.sub_organization = unrelated_org
self.domain_information_2.sub_organization = unrelated_org
self.domain_request_2.save()
self.domain_information_2.save()
self.run_patch_suborganizations()
self.domain_request_1.refresh_from_db()
self.domain_information_1.refresh_from_db()
self.domain_request_2.refresh_from_db()
self.domain_information_2.refresh_from_db()
self.assertEqual(self.domain_request_1.sub_organization, keep_org)
self.assertEqual(self.domain_information_1.sub_organization, keep_org)
self.assertEqual(self.domain_request_2.sub_organization, unrelated_org)
self.assertEqual(self.domain_information_2.sub_organization, unrelated_org)

View file

@ -28,6 +28,7 @@ from registrar.models.verified_by_staff import VerifiedByStaff # type: ignore
from .common import (
MockSESClient,
completed_domain_request,
create_superuser,
create_test_user,
)
from waffle.testutils import override_flag
@ -155,6 +156,7 @@ class TestPortfolioInvitations(TestCase):
roles=[self.portfolio_role_base, self.portfolio_role_admin],
additional_permissions=[self.portfolio_permission_1, self.portfolio_permission_2],
)
self.superuser = create_superuser()
def tearDown(self):
super().tearDown()
@ -294,10 +296,158 @@ class TestPortfolioInvitations(TestCase):
# Verify
self.assertEquals(self.invitation.get_portfolio_permissions(), perm_list)
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=False)
def test_clean_multiple_portfolios_inactive(self):
"""Tests that users cannot have multiple portfolios or invitations when flag is inactive"""
# Create the first portfolio permission
UserPortfolioPermission.objects.create(
user=self.superuser, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
# Test a second portfolio permission object (should fail)
second_portfolio = Portfolio.objects.create(organization_name="Second Portfolio", creator=self.superuser)
second_permission = UserPortfolioPermission(
user=self.superuser, portfolio=second_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
with self.assertRaises(ValidationError) as err:
second_permission.clean()
self.assertIn("users cannot be assigned to multiple portfolios", str(err.exception))
# Test that adding a new portfolio invitation also fails
third_portfolio = Portfolio.objects.create(organization_name="Third Portfolio", creator=self.superuser)
invitation = PortfolioInvitation(
email=self.superuser.email, portfolio=third_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
with self.assertRaises(ValidationError) as err:
invitation.clean()
self.assertIn("users cannot be assigned to multiple portfolios", str(err.exception))
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=True)
def test_clean_multiple_portfolios_active(self):
"""Tests that users can have multiple portfolios and invitations when flag is active"""
# Create first portfolio permission
UserPortfolioPermission.objects.create(
user=self.superuser, portfolio=self.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
# Second portfolio permission should succeed
second_portfolio = Portfolio.objects.create(organization_name="Second Portfolio", creator=self.superuser)
second_permission = UserPortfolioPermission(
user=self.superuser, portfolio=second_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
second_permission.clean()
second_permission.save()
# Verify both permissions exist
user_permissions = UserPortfolioPermission.objects.filter(user=self.superuser)
self.assertEqual(user_permissions.count(), 2)
# Portfolio invitation should also succeed
third_portfolio = Portfolio.objects.create(organization_name="Third Portfolio", creator=self.superuser)
invitation = PortfolioInvitation(
email=self.superuser.email, portfolio=third_portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
)
invitation.clean()
invitation.save()
# Verify invitation exists
self.assertTrue(
PortfolioInvitation.objects.filter(
email=self.superuser.email,
portfolio=third_portfolio,
).exists()
)
@less_console_noise_decorator
def test_clean_portfolio_invitation(self):
"""Tests validation of portfolio invitation permissions"""
# Test validation fails when portfolio missing but permissions present
invitation = PortfolioInvitation(email="test@example.com", roles=["organization_admin"], portfolio=None)
with self.assertRaises(ValidationError) as err:
invitation.clean()
self.assertEqual(
str(err.exception),
"When portfolio roles or additional permissions are assigned, portfolio is required.",
)
# Test validation fails when portfolio present but no permissions
invitation = PortfolioInvitation(email="test@example.com", roles=None, portfolio=self.portfolio)
with self.assertRaises(ValidationError) as err:
invitation.clean()
self.assertEqual(
str(err.exception),
"When portfolio is assigned, portfolio roles or additional permissions are required.",
)
# Test validation fails with forbidden permissions
forbidden_member_roles = UserPortfolioPermission.FORBIDDEN_PORTFOLIO_ROLE_PERMISSIONS.get(
UserPortfolioRoleChoices.ORGANIZATION_MEMBER
)
invitation = PortfolioInvitation(
email="test@example.com",
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
additional_permissions=forbidden_member_roles,
portfolio=self.portfolio,
)
with self.assertRaises(ValidationError) as err:
invitation.clean()
self.assertEqual(
str(err.exception),
"These permissions cannot be assigned to Member: "
"<View all domains and domain reports, Create and edit members, View members>",
)
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=False)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_off_and_duplicate_permission(self):
"""MISSING TEST: Test validation of multiple_portfolios flag.
Scenario 1: Flag is inactive, and the user has existing portfolio permissions
NOTE: Refer to the same test under TestUserPortfolioPermission"""
pass
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=False)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_off_and_existing_invitation(self):
"""MISSING TEST: Test validation of multiple_portfolios flag.
Scenario 2: Flag is inactive, and the user has existing portfolio invitation to another portfolio
NOTE: Refer to the same test under TestUserPortfolioPermission"""
pass
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=True)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_on_and_duplicate_permission(self):
"""MISSING TEST: Test validation of multiple_portfolios flag.
Scenario 3: Flag is active, and the user has existing portfolio invitation
NOTE: Refer to the same test under TestUserPortfolioPermission"""
pass
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=True)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_on_and_existing_invitation(self):
"""MISSING TEST: Test validation of multiple_portfolios flag.
Scenario 4: Flag is active, and the user has existing portfolio invitation to another portfolio
NOTE: Refer to the same test under TestUserPortfolioPermission"""
pass
class TestUserPortfolioPermission(TestCase):
@less_console_noise_decorator
def setUp(self):
self.superuser = create_superuser()
self.portfolio = Portfolio.objects.create(organization_name="Test Portfolio", creator=self.superuser)
self.user, _ = User.objects.get_or_create(email="mayor@igorville.gov")
self.user2, _ = User.objects.get_or_create(email="user2@igorville.gov", username="user2")
super().setUp()
@ -311,6 +461,7 @@ class TestUserPortfolioPermission(TestCase):
Portfolio.objects.all().delete()
User.objects.all().delete()
UserDomainRole.objects.all().delete()
PortfolioInvitation.objects.all().delete()
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=True)
@ -427,6 +578,178 @@ class TestUserPortfolioPermission(TestCase):
# Assert
self.assertEqual(portfolio_permission.get_managed_domains_count(), 1)
@less_console_noise_decorator
def test_clean_user_portfolio_permission(self):
"""Tests validation of user portfolio permission"""
# Test validation fails when portfolio missing but permissions are present
permission = UserPortfolioPermission(user=self.superuser, roles=["organization_admin"], portfolio=None)
with self.assertRaises(ValidationError) as err:
permission.clean()
self.assertEqual(
str(err.exception),
"When portfolio roles or additional permissions are assigned, portfolio is required.",
)
# Test validation fails when portfolio present but no permissions are present
permission = UserPortfolioPermission(user=self.superuser, roles=None, portfolio=self.portfolio)
with self.assertRaises(ValidationError) as err:
permission.clean()
self.assertEqual(
str(err.exception),
"When portfolio is assigned, portfolio roles or additional permissions are required.",
)
# Test validation fails with forbidden permissions for single role
forbidden_member_roles = UserPortfolioPermission.FORBIDDEN_PORTFOLIO_ROLE_PERMISSIONS.get(
UserPortfolioRoleChoices.ORGANIZATION_MEMBER
)
permission = UserPortfolioPermission(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
additional_permissions=forbidden_member_roles,
portfolio=self.portfolio,
)
with self.assertRaises(ValidationError) as err:
permission.clean()
self.assertEqual(
str(err.exception),
"These permissions cannot be assigned to Member: "
"<Create and edit members, View all domains and domain reports, View members>",
)
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=False)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_off_and_duplicate_permission(self):
"""Test validation of multiple_portfolios flag.
Scenario 1: Flag is inactive, and the user has existing portfolio permissions"""
# existing permission
UserPortfolioPermission.objects.create(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
portfolio=self.portfolio,
)
permission = UserPortfolioPermission(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
portfolio=self.portfolio,
)
with self.assertRaises(ValidationError) as err:
permission.clean()
self.assertEqual(
str(err.exception.messages[0]),
"This user is already assigned to a portfolio. "
"Based on current waffle flag settings, users cannot be assigned to multiple portfolios.",
)
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=False)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_off_and_existing_invitation(self):
"""Test validation of multiple_portfolios flag.
Scenario 2: Flag is inactive, and the user has existing portfolio invitation to another portfolio"""
portfolio2 = Portfolio.objects.create(creator=self.superuser, organization_name="Joey go away")
PortfolioInvitation.objects.create(
email=self.superuser.email, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], portfolio=portfolio2
)
permission = UserPortfolioPermission(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
portfolio=self.portfolio,
)
with self.assertRaises(ValidationError) as err:
permission.clean()
self.assertEqual(
str(err.exception.messages[0]),
"This user is already assigned to a portfolio invitation. "
"Based on current waffle flag settings, users cannot be assigned to multiple portfolios.",
)
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=True)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_on_and_duplicate_permission(self):
"""Test validation of multiple_portfolios flag.
Scenario 3: Flag is active, and the user has existing portfolio invitation"""
# existing permission
UserPortfolioPermission.objects.create(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
portfolio=self.portfolio,
)
permission = UserPortfolioPermission(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
portfolio=self.portfolio,
)
# Should not raise any exceptions
try:
permission.clean()
except ValidationError:
self.fail("ValidationError was raised unexpectedly when flag is active.")
@less_console_noise_decorator
@override_flag("multiple_portfolios", active=True)
def test_clean_user_portfolio_permission_multiple_portfolios_flag_on_and_existing_invitation(self):
"""Test validation of multiple_portfolios flag.
Scenario 4: Flag is active, and the user has existing portfolio invitation to another portfolio"""
portfolio2 = Portfolio.objects.create(creator=self.superuser, organization_name="Joey go away")
PortfolioInvitation.objects.create(
email=self.superuser.email, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], portfolio=portfolio2
)
permission = UserPortfolioPermission(
user=self.superuser,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
portfolio=self.portfolio,
)
# Should not raise any exceptions
try:
permission.clean()
except ValidationError:
self.fail("ValidationError was raised unexpectedly when flag is active.")
@less_console_noise_decorator
def test_get_forbidden_permissions_with_multiple_roles(self):
"""Tests that forbidden permissions are properly handled when a user has multiple roles"""
# Get forbidden permissions for member role
member_forbidden = UserPortfolioPermission.FORBIDDEN_PORTFOLIO_ROLE_PERMISSIONS.get(
UserPortfolioRoleChoices.ORGANIZATION_MEMBER
)
# Test with both admin and member roles
roles = [UserPortfolioRoleChoices.ORGANIZATION_ADMIN, UserPortfolioRoleChoices.ORGANIZATION_MEMBER]
# These permissions would be forbidden for member alone, but should be allowed
# when combined with admin role
permissions = UserPortfolioPermission.get_forbidden_permissions(
roles=roles, additional_permissions=member_forbidden
)
# Should return empty set since no permissions are commonly forbidden between admin and member
self.assertEqual(permissions, set())
# Verify the same permissions are forbidden when only member role is present
member_only_permissions = UserPortfolioPermission.get_forbidden_permissions(
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER], additional_permissions=member_forbidden
)
# Should return the forbidden permissions for member role
self.assertEqual(member_only_permissions, set(member_forbidden))
class TestUser(TestCase):
"""Test actions that occur on user login,

View file

@ -900,6 +900,7 @@ class MemberExportTest(MockDbForIndividualTests, MockEppLib):
# spaces and leading/trailing whitespace
csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip()
expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip()
self.maxDiff = None
self.assertEqual(csv_content, expected_content)

View file

@ -720,6 +720,8 @@ class TestDomainManagers(TestDomainOverview):
def tearDown(self):
"""Ensure that the user has its original permissions"""
PortfolioInvitation.objects.all().delete()
UserPortfolioPermission.objects.all().delete()
User.objects.exclude(id=self.user.id).delete()
super().tearDown()
@less_console_noise_decorator
@ -807,21 +809,76 @@ class TestDomainManagers(TestDomainOverview):
call_args = mock_send_domain_email.call_args.kwargs
self.assertEqual(call_args["email"], "mayor@igorville.gov")
self.assertEqual(call_args["requestor"], self.user)
self.assertEqual(call_args["domain"], self.domain)
self.assertEqual(call_args["domains"], self.domain)
self.assertIsNone(call_args.get("is_member_of_different_org"))
# Assert that the PortfolioInvitation is created
# Assert that the PortfolioInvitation is created and retrieved
portfolio_invitation = PortfolioInvitation.objects.filter(
email="mayor@igorville.gov", portfolio=self.portfolio
).first()
self.assertIsNotNone(portfolio_invitation, "Portfolio invitation should be created.")
self.assertEqual(portfolio_invitation.email, "mayor@igorville.gov")
self.assertEqual(portfolio_invitation.portfolio, self.portfolio)
self.assertEqual(portfolio_invitation.status, PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED)
# Assert that the UserPortfolioPermission is created
user_portfolio_permission = UserPortfolioPermission.objects.filter(
user=self.user, portfolio=self.portfolio
).first()
self.assertIsNotNone(user_portfolio_permission, "User portfolio permission should be created")
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = success_result.follow()
self.assertContains(success_page, "mayor@igorville.gov")
@boto3_mocking.patching
@override_flag("organization_feature", active=True)
@less_console_noise_decorator
@patch("registrar.views.domain.send_portfolio_invitation_email")
@patch("registrar.views.domain.send_domain_invitation_email")
def test_domain_user_add_form_sends_portfolio_invitation_to_new_email(
self, mock_send_domain_email, mock_send_portfolio_email
):
"""Adding an email not associated with a user works and sends portfolio invitation."""
add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
add_page.form["email"] = "notauser@igorville.gov"
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_result = add_page.form.submit()
self.assertEqual(success_result.status_code, 302)
self.assertEqual(
success_result["Location"],
reverse("domain-users", kwargs={"pk": self.domain.id}),
)
# Verify that the invitation emails were sent
mock_send_portfolio_email.assert_called_once_with(
email="notauser@igorville.gov", requestor=self.user, portfolio=self.portfolio
)
mock_send_domain_email.assert_called_once()
call_args = mock_send_domain_email.call_args.kwargs
self.assertEqual(call_args["email"], "notauser@igorville.gov")
self.assertEqual(call_args["requestor"], self.user)
self.assertEqual(call_args["domains"], self.domain)
self.assertIsNone(call_args.get("is_member_of_different_org"))
# Assert that the PortfolioInvitation is created
portfolio_invitation = PortfolioInvitation.objects.filter(
email="notauser@igorville.gov", portfolio=self.portfolio
).first()
self.assertIsNotNone(portfolio_invitation, "Portfolio invitation should be created.")
self.assertEqual(portfolio_invitation.email, "notauser@igorville.gov")
self.assertEqual(portfolio_invitation.portfolio, self.portfolio)
self.assertEqual(portfolio_invitation.status, PortfolioInvitation.PortfolioInvitationStatus.INVITED)
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = success_result.follow()
self.assertContains(success_page, "notauser@igorville.gov")
@boto3_mocking.patching
@override_flag("organization_feature", active=True)
@less_console_noise_decorator
@ -857,7 +914,7 @@ class TestDomainManagers(TestDomainOverview):
call_args = mock_send_domain_email.call_args.kwargs
self.assertEqual(call_args["email"], "mayor@igorville.gov")
self.assertEqual(call_args["requestor"], self.user)
self.assertEqual(call_args["domain"], self.domain)
self.assertEqual(call_args["domains"], self.domain)
self.assertIsNone(call_args.get("is_member_of_different_org"))
# Assert that no PortfolioInvitation is created
@ -915,7 +972,7 @@ class TestDomainManagers(TestDomainOverview):
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = success_result.follow()
self.assertContains(success_page, "Could not send email invitation.")
self.assertContains(success_page, "Failed to send email.")
@boto3_mocking.patching
@less_console_noise_decorator

View file

@ -2106,25 +2106,75 @@ class TestPortfolioInvitedMemberDomainsView(TestWithUser, WebTest):
self.assertEqual(response.status_code, 404)
class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView):
class TestPortfolioMemberDomainsEditView(TestWithUser, WebTest):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.url = reverse("member-domains-edit", kwargs={"pk": cls.portfolio_permission.pk})
# Create Portfolio
cls.portfolio = Portfolio.objects.create(creator=cls.user, organization_name="Test Portfolio")
# Create domains for testing
cls.domain1 = Domain.objects.create(name="1.gov")
cls.domain2 = Domain.objects.create(name="2.gov")
cls.domain3 = Domain.objects.create(name="3.gov")
@classmethod
def tearDownClass(cls):
super().tearDownClass()
Portfolio.objects.all().delete()
User.objects.all().delete()
Domain.objects.all().delete()
def setUp(self):
super().setUp()
names = ["1.gov", "2.gov", "3.gov"]
Domain.objects.bulk_create([Domain(name=name) for name in names])
# Create test member
self.user_member = User.objects.create(
username="test_member",
first_name="Second",
last_name="User",
email="second@example.com",
phone="8003112345",
title="Member",
)
# Create test user with no perms
self.user_no_perms = User.objects.create(
username="test_user_no_perms",
first_name="No",
last_name="Permissions",
email="user_no_perms@example.com",
phone="8003112345",
title="No Permissions",
)
# Assign permissions to the user making requests
self.portfolio_permission = UserPortfolioPermission.objects.create(
user=self.user,
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_MEMBERS,
UserPortfolioPermissionChoices.EDIT_MEMBERS,
],
)
# Assign permissions to test member
self.permission = UserPortfolioPermission.objects.create(
user=self.user_member,
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_MEMBERS,
UserPortfolioPermissionChoices.EDIT_MEMBERS,
],
)
# Create url to be used in all tests
self.url = reverse("member-domains-edit", kwargs={"pk": self.portfolio_permission.pk})
def tearDown(self):
super().tearDown()
UserDomainRole.objects.all().delete()
Domain.objects.all().delete()
DomainInvitation.objects.all().delete()
UserPortfolioPermission.objects.all().delete()
PortfolioInvitation.objects.all().delete()
Portfolio.objects.exclude(id=self.portfolio.id).delete()
User.objects.exclude(id=self.user.id).delete()
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@ -2180,12 +2230,13 @@ class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView):
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_post_with_valid_added_domains(self):
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_with_valid_added_domains(self, mock_send_domain_email):
"""Test that domains can be successfully added."""
self.client.force_login(self.user)
data = {
"added_domains": json.dumps([1, 2, 3]), # Mock domain IDs
"added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]), # Mock domain IDs
}
response = self.client.post(self.url, data)
@ -2198,31 +2249,43 @@ class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView):
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.")
expected_domains = [self.domain1, self.domain2, self.domain3]
# Verify that the invitation email was sent
mock_send_domain_email.assert_called_once()
call_args = mock_send_domain_email.call_args.kwargs
self.assertEqual(call_args["email"], "info@example.com")
self.assertEqual(call_args["requestor"], self.user)
self.assertEqual(list(call_args["domains"]), list(expected_domains))
self.assertIsNone(call_args.get("is_member_of_different_org"))
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_post_with_valid_removed_domains(self):
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_with_valid_removed_domains(self, mock_send_domain_email):
"""Test that domains can be successfully removed."""
self.client.force_login(self.user)
# Create some UserDomainRole objects
domains = [1, 2, 3]
UserDomainRole.objects.bulk_create([UserDomainRole(domain_id=domain, user=self.user) for domain in domains])
domains = [self.domain1, self.domain2, self.domain3]
UserDomainRole.objects.bulk_create([UserDomainRole(domain=domain, user=self.user) for domain in domains])
data = {
"removed_domains": json.dumps([1, 2]),
"removed_domains": json.dumps([self.domain1.id, self.domain2.id]),
}
response = self.client.post(self.url, data)
# Check that the UserDomainRole objects were deleted
self.assertEqual(UserDomainRole.objects.filter(user=self.user).count(), 1)
self.assertEqual(UserDomainRole.objects.filter(domain_id=3, user=self.user).count(), 1)
self.assertEqual(UserDomainRole.objects.filter(domain=self.domain3, user=self.user).count(), 1)
# Check for a success message and a redirect
self.assertRedirects(response, reverse("member-domains", kwargs={"pk": self.portfolio_permission.pk}))
messages = list(response.wsgi_request._messages)
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.")
# assert that send_domain_invitation_email is not called
mock_send_domain_email.assert_not_called()
UserDomainRole.objects.all().delete()
@ -2290,26 +2353,93 @@ class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView):
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), "No changes detected.")
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_when_send_domain_email_raises_exception(self, mock_send_domain_email):
"""Test attempt to add new domains when an EmailSendingError raised."""
self.client.force_login(self.user)
class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomainsView):
data = {
"added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]), # Mock domain IDs
}
mock_send_domain_email.side_effect = EmailSendingError("Failed to send email")
response = self.client.post(self.url, data)
# Check that the UserDomainRole objects were not created
self.assertEqual(UserDomainRole.objects.filter(user=self.user, role=UserDomainRole.Roles.MANAGER).count(), 0)
# Check for an error message and a redirect to edit form
self.assertRedirects(response, reverse("member-domains-edit", kwargs={"pk": self.portfolio_permission.pk}))
messages = list(response.wsgi_request._messages)
self.assertEqual(len(messages), 1)
self.assertEqual(
str(messages[0]),
"An unexpected error occurred: Failed to send email. If the issue persists, please contact help@get.gov.",
)
class TestPortfolioInvitedMemberEditDomainsView(TestWithUser, WebTest):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.url = reverse("invitedmember-domains-edit", kwargs={"pk": cls.invitation.pk})
# Create Portfolio
cls.portfolio = Portfolio.objects.create(creator=cls.user, organization_name="Test Portfolio")
# Create domains for testing
cls.domain1 = Domain.objects.create(name="1.gov")
cls.domain2 = Domain.objects.create(name="2.gov")
cls.domain3 = Domain.objects.create(name="3.gov")
@classmethod
def tearDownClass(cls):
super().tearDownClass()
Portfolio.objects.all().delete()
User.objects.all().delete()
Domain.objects.all().delete()
def setUp(self):
super().setUp()
names = ["1.gov", "2.gov", "3.gov"]
Domain.objects.bulk_create([Domain(name=name) for name in names])
# Add a user with no permissions
self.user_no_perms = User.objects.create(
username="test_user_no_perms",
first_name="No",
last_name="Permissions",
email="user_no_perms@example.com",
phone="8003112345",
title="No Permissions",
)
# Add an invited member who has been invited to manage domains
self.invited_member_email = "invited@example.com"
self.invitation = PortfolioInvitation.objects.create(
email=self.invited_member_email,
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_MEMBERS,
],
)
# Assign permissions to the user making requests
UserPortfolioPermission.objects.create(
user=self.user,
portfolio=self.portfolio,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_MEMBERS,
UserPortfolioPermissionChoices.EDIT_MEMBERS,
],
)
self.url = reverse("invitedmember-domains-edit", kwargs={"pk": self.invitation.pk})
def tearDown(self):
super().tearDown()
Domain.objects.all().delete()
DomainInvitation.objects.all().delete()
UserPortfolioPermission.objects.all().delete()
PortfolioInvitation.objects.all().delete()
Portfolio.objects.exclude(id=self.portfolio.id).delete()
User.objects.exclude(id=self.user.id).delete()
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@ -2364,12 +2494,13 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_post_with_valid_added_domains(self):
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_with_valid_added_domains(self, mock_send_domain_email):
"""Test adding new domains successfully."""
self.client.force_login(self.user)
data = {
"added_domains": json.dumps([1, 2, 3]), # Mock domain IDs
"added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]),
}
response = self.client.post(self.url, data)
@ -2387,10 +2518,20 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.")
expected_domains = [self.domain1, self.domain2, self.domain3]
# Verify that the invitation email was sent
mock_send_domain_email.assert_called_once()
call_args = mock_send_domain_email.call_args.kwargs
self.assertEqual(call_args["email"], "invited@example.com")
self.assertEqual(call_args["requestor"], self.user)
self.assertEqual(list(call_args["domains"]), list(expected_domains))
self.assertFalse(call_args.get("is_member_of_different_org"))
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_post_with_existing_and_new_added_domains(self):
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_with_existing_and_new_added_domains(self, _):
"""Test updating existing and adding new invitations."""
self.client.force_login(self.user)
@ -2398,29 +2539,33 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain
DomainInvitation.objects.bulk_create(
[
DomainInvitation(
domain_id=1, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.CANCELED
domain=self.domain1,
email="invited@example.com",
status=DomainInvitation.DomainInvitationStatus.CANCELED,
),
DomainInvitation(
domain_id=2, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
domain=self.domain2,
email="invited@example.com",
status=DomainInvitation.DomainInvitationStatus.INVITED,
),
]
)
data = {
"added_domains": json.dumps([1, 2, 3]),
"added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]),
}
response = self.client.post(self.url, data)
# Check that status for domain_id=1 was updated to INVITED
self.assertEqual(
DomainInvitation.objects.get(domain_id=1, email="invited@example.com").status,
DomainInvitation.objects.get(domain=self.domain1, email="invited@example.com").status,
DomainInvitation.DomainInvitationStatus.INVITED,
)
# Check that domain_id=3 was created as INVITED
self.assertTrue(
DomainInvitation.objects.filter(
domain_id=3, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
domain=self.domain3, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
).exists()
)
@ -2430,7 +2575,8 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
def test_post_with_valid_removed_domains(self):
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_with_valid_removed_domains(self, mock_send_domain_email):
"""Test removing domains successfully."""
self.client.force_login(self.user)
@ -2438,33 +2584,39 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain
DomainInvitation.objects.bulk_create(
[
DomainInvitation(
domain_id=1, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
domain=self.domain1,
email="invited@example.com",
status=DomainInvitation.DomainInvitationStatus.INVITED,
),
DomainInvitation(
domain_id=2, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
domain=self.domain2,
email="invited@example.com",
status=DomainInvitation.DomainInvitationStatus.INVITED,
),
]
)
data = {
"removed_domains": json.dumps([1]),
"removed_domains": json.dumps([self.domain1.id]),
}
response = self.client.post(self.url, data)
# Check that the status for domain_id=1 was updated to CANCELED
self.assertEqual(
DomainInvitation.objects.get(domain_id=1, email="invited@example.com").status,
DomainInvitation.objects.get(domain=self.domain1, email="invited@example.com").status,
DomainInvitation.DomainInvitationStatus.CANCELED,
)
# Check that domain_id=2 remains INVITED
self.assertEqual(
DomainInvitation.objects.get(domain_id=2, email="invited@example.com").status,
DomainInvitation.objects.get(domain=self.domain2, email="invited@example.com").status,
DomainInvitation.DomainInvitationStatus.INVITED,
)
# Check for a success message and a redirect
self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk}))
# assert that send_domain_invitation_email is not called
mock_send_domain_email.assert_not_called()
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@ -2530,6 +2682,37 @@ class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomain
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), "No changes detected.")
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_domain_invitation_email")
def test_post_when_send_domain_email_raises_exception(self, mock_send_domain_email):
"""Test attempt to add new domains when an EmailSendingError raised."""
self.client.force_login(self.user)
data = {
"added_domains": json.dumps([self.domain1.id, self.domain2.id, self.domain3.id]),
}
mock_send_domain_email.side_effect = EmailSendingError("Failed to send email")
response = self.client.post(self.url, data)
# Check that the DomainInvitation objects were not created
self.assertEqual(
DomainInvitation.objects.filter(
email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED
).count(),
0,
)
# Check for an error message and a redirect to edit form
self.assertRedirects(response, reverse("invitedmember-domains-edit", kwargs={"pk": self.invitation.pk}))
messages = list(response.wsgi_request._messages)
self.assertEqual(len(messages), 1)
self.assertEqual(
str(messages[0]),
"An unexpected error occurred: Failed to send email. If the issue persists, please contact help@get.gov.",
)
class TestRequestingEntity(WebTest):
"""The requesting entity page is a domain request form that only exists
@ -2879,7 +3062,7 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest):
],
)
cls.new_member_email = "davekenn4242@gmail.com"
cls.new_member_email = "newmember@example.com"
AllowedEmail.objects.get_or_create(email=cls.new_member_email)
@ -2933,11 +3116,13 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest):
self.assertEqual(final_response.status_code, 302) # Redirects
# Validate Database Changes
# Validate that portfolio invitation was created but not retrieved
portfolio_invite = PortfolioInvitation.objects.filter(
email=self.new_member_email, portfolio=self.portfolio
).first()
self.assertIsNotNone(portfolio_invite)
self.assertEqual(portfolio_invite.email, self.new_member_email)
self.assertEqual(portfolio_invite.status, PortfolioInvitation.PortfolioInvitationStatus.INVITED)
# Check that an email was sent
self.assertTrue(mock_client.send_email.called)
@ -3228,6 +3413,52 @@ class TestPortfolioInviteNewMemberView(TestWithUser, WebTest):
# assert that send_portfolio_invitation_email is not called
mock_send_email.assert_not_called()
@less_console_noise_decorator
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@patch("registrar.views.portfolios.send_portfolio_invitation_email")
def test_member_invite_for_existing_user_who_is_not_a_member(self, mock_send_email):
"""Tests the member invitation flow for existing user who is not a portfolio member."""
self.client.force_login(self.user)
# Simulate a session to ensure continuity
session_id = self.client.session.session_key
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
new_user = User.objects.create(email="newuser@example.com")
# Simulate submission of member invite for the newly created user
response = self.client.post(
reverse("new-member"),
{
"role": UserPortfolioRoleChoices.ORGANIZATION_MEMBER.value,
"domain_request_permission_member": UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS.value,
"email": "newuser@example.com",
},
)
self.assertEqual(response.status_code, 302)
# Validate Database Changes
# Validate that portfolio invitation was created and retrieved
portfolio_invite = PortfolioInvitation.objects.filter(
email="newuser@example.com", portfolio=self.portfolio
).first()
self.assertIsNotNone(portfolio_invite)
self.assertEqual(portfolio_invite.email, "newuser@example.com")
self.assertEqual(portfolio_invite.status, PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED)
# Validate UserPortfolioPermission
user_portfolio_permission = UserPortfolioPermission.objects.filter(
user=new_user, portfolio=self.portfolio
).first()
self.assertIsNotNone(user_portfolio_permission)
# assert that send_portfolio_invitation_email is called
mock_send_email.assert_called_once()
call_args = mock_send_email.call_args.kwargs
self.assertEqual(call_args["email"], "newuser@example.com")
self.assertEqual(call_args["requestor"], self.user)
self.assertIsNone(call_args.get("is_member_of_different_org"))
class TestEditPortfolioMemberView(WebTest):
"""Tests for the edit member page on portfolios"""

View file

@ -1,5 +1,6 @@
from django.conf import settings
from registrar.models import DomainInvitation
from registrar.models.domain import Domain
from registrar.utility.errors import (
AlreadyDomainInvitedError,
AlreadyDomainManagerError,
@ -7,23 +8,24 @@ from registrar.utility.errors import (
OutsideOrgMemberError,
)
from registrar.utility.waffle import flag_is_active_for_user
from registrar.utility.email import send_templated_email
from registrar.utility.email import EmailSendingError, send_templated_email
import logging
logger = logging.getLogger(__name__)
def send_domain_invitation_email(email: str, requestor, domain, is_member_of_different_org):
def send_domain_invitation_email(
email: str, requestor, domains: Domain | list[Domain], is_member_of_different_org, requested_user=None
):
"""
Sends a domain invitation email to the specified address.
Raises exceptions for validation or email-sending issues.
Args:
email (str): Email address of the recipient.
requestor (User): The user initiating the invitation.
domain (Domain): The domain object for which the invitation is being sent.
domains (Domain or list of Domain): The domain objects for which the invitation is being sent.
is_member_of_different_org (bool): if an email belongs to a different org
requested_user (User | None): The recipient if the email belongs to a user in the registrar
Raises:
MissingEmailError: If the requestor has no email associated with their account.
@ -32,26 +34,54 @@ def send_domain_invitation_email(email: str, requestor, domain, is_member_of_dif
OutsideOrgMemberError: If the requested_user is part of a different organization.
EmailSendingError: If there is an error while sending the email.
"""
# Default email address for staff
requestor_email = settings.DEFAULT_FROM_EMAIL
domains = normalize_domains(domains)
requestor_email = get_requestor_email(requestor, domains)
# Check if the requestor is staff and has an email
if not requestor.is_staff:
if not requestor.email or requestor.email.strip() == "":
raise MissingEmailError
else:
requestor_email = requestor.email
validate_invitation(email, domains, requestor, is_member_of_different_org)
# Check if the recipient is part of a different organization
# COMMENT: this does not account for multiple_portfolios flag being active
send_invitation_email(email, requestor_email, domains, requested_user)
def normalize_domains(domains):
"""Ensures domains is always a list."""
return [domains] if isinstance(domains, Domain) else domains
def get_requestor_email(requestor, domains):
"""Get the requestor's email or raise an error if it's missing.
If the requestor is staff, default email is returned.
"""
if requestor.is_staff:
return settings.DEFAULT_FROM_EMAIL
if not requestor.email or requestor.email.strip() == "":
domain_names = ", ".join([domain.name for domain in domains])
raise MissingEmailError(email=requestor.email, domain=domain_names)
return requestor.email
def validate_invitation(email, domains, requestor, is_member_of_different_org):
"""Validate the invitation conditions."""
check_outside_org_membership(email, requestor, is_member_of_different_org)
for domain in domains:
validate_existing_invitation(email, domain)
def check_outside_org_membership(email, requestor, is_member_of_different_org):
"""Raise an error if the email belongs to a different organization."""
if (
flag_is_active_for_user(requestor, "organization_feature")
and not flag_is_active_for_user(requestor, "multiple_portfolios")
and is_member_of_different_org
):
raise OutsideOrgMemberError
raise OutsideOrgMemberError(email=email)
# Check for an existing invitation
def validate_existing_invitation(email, domain):
"""Check for existing invitations and handle their status."""
try:
invite = DomainInvitation.objects.get(email=email, domain=domain)
if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED:
@ -64,16 +94,24 @@ def send_domain_invitation_email(email: str, requestor, domain, is_member_of_dif
except DomainInvitation.DoesNotExist:
pass
# Send the email
send_templated_email(
"emails/domain_invitation.txt",
"emails/domain_invitation_subject.txt",
to_address=email,
context={
"domain": domain,
"requestor_email": requestor_email,
},
)
def send_invitation_email(email, requestor_email, domains, requested_user):
"""Send the invitation email."""
try:
send_templated_email(
"emails/domain_invitation.txt",
"emails/domain_invitation_subject.txt",
to_address=email,
context={
"domains": domains,
"requestor_email": requestor_email,
"invitee_email_address": email,
"requested_user": requested_user,
},
)
except EmailSendingError as err:
domain_names = ", ".join([domain.name for domain in domains])
raise EmailSendingError(f"Could not send email invitation to {email} for domains: {domain_names}") from err
def send_portfolio_invitation_email(email: str, requestor, portfolio):
@ -98,17 +136,22 @@ def send_portfolio_invitation_email(email: str, requestor, portfolio):
# Check if the requestor is staff and has an email
if not requestor.is_staff:
if not requestor.email or requestor.email.strip() == "":
raise MissingEmailError
raise MissingEmailError(email=email, portfolio=portfolio)
else:
requestor_email = requestor.email
send_templated_email(
"emails/portfolio_invitation.txt",
"emails/portfolio_invitation_subject.txt",
to_address=email,
context={
"portfolio": portfolio,
"requestor_email": requestor_email,
"email": email,
},
)
try:
send_templated_email(
"emails/portfolio_invitation.txt",
"emails/portfolio_invitation_subject.txt",
to_address=email,
context={
"portfolio": portfolio,
"requestor_email": requestor_email,
"email": email,
},
)
except EmailSendingError as err:
raise EmailSendingError(
f"Could not sent email invitation to {email} for portfolio {portfolio}. Portfolio invitation not saved."
) from err

View file

@ -46,8 +46,17 @@ class AlreadyDomainInvitedError(InvitationError):
class MissingEmailError(InvitationError):
"""Raised when the requestor has no email associated with their account."""
def __init__(self):
super().__init__("Can't send invitation email. No email is associated with your user account.")
def __init__(self, email=None, domain=None, portfolio=None):
# Default message if no additional info is provided
message = "Can't send invitation email. No email is associated with your user account."
# Customize message based on provided arguments
if email and domain:
message = f"Can't send email to '{email}' on domain '{domain}'. No email exists for the requestor."
elif email and portfolio:
message = f"Can't send email to '{email}' for portfolio '{portfolio}'. No email exists for the requestor."
super().__init__(message)
class OutsideOrgMemberError(ValueError):

View file

@ -10,7 +10,6 @@ import logging
import requests
from django.contrib import messages
from django.contrib.messages.views import SuccessMessageMixin
from django.db import IntegrityError
from django.http import HttpResponseRedirect
from django.shortcuts import redirect, render, get_object_or_404
from django.urls import reverse
@ -31,22 +30,23 @@ from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
from registrar.utility.enums import DefaultEmail
from registrar.utility.errors import (
AlreadyDomainInvitedError,
AlreadyDomainManagerError,
GenericError,
GenericErrorCodes,
MissingEmailError,
NameserverError,
NameserverErrorCodes as nsErrorCodes,
DsDataError,
DsDataErrorCodes,
SecurityEmailError,
SecurityEmailErrorCodes,
OutsideOrgMemberError,
)
from registrar.models.utility.contact_error import ContactError
from registrar.views.utility.permission_views import UserDomainRolePermissionDeleteView
from registrar.utility.waffle import flag_is_active_for_user
from registrar.views.utility.invitation_helper import (
get_org_membership,
get_requested_user,
handle_invitation_exceptions,
)
from ..forms import (
SeniorOfficialContactForm,
@ -1190,43 +1190,13 @@ class DomainAddUserView(DomainFormBaseView):
def get_success_url(self):
return reverse("domain-users", kwargs={"pk": self.object.pk})
def _get_org_membership(self, requestor_org, requested_email, requested_user):
"""
Verifies if an email belongs to a different organization as a member or invited member.
Verifies if an email belongs to this organization as a member or invited member.
User does not belong to any org can be deduced from the tuple returned.
Returns a tuple (member_of_a_different_org, member_of_this_org).
"""
# COMMENT: this code does not take into account multiple portfolios flag
# COMMENT: shouldn't this code be based on the organization of the domain, not the org
# of the requestor? requestor could have multiple portfolios
# Check for existing permissions or invitations for the requested user
existing_org_permission = UserPortfolioPermission.objects.filter(user=requested_user).first()
existing_org_invitation = PortfolioInvitation.objects.filter(email=requested_email).first()
# Determine membership in a different organization
member_of_a_different_org = (
existing_org_permission and existing_org_permission.portfolio != requestor_org
) or (existing_org_invitation and existing_org_invitation.portfolio != requestor_org)
# Determine membership in the same organization
member_of_this_org = (existing_org_permission and existing_org_permission.portfolio == requestor_org) or (
existing_org_invitation and existing_org_invitation.portfolio == requestor_org
)
return member_of_a_different_org, member_of_this_org
def form_valid(self, form):
"""Add the specified user to this domain."""
requested_email = form.cleaned_data["email"]
requestor = self.request.user
# Look up a user with that email
requested_user = self._get_requested_user(requested_email)
requested_user = get_requested_user(requested_email)
# NOTE: This does not account for multiple portfolios flag being set to True
domain_org = self.object.domain_info.portfolio
@ -1237,55 +1207,47 @@ class DomainAddUserView(DomainFormBaseView):
or requestor.is_staff
)
member_of_a_different_org, member_of_this_org = self._get_org_membership(
domain_org, requested_email, requested_user
)
# determine portfolio of the domain (code currently is looking at requestor's portfolio)
# if requested_email/user is not member or invited member of this portfolio
# COMMENT: this code does not take into account multiple portfolios flag
# send portfolio invitation email
# create portfolio invitation
# create message to view
if (
flag_is_active_for_user(requestor, "organization_feature")
and not flag_is_active_for_user(requestor, "multiple_portfolios")
and domain_org is not None
and requestor_can_update_portfolio
and not member_of_this_org
):
try:
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=domain_org)
PortfolioInvitation.objects.get_or_create(email=requested_email, portfolio=domain_org)
messages.success(self.request, f"{requested_email} has been invited to the organization: {domain_org}")
except Exception as e:
self._handle_portfolio_exceptions(e, requested_email, domain_org)
# If that first invite does not succeed take an early exit
return redirect(self.get_success_url())
member_of_a_different_org, member_of_this_org = get_org_membership(domain_org, requested_email, requested_user)
try:
# COMMENT: this code does not take into account multiple portfolios flag being set to TRUE
# determine portfolio of the domain (code currently is looking at requestor's portfolio)
# if requested_email/user is not member or invited member of this portfolio
# send portfolio invitation email
# create portfolio invitation
# create message to view
if (
flag_is_active_for_user(requestor, "organization_feature")
and not flag_is_active_for_user(requestor, "multiple_portfolios")
and domain_org is not None
and requestor_can_update_portfolio
and not member_of_this_org
):
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=domain_org)
portfolio_invitation, _ = PortfolioInvitation.objects.get_or_create(
email=requested_email, portfolio=domain_org, roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER]
)
# if user exists for email, immediately retrieve portfolio invitation upon creation
if requested_user is not None:
portfolio_invitation.retrieve()
portfolio_invitation.save()
messages.success(self.request, f"{requested_email} has been invited to the organization: {domain_org}")
if requested_user is None:
self._handle_new_user_invitation(requested_email, requestor, member_of_a_different_org)
else:
self._handle_existing_user(requested_email, requestor, requested_user, member_of_a_different_org)
except Exception as e:
self._handle_exceptions(e, requested_email)
handle_invitation_exceptions(self.request, e, requested_email)
return redirect(self.get_success_url())
def _get_requested_user(self, email):
"""Retrieve a user by email or return None if the user doesn't exist."""
try:
return User.objects.get(email=email)
except User.DoesNotExist:
return None
def _handle_new_user_invitation(self, email, requestor, member_of_different_org):
"""Handle invitation for a new user who does not exist in the system."""
send_domain_invitation_email(
email=email,
requestor=requestor,
domain=self.object,
domains=self.object,
is_member_of_different_org=member_of_different_org,
)
DomainInvitation.objects.get_or_create(email=email, domain=self.object)
@ -1296,8 +1258,9 @@ class DomainAddUserView(DomainFormBaseView):
send_domain_invitation_email(
email=email,
requestor=requestor,
domain=self.object,
domains=self.object,
is_member_of_different_org=member_of_different_org,
requested_user=requested_user,
)
UserDomainRole.objects.create(
user=requested_user,
@ -1306,57 +1269,6 @@ class DomainAddUserView(DomainFormBaseView):
)
messages.success(self.request, f"Added user {email}.")
def _handle_exceptions(self, exception, email):
"""Handle exceptions raised during the process."""
if isinstance(exception, EmailSendingError):
logger.warning(
"Could not send email invitation to %s for domain %s (EmailSendingError)",
email,
self.object,
exc_info=True,
)
messages.warning(self.request, "Could not send email invitation.")
elif isinstance(exception, OutsideOrgMemberError):
logger.warning(
"Could not send email. Can not invite member of a .gov organization to a different organization.",
self.object,
exc_info=True,
)
messages.error(
self.request,
f"{email} is already a member of another .gov organization.",
)
elif isinstance(exception, AlreadyDomainManagerError):
messages.warning(self.request, str(exception))
elif isinstance(exception, AlreadyDomainInvitedError):
messages.warning(self.request, str(exception))
elif isinstance(exception, MissingEmailError):
messages.error(self.request, str(exception))
logger.error(
f"Can't send email to '{email}' on domain '{self.object}'. No email exists for the requestor.",
exc_info=True,
)
elif isinstance(exception, IntegrityError):
messages.warning(self.request, f"{email} is already a manager for this domain")
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.warning(self.request, "Could not send email invitation.")
def _handle_portfolio_exceptions(self, exception, email, portfolio):
"""Handle exceptions raised during the process."""
if isinstance(exception, EmailSendingError):
logger.warning("Could not send email invitation (EmailSendingError)", exc_info=True)
messages.warning(self.request, "Could not send email invitation.")
elif isinstance(exception, MissingEmailError):
messages.error(self.request, str(exception))
logger.error(
f"Can't send email to '{email}' for portfolio '{portfolio}'. No email exists for the requestor.",
exc_info=True,
)
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.warning(self.request, "Could not send email invitation.")
class DomainInvitationCancelView(SuccessMessageMixin, DomainInvitationPermissionCancelView):
object: DomainInvitation

View file

@ -8,13 +8,14 @@ from django.utils.safestring import mark_safe
from django.contrib import messages
from registrar.forms import portfolio as portfolioForms
from registrar.models import Portfolio, User
from registrar.models.domain import Domain
from registrar.models.domain_invitation import DomainInvitation
from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user_domain_role import UserDomainRole
from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from registrar.utility.email import EmailSendingError
from registrar.utility.email_invitations import send_portfolio_invitation_email
from registrar.utility.email_invitations import send_domain_invitation_email, send_portfolio_invitation_email
from registrar.utility.errors import MissingEmailError
from registrar.utility.enums import DefaultUserValues
from registrar.views.utility.mixins import PortfolioMemberPermission
@ -33,6 +34,8 @@ from django.views.generic import View
from django.views.generic.edit import FormMixin
from django.db import IntegrityError
from registrar.views.utility.invitation_helper import get_org_membership
logger = logging.getLogger(__name__)
@ -237,6 +240,7 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V
removed_domains = request.POST.get("removed_domains")
portfolio_permission = get_object_or_404(UserPortfolioPermission, pk=pk)
member = portfolio_permission.user
portfolio = portfolio_permission.portfolio
added_domain_ids = self._parse_domain_ids(added_domains, "added domains")
if added_domain_ids is None:
@ -248,7 +252,7 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V
if added_domain_ids or removed_domain_ids:
try:
self._process_added_domains(added_domain_ids, member)
self._process_added_domains(added_domain_ids, member, request.user, portfolio)
self._process_removed_domains(removed_domain_ids, member)
messages.success(request, "The domain assignment changes have been saved.")
return redirect(reverse("member-domains", kwargs={"pk": pk}))
@ -258,15 +262,15 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V
"A database error occurred while saving changes. If the issue persists, "
f"please contact {DefaultUserValues.HELP_EMAIL}.",
)
logger.error("A database error occurred while saving changes.")
logger.error("A database error occurred while saving changes.", exc_info=True)
return redirect(reverse("member-domains-edit", kwargs={"pk": pk}))
except Exception as e:
messages.error(
request,
"An unexpected error occurred: {str(e)}. If the issue persists, "
f"An unexpected error occurred: {str(e)}. If the issue persists, "
f"please contact {DefaultUserValues.HELP_EMAIL}.",
)
logger.error(f"An unexpected error occurred: {str(e)}")
logger.error(f"An unexpected error occurred: {str(e)}", exc_info=True)
return redirect(reverse("member-domains-edit", kwargs={"pk": pk}))
else:
messages.info(request, "No changes detected.")
@ -287,16 +291,26 @@ class PortfolioMemberDomainsEditView(PortfolioMemberDomainsEditPermissionView, V
logger.error(f"Invalid data for {domain_type}")
return None
def _process_added_domains(self, added_domain_ids, member):
def _process_added_domains(self, added_domain_ids, member, requestor, portfolio):
"""
Processes added domains by bulk creating UserDomainRole instances.
"""
if added_domain_ids:
# get added_domains from ids to pass to send email method and bulk create
added_domains = Domain.objects.filter(id__in=added_domain_ids)
member_of_a_different_org, _ = get_org_membership(portfolio, member.email, member)
send_domain_invitation_email(
email=member.email,
requestor=requestor,
domains=added_domains,
is_member_of_different_org=member_of_a_different_org,
requested_user=member,
)
# Bulk create UserDomainRole instances for added domains
UserDomainRole.objects.bulk_create(
[
UserDomainRole(domain_id=domain_id, user=member, role=UserDomainRole.Roles.MANAGER)
for domain_id in added_domain_ids
UserDomainRole(domain=domain, user=member, role=UserDomainRole.Roles.MANAGER)
for domain in added_domains
],
ignore_conflicts=True, # Avoid duplicate entries
)
@ -443,6 +457,7 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission
removed_domains = request.POST.get("removed_domains")
portfolio_invitation = get_object_or_404(PortfolioInvitation, pk=pk)
email = portfolio_invitation.email
portfolio = portfolio_invitation.portfolio
added_domain_ids = self._parse_domain_ids(added_domains, "added domains")
if added_domain_ids is None:
@ -454,7 +469,7 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission
if added_domain_ids or removed_domain_ids:
try:
self._process_added_domains(added_domain_ids, email)
self._process_added_domains(added_domain_ids, email, request.user, portfolio)
self._process_removed_domains(removed_domain_ids, email)
messages.success(request, "The domain assignment changes have been saved.")
return redirect(reverse("invitedmember-domains", kwargs={"pk": pk}))
@ -464,15 +479,15 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission
"A database error occurred while saving changes. If the issue persists, "
f"please contact {DefaultUserValues.HELP_EMAIL}.",
)
logger.error("A database error occurred while saving changes.")
logger.error("A database error occurred while saving changes.", exc_info=True)
return redirect(reverse("invitedmember-domains-edit", kwargs={"pk": pk}))
except Exception as e:
messages.error(
request,
"An unexpected error occurred: {str(e)}. If the issue persists, "
f"An unexpected error occurred: {str(e)}. If the issue persists, "
f"please contact {DefaultUserValues.HELP_EMAIL}.",
)
logger.error(f"An unexpected error occurred: {str(e)}.")
logger.error(f"An unexpected error occurred: {str(e)}.", exc_info=True)
return redirect(reverse("invitedmember-domains-edit", kwargs={"pk": pk}))
else:
messages.info(request, "No changes detected.")
@ -493,33 +508,41 @@ class PortfolioInvitedMemberDomainsEditView(PortfolioMemberDomainsEditPermission
logger.error(f"Invalid data for {domain_type}.")
return None
def _process_added_domains(self, added_domain_ids, email):
def _process_added_domains(self, added_domain_ids, email, requestor, portfolio):
"""
Processes added domain invitations by updating existing invitations
or creating new ones.
"""
if not added_domain_ids:
return
if added_domain_ids:
# get added_domains from ids to pass to send email method and bulk create
added_domains = Domain.objects.filter(id__in=added_domain_ids)
member_of_a_different_org, _ = get_org_membership(portfolio, email, None)
send_domain_invitation_email(
email=email,
requestor=requestor,
domains=added_domains,
is_member_of_different_org=member_of_a_different_org,
)
# Update existing invitations from CANCELED to INVITED
existing_invitations = DomainInvitation.objects.filter(domain_id__in=added_domain_ids, email=email)
existing_invitations.update(status=DomainInvitation.DomainInvitationStatus.INVITED)
# Update existing invitations from CANCELED to INVITED
existing_invitations = DomainInvitation.objects.filter(domain__in=added_domains, email=email)
existing_invitations.update(status=DomainInvitation.DomainInvitationStatus.INVITED)
# Determine which domains need new invitations
existing_domain_ids = existing_invitations.values_list("domain_id", flat=True)
new_domain_ids = set(added_domain_ids) - set(existing_domain_ids)
# Determine which domains need new invitations
existing_domain_ids = existing_invitations.values_list("domain_id", flat=True)
new_domain_ids = set(added_domain_ids) - set(existing_domain_ids)
# Bulk create new invitations
DomainInvitation.objects.bulk_create(
[
DomainInvitation(
domain_id=domain_id,
email=email,
status=DomainInvitation.DomainInvitationStatus.INVITED,
)
for domain_id in new_domain_ids
]
)
# Bulk create new invitations
DomainInvitation.objects.bulk_create(
[
DomainInvitation(
domain_id=domain_id,
email=email,
status=DomainInvitation.DomainInvitationStatus.INVITED,
)
for domain_id in new_domain_ids
]
)
def _process_removed_domains(self, removed_domain_ids, email):
"""
@ -754,7 +777,11 @@ class PortfolioAddMemberView(PortfolioMembersPermissionView, FormMixin):
try:
if not requested_user or not permission_exists:
send_portfolio_invitation_email(email=requested_email, requestor=requestor, portfolio=portfolio)
form.save()
portfolio_invitation = form.save()
# if user exists for email, immediately retrieve portfolio invitation upon creation
if requested_user is not None:
portfolio_invitation.retrieve()
portfolio_invitation.save()
messages.success(self.request, f"{requested_email} has been invited.")
else:
if permission_exists:

View file

@ -0,0 +1,86 @@
from django.contrib import messages
from django.db import IntegrityError
from registrar.models import PortfolioInvitation, User, UserPortfolioPermission
from registrar.utility.email import EmailSendingError
import logging
from registrar.utility.errors import (
AlreadyDomainInvitedError,
AlreadyDomainManagerError,
MissingEmailError,
OutsideOrgMemberError,
)
logger = logging.getLogger(__name__)
# These methods are used by multiple views which share similar logic and function
# when creating invitations and sending associated emails. These can be reused in
# any view, and were initially developed for domain.py, portfolios.py and admin.py
def get_org_membership(org, email, user):
"""
Determines if an email/user belongs to a different organization or this organization
as either a member or an invited member.
This function returns a tuple (member_of_a_different_org, member_of_this_org),
which provides:
- member_of_a_different_org: True if the user/email is associated with an organization other than the given org.
- member_of_this_org: True if the user/email is associated with the given org.
Note: This implementation assumes single portfolio ownership for a user.
If the "multiple portfolios" feature is enabled, this logic may not account for
situations where a user or email belongs to multiple organizations.
"""
# Check for existing permissions or invitations for the user
existing_org_permission = UserPortfolioPermission.objects.filter(user=user).first()
existing_org_invitation = PortfolioInvitation.objects.filter(email=email).first()
# Determine membership in a different organization
member_of_a_different_org = (existing_org_permission and existing_org_permission.portfolio != org) or (
existing_org_invitation and existing_org_invitation.portfolio != org
)
# Determine membership in the same organization
member_of_this_org = (existing_org_permission and existing_org_permission.portfolio == org) or (
existing_org_invitation and existing_org_invitation.portfolio == org
)
return member_of_a_different_org, member_of_this_org
def get_requested_user(email):
"""Retrieve a user by email or return None if the user doesn't exist."""
try:
return User.objects.get(email=email)
except User.DoesNotExist:
return None
def handle_invitation_exceptions(request, exception, email):
"""Handle exceptions raised during the process."""
if isinstance(exception, EmailSendingError):
logger.warning(str(exception), exc_info=True)
messages.error(request, str(exception))
elif isinstance(exception, MissingEmailError):
messages.error(request, str(exception))
logger.error(str(exception), exc_info=True)
elif isinstance(exception, OutsideOrgMemberError):
logger.warning(
"Could not send email. Can not invite member of a .gov organization to a different organization.",
exc_info=True,
)
messages.error(
request,
f"{email} is already a member of another .gov organization.",
)
elif isinstance(exception, AlreadyDomainManagerError):
messages.warning(request, str(exception))
elif isinstance(exception, AlreadyDomainInvitedError):
messages.warning(request, str(exception))
elif isinstance(exception, IntegrityError):
messages.warning(request, f"{email} is already a manager for this domain")
else:
logger.warning("Could not send email invitation (Other Exception)", exc_info=True)
messages.warning(request, "Could not send email invitation.")