Merge branch 'main' of https://github.com/cisagov/manage.get.gov into es/3175-email-updates

This commit is contained in:
Erin Song 2025-01-23 10:30:48 -08:00
commit 2e7ffa799f
No known key found for this signature in database
28 changed files with 951 additions and 330 deletions

View file

@ -1367,6 +1367,8 @@ class UserDomainRoleAdmin(ListHeaderAdmin, ImportExportModelAdmin):
autocomplete_fields = ["user", "domain"] autocomplete_fields = ["user", "domain"]
change_form_template = "django/admin/user_domain_role_change_form.html"
# Fixes a bug where non-superusers are redirected to the main page # Fixes a bug where non-superusers are redirected to the main page
def delete_view(self, request, object_id, extra_context=None): def delete_view(self, request, object_id, extra_context=None):
"""Custom delete_view implementation that specifies redirect behaviour""" """Custom delete_view implementation that specifies redirect behaviour"""
@ -1500,7 +1502,7 @@ class DomainInvitationAdmin(BaseInvitationAdmin):
autocomplete_fields = ["domain"] autocomplete_fields = ["domain"]
change_form_template = "django/admin/email_clipboard_change_form.html" change_form_template = "django/admin/domain_invitation_change_form.html"
# Select domain invitations to change -> Domain invitations # Select domain invitations to change -> Domain invitations
def changelist_view(self, request, extra_context=None): def changelist_view(self, request, extra_context=None):

View file

@ -29,6 +29,7 @@
* - tooltip dynamic content updated to include nested element (for better sizing control) * - tooltip dynamic content updated to include nested element (for better sizing control)
* - modal exposed to window to be accessible in other js files * - modal exposed to window to be accessible in other js files
* - fixed bug in createHeaderButton which added newlines to header button tooltips * - fixed bug in createHeaderButton which added newlines to header button tooltips
* - modified combobox to handle error class
*/ */
if ("document" in window.self) { if ("document" in window.self) {
@ -1213,6 +1214,11 @@ const enhanceComboBox = _comboBoxEl => {
input.setAttribute("class", INPUT_CLASS); input.setAttribute("class", INPUT_CLASS);
input.setAttribute("type", "text"); input.setAttribute("type", "text");
input.setAttribute("role", "combobox"); input.setAttribute("role", "combobox");
// DOTGOV - handle error class for combobox
// Check if 'usa-input--error' exists in selectEl and add it to input if true
if (selectEl.classList.contains('usa-input--error')) {
input.classList.add('usa-input--error');
}
additionalAttributes.forEach(attr => Object.keys(attr).forEach(key => { additionalAttributes.forEach(attr => Object.keys(attr).forEach(key => {
const value = Sanitizer.escapeHTML`${attr[key]}`; const value = Sanitizer.escapeHTML`${attr[key]}`;
input.setAttribute(key, value); input.setAttribute(key, value);

View file

@ -1,113 +0,0 @@
import { hideElement, showElement } from './helpers.js';
export function loadInitialValuesForComboBoxes() {
var overrideDefaultClearButton = true;
var isTyping = false;
document.addEventListener('DOMContentLoaded', (event) => {
handleAllComboBoxElements();
});
function handleAllComboBoxElements() {
const comboBoxElements = document.querySelectorAll(".usa-combo-box");
comboBoxElements.forEach(comboBox => {
const input = comboBox.querySelector("input");
const select = comboBox.querySelector("select");
if (!input || !select) {
console.warn("No combobox element found");
return;
}
// Set the initial value of the combobox
let initialValue = select.getAttribute("data-default-value");
let clearInputButton = comboBox.querySelector(".usa-combo-box__clear-input");
if (!clearInputButton) {
console.warn("No clear element found");
return;
}
// Override the default clear button behavior such that it no longer clears the input,
// it just resets to the data-initial-value.
// Due to the nature of how uswds works, this is slightly hacky.
// Use a MutationObserver to watch for changes in the dropdown list
const dropdownList = comboBox.querySelector(`#${input.id}--list`);
const observer = new MutationObserver(function(mutations) {
mutations.forEach(function(mutation) {
if (mutation.type === "childList") {
addBlankOption(clearInputButton, dropdownList, initialValue);
}
});
});
// Configure the observer to watch for changes in the dropdown list
const config = { childList: true, subtree: true };
observer.observe(dropdownList, config);
// Input event listener to detect typing
input.addEventListener("input", () => {
isTyping = true;
});
// Blur event listener to reset typing state
input.addEventListener("blur", () => {
isTyping = false;
});
// Hide the reset button when there is nothing to reset.
// Do this once on init, then everytime a change occurs.
updateClearButtonVisibility(select, initialValue, clearInputButton)
select.addEventListener("change", () => {
updateClearButtonVisibility(select, initialValue, clearInputButton)
});
// Change the default input behaviour - have it reset to the data default instead
clearInputButton.addEventListener("click", (e) => {
if (overrideDefaultClearButton && initialValue) {
e.preventDefault();
e.stopPropagation();
input.click();
// Find the dropdown option with the desired value
const dropdownOptions = document.querySelectorAll(".usa-combo-box__list-option");
if (dropdownOptions) {
dropdownOptions.forEach(option => {
if (option.getAttribute("data-value") === initialValue) {
// Simulate a click event on the dropdown option
option.click();
}
});
}
}
});
});
}
function updateClearButtonVisibility(select, initialValue, clearInputButton) {
if (select.value === initialValue) {
hideElement(clearInputButton);
}else {
showElement(clearInputButton)
}
}
function addBlankOption(clearInputButton, dropdownList, initialValue) {
if (dropdownList && !dropdownList.querySelector('[data-value=""]') && !isTyping) {
const blankOption = document.createElement("li");
blankOption.setAttribute("role", "option");
blankOption.setAttribute("data-value", "");
blankOption.classList.add("usa-combo-box__list-option");
if (!initialValue){
blankOption.classList.add("usa-combo-box__list-option--selected")
}
blankOption.textContent = "⎯";
dropdownList.insertBefore(blankOption, dropdownList.firstChild);
blankOption.addEventListener("click", (e) => {
e.preventDefault();
e.stopPropagation();
overrideDefaultClearButton = false;
// Trigger the default clear behavior
clearInputButton.click();
overrideDefaultClearButton = true;
});
}
}
}

View file

@ -3,7 +3,6 @@ import { initDomainValidators } from './domain-validators.js';
import { initFormsetsForms, triggerModalOnDsDataForm, nameserversFormListener } from './formset-forms.js'; import { initFormsetsForms, triggerModalOnDsDataForm, nameserversFormListener } from './formset-forms.js';
import { initializeUrbanizationToggle } from './urbanization.js'; import { initializeUrbanizationToggle } from './urbanization.js';
import { userProfileListener, finishUserSetupListener } from './user-profile.js'; import { userProfileListener, finishUserSetupListener } from './user-profile.js';
import { loadInitialValuesForComboBoxes } from './combobox.js';
import { handleRequestingEntityFieldset } from './requesting-entity.js'; import { handleRequestingEntityFieldset } from './requesting-entity.js';
import { initDomainsTable } from './table-domains.js'; import { initDomainsTable } from './table-domains.js';
import { initDomainRequestsTable } from './table-domain-requests.js'; import { initDomainRequestsTable } from './table-domain-requests.js';
@ -31,8 +30,6 @@ initializeUrbanizationToggle();
userProfileListener(); userProfileListener();
finishUserSetupListener(); finishUserSetupListener();
loadInitialValuesForComboBoxes();
handleRequestingEntityFieldset(); handleRequestingEntityFieldset();
initDomainsTable(); initDomainsTable();

View file

@ -9,15 +9,15 @@ export function handleRequestingEntityFieldset() {
const formPrefix = "portfolio_requesting_entity"; const formPrefix = "portfolio_requesting_entity";
const radioFieldset = document.getElementById(`id_${formPrefix}-requesting_entity_is_suborganization__fieldset`); const radioFieldset = document.getElementById(`id_${formPrefix}-requesting_entity_is_suborganization__fieldset`);
const radios = radioFieldset?.querySelectorAll(`input[name="${formPrefix}-requesting_entity_is_suborganization"]`); const radios = radioFieldset?.querySelectorAll(`input[name="${formPrefix}-requesting_entity_is_suborganization"]`);
const select = document.getElementById(`id_${formPrefix}-sub_organization`); const input = document.getElementById(`id_${formPrefix}-sub_organization`);
const selectParent = select?.parentElement; const inputGrandParent = input?.parentElement?.parentElement;
const select = input?.previousElementSibling;
const suborgContainer = document.getElementById("suborganization-container"); const suborgContainer = document.getElementById("suborganization-container");
const suborgDetailsContainer = document.getElementById("suborganization-container__details"); const suborgDetailsContainer = document.getElementById("suborganization-container__details");
const suborgAddtlInstruction = document.getElementById("suborganization-addtl-instruction"); const suborgAddtlInstruction = document.getElementById("suborganization-addtl-instruction");
const subOrgCreateNewOption = document.getElementById("option-to-add-suborg")?.value;
// Make sure all crucial page elements exist before proceeding. // Make sure all crucial page elements exist before proceeding.
// This more or less ensures that we are on the Requesting Entity page, and not elsewhere. // This more or less ensures that we are on the Requesting Entity page, and not elsewhere.
if (!radios || !select || !selectParent || !suborgContainer || !suborgDetailsContainer) return; if (!radios || !input || !select || !inputGrandParent || !suborgContainer || !suborgDetailsContainer) return;
// requestingSuborganization: This just broadly determines if they're requesting a suborg at all // requestingSuborganization: This just broadly determines if they're requesting a suborg at all
// requestingNewSuborganization: This variable determines if the user is trying to *create* a new suborganization or not. // requestingNewSuborganization: This variable determines if the user is trying to *create* a new suborganization or not.
@ -27,8 +27,8 @@ export function handleRequestingEntityFieldset() {
function toggleSuborganization(radio=null) { function toggleSuborganization(radio=null) {
if (radio != null) requestingSuborganization = radio?.checked && radio.value === "True"; if (radio != null) requestingSuborganization = radio?.checked && radio.value === "True";
requestingSuborganization ? showElement(suborgContainer) : hideElement(suborgContainer); requestingSuborganization ? showElement(suborgContainer) : hideElement(suborgContainer);
if (select.options.length == 2) { // --Select-- and other are the only options if (select.options.length == 1) { // other is the only option
hideElement(selectParent); // Hide the select drop down and indicate requesting new suborg hideElement(inputGrandParent); // Hide the combo box and indicate requesting new suborg
hideElement(suborgAddtlInstruction); // Hide additional instruction related to the list hideElement(suborgAddtlInstruction); // Hide additional instruction related to the list
requestingNewSuborganization.value = "True"; requestingNewSuborganization.value = "True";
} else { } else {
@ -37,11 +37,6 @@ export function handleRequestingEntityFieldset() {
requestingNewSuborganization.value === "True" ? showElement(suborgDetailsContainer) : hideElement(suborgDetailsContainer); requestingNewSuborganization.value === "True" ? showElement(suborgDetailsContainer) : hideElement(suborgDetailsContainer);
} }
// Add fake "other" option to sub_organization select
if (select && !Array.from(select.options).some(option => option.value === "other")) {
select.add(new Option(subOrgCreateNewOption, "other"));
}
if (requestingNewSuborganization.value === "True") { if (requestingNewSuborganization.value === "True") {
select.value = "other"; select.value = "other";
} }

View file

@ -4,6 +4,7 @@ import logging
from django import forms from django import forms
from django.core.validators import MinValueValidator, MaxValueValidator, RegexValidator, MaxLengthValidator from django.core.validators import MinValueValidator, MaxValueValidator, RegexValidator, MaxLengthValidator
from django.forms import formset_factory from django.forms import formset_factory
from registrar.forms.utility.combobox import ComboboxWidget
from registrar.models import DomainRequest, FederalAgency from registrar.models import DomainRequest, FederalAgency
from phonenumber_field.widgets import RegionalPhoneNumberWidget from phonenumber_field.widgets import RegionalPhoneNumberWidget
from registrar.models.suborganization import Suborganization from registrar.models.suborganization import Suborganization
@ -161,9 +162,10 @@ class DomainSuborganizationForm(forms.ModelForm):
"""Form for updating the suborganization""" """Form for updating the suborganization"""
sub_organization = forms.ModelChoiceField( sub_organization = forms.ModelChoiceField(
label="Suborganization name",
queryset=Suborganization.objects.none(), queryset=Suborganization.objects.none(),
required=False, required=False,
widget=forms.Select(), widget=ComboboxWidget,
) )
class Meta: class Meta:
@ -178,20 +180,6 @@ class DomainSuborganizationForm(forms.ModelForm):
portfolio = self.instance.portfolio if self.instance else None portfolio = self.instance.portfolio if self.instance else None
self.fields["sub_organization"].queryset = Suborganization.objects.filter(portfolio=portfolio) self.fields["sub_organization"].queryset = Suborganization.objects.filter(portfolio=portfolio)
# Set initial value
if self.instance and self.instance.sub_organization:
self.fields["sub_organization"].initial = self.instance.sub_organization
# Set custom form label
self.fields["sub_organization"].label = "Suborganization name"
# Use the combobox rather than the regular select widget
self.fields["sub_organization"].widget.template_name = "django/forms/widgets/combobox.html"
# Set data-default-value attribute
if self.instance and self.instance.sub_organization:
self.fields["sub_organization"].widget.attrs["data-default-value"] = self.instance.sub_organization.pk
class BaseNameserverFormset(forms.BaseFormSet): class BaseNameserverFormset(forms.BaseFormSet):
def clean(self): def clean(self):
@ -456,6 +444,13 @@ class DomainSecurityEmailForm(forms.Form):
class DomainOrgNameAddressForm(forms.ModelForm): class DomainOrgNameAddressForm(forms.ModelForm):
"""Form for updating the organization name and mailing address.""" """Form for updating the organization name and mailing address."""
# for federal agencies we also want to know the top-level agency.
federal_agency = forms.ModelChoiceField(
label="Federal agency",
required=False,
queryset=FederalAgency.objects.all(),
widget=ComboboxWidget,
)
zipcode = forms.CharField( zipcode = forms.CharField(
label="Zip code", label="Zip code",
validators=[ validators=[
@ -469,6 +464,16 @@ class DomainOrgNameAddressForm(forms.ModelForm):
}, },
) )
state_territory = forms.ChoiceField(
label="State, territory, or military post",
required=True,
choices=DomainInformation.StateTerritoryChoices.choices,
error_messages={
"required": ("Select the state, territory, or military post where your organization is located.")
},
widget=ComboboxWidget(attrs={"required": True}),
)
class Meta: class Meta:
model = DomainInformation model = DomainInformation
fields = [ fields = [
@ -486,25 +491,12 @@ class DomainOrgNameAddressForm(forms.ModelForm):
"organization_name": {"required": "Enter the name of your organization."}, "organization_name": {"required": "Enter the name of your organization."},
"address_line1": {"required": "Enter the street address of your organization."}, "address_line1": {"required": "Enter the street address of your organization."},
"city": {"required": "Enter the city where your organization is located."}, "city": {"required": "Enter the city where your organization is located."},
"state_territory": {
"required": "Select the state, territory, or military post where your organization is located."
},
} }
widgets = { widgets = {
# We need to set the required attributed for State/territory
# because for this fields we are creating an individual
# instance of the Select. For the other fields we use the for loop to set
# the class's required attribute to true.
"organization_name": forms.TextInput, "organization_name": forms.TextInput,
"address_line1": forms.TextInput, "address_line1": forms.TextInput,
"address_line2": forms.TextInput, "address_line2": forms.TextInput,
"city": forms.TextInput, "city": forms.TextInput,
"state_territory": forms.Select(
attrs={
"required": True,
},
choices=DomainInformation.StateTerritoryChoices.choices,
),
"urbanization": forms.TextInput, "urbanization": forms.TextInput,
} }

View file

@ -7,6 +7,7 @@ from django import forms
from django.core.validators import RegexValidator, MaxLengthValidator from django.core.validators import RegexValidator, MaxLengthValidator
from django.utils.safestring import mark_safe from django.utils.safestring import mark_safe
from registrar.forms.utility.combobox import ComboboxWidget
from registrar.forms.utility.wizard_form_helper import ( from registrar.forms.utility.wizard_form_helper import (
RegistrarForm, RegistrarForm,
RegistrarFormSet, RegistrarFormSet,
@ -43,7 +44,7 @@ class RequestingEntityForm(RegistrarForm):
label="Suborganization name", label="Suborganization name",
required=False, required=False,
queryset=Suborganization.objects.none(), queryset=Suborganization.objects.none(),
empty_label="--Select--", widget=ComboboxWidget,
) )
requested_suborganization = forms.CharField( requested_suborganization = forms.CharField(
label="Requested suborganization", label="Requested suborganization",
@ -56,22 +57,44 @@ class RequestingEntityForm(RegistrarForm):
suborganization_state_territory = forms.ChoiceField( suborganization_state_territory = forms.ChoiceField(
label="State, territory, or military post", label="State, territory, or military post",
required=False, required=False,
choices=[("", "--Select--")] + DomainRequest.StateTerritoryChoices.choices, choices=DomainRequest.StateTerritoryChoices.choices,
widget=ComboboxWidget,
) )
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
"""Override of init to add the suborganization queryset""" """Override of init to add the suborganization queryset and 'other' option"""
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
if self.domain_request.portfolio: if self.domain_request.portfolio:
self.fields["sub_organization"].queryset = Suborganization.objects.filter( # Fetch the queryset for the portfolio
portfolio=self.domain_request.portfolio queryset = Suborganization.objects.filter(portfolio=self.domain_request.portfolio)
) # set the queryset appropriately so that post can validate against queryset
self.fields["sub_organization"].queryset = queryset
# Modify the choices to include "other" so that form can display options properly
self.fields["sub_organization"].choices = [(obj.id, str(obj)) for obj in queryset] + [
("other", "Other (enter your suborganization manually)")
]
@classmethod
def from_database(cls, obj: DomainRequest | Contact | None):
"""Returns a dict of form field values gotten from `obj`.
Overrides RegistrarForm method in order to set sub_organization to 'other'
on GETs of the RequestingEntityForm."""
if obj is None:
return {}
# get the domain request as a dict, per usual method
domain_request_dict = {name: getattr(obj, name) for name in cls.declared_fields.keys()} # type: ignore
# set sub_organization to 'other' if is_requesting_new_suborganization is True
if isinstance(obj, DomainRequest) and obj.is_requesting_new_suborganization():
domain_request_dict["sub_organization"] = "other"
return domain_request_dict
def clean_sub_organization(self): def clean_sub_organization(self):
"""On suborganization clean, set the suborganization value to None if the user is requesting """On suborganization clean, set the suborganization value to None if the user is requesting
a custom suborganization (as it doesn't exist yet)""" a custom suborganization (as it doesn't exist yet)"""
# If it's a new suborganization, return None (equivalent to selecting nothing) # If it's a new suborganization, return None (equivalent to selecting nothing)
if self.cleaned_data.get("is_requesting_new_suborganization"): if self.cleaned_data.get("is_requesting_new_suborganization"):
return None return None
@ -94,41 +117,60 @@ class RequestingEntityForm(RegistrarForm):
return name return name
def full_clean(self): def full_clean(self):
"""Validation logic to remove the custom suborganization value before clean is triggered. """Validation logic to temporarily remove the custom suborganization value before clean is triggered.
Without this override, the form will throw an 'invalid option' error.""" Without this override, the form will throw an 'invalid option' error."""
# Remove the custom other field before cleaning # Ensure self.data is not None before proceeding
data = self.data.copy() if self.data else None if self.data:
# handle case where form has been submitted
# Create a copy of the data for manipulation
data = self.data.copy()
# Remove the 'other' value from suborganization if it exists. # Retrieve sub_organization and store in _original_suborganization
# This is a special value that tracks if the user is requesting a new suborg. suborganization = data.get("portfolio_requesting_entity-sub_organization")
suborganization = self.data.get("portfolio_requesting_entity-sub_organization") self._original_suborganization = suborganization
if suborganization and "other" in suborganization: # If the original value was "other", clear it for validation
data["portfolio_requesting_entity-sub_organization"] = "" if self._original_suborganization == "other":
data["portfolio_requesting_entity-sub_organization"] = ""
# Set the modified data back to the form # Set the modified data back to the form
self.data = data self.data = data
else:
# handle case of a GET
suborganization = None
if self.initial and "sub_organization" in self.initial:
suborganization = self.initial["sub_organization"]
# Check if is_requesting_new_suborganization is True
is_requesting_new_suborganization = False
if self.initial and "is_requesting_new_suborganization" in self.initial:
# Call the method if it exists
is_requesting_new_suborganization = self.initial["is_requesting_new_suborganization"]()
# Determine if "other" should be set
if is_requesting_new_suborganization and suborganization is None:
self._original_suborganization = "other"
else:
self._original_suborganization = suborganization
# Call the parent's full_clean method # Call the parent's full_clean method
super().full_clean() super().full_clean()
# Restore "other" if there are errors
if self.errors:
self.data["portfolio_requesting_entity-sub_organization"] = self._original_suborganization
def clean(self): def clean(self):
"""Custom clean implementation to handle our desired logic flow for suborganization. """Custom clean implementation to handle our desired logic flow for suborganization."""
Given that these fields often rely on eachother, we need to do this in the parent function."""
cleaned_data = super().clean() cleaned_data = super().clean()
# Do some custom error validation if the requesting entity is a suborg. # Get the cleaned data
# Otherwise, just validate as normal. suborganization = cleaned_data.get("sub_organization")
suborganization = self.cleaned_data.get("sub_organization") is_requesting_new_suborganization = cleaned_data.get("is_requesting_new_suborganization")
is_requesting_new_suborganization = self.cleaned_data.get("is_requesting_new_suborganization")
# Get the value of the yes/no checkbox from RequestingEntityYesNoForm.
# Since self.data stores this as a string, we need to convert "True" => True.
requesting_entity_is_suborganization = self.data.get( requesting_entity_is_suborganization = self.data.get(
"portfolio_requesting_entity-requesting_entity_is_suborganization" "portfolio_requesting_entity-requesting_entity_is_suborganization"
) )
if requesting_entity_is_suborganization == "True": if requesting_entity_is_suborganization == "True":
if is_requesting_new_suborganization: if is_requesting_new_suborganization:
# Validate custom suborganization fields
if not cleaned_data.get("requested_suborganization") and "requested_suborganization" not in self.errors: if not cleaned_data.get("requested_suborganization") and "requested_suborganization" not in self.errors:
self.add_error("requested_suborganization", "Enter the name of your suborganization.") self.add_error("requested_suborganization", "Enter the name of your suborganization.")
if not cleaned_data.get("suborganization_city"): if not cleaned_data.get("suborganization_city"):
@ -141,6 +183,12 @@ class RequestingEntityForm(RegistrarForm):
elif not suborganization: elif not suborganization:
self.add_error("sub_organization", "Suborganization is required.") self.add_error("sub_organization", "Suborganization is required.")
# If there are errors, restore the "other" value for rendering
if self.errors and getattr(self, "_original_suborganization", None) == "other":
self.cleaned_data["sub_organization"] = self._original_suborganization
elif not self.data and getattr(self, "_original_suborganization", None) == "other":
self.cleaned_data["sub_organization"] = self._original_suborganization
return cleaned_data return cleaned_data
@ -274,7 +322,7 @@ class OrganizationContactForm(RegistrarForm):
# uncomment to see if modelChoiceField can be an arg later # uncomment to see if modelChoiceField can be an arg later
required=False, required=False,
queryset=FederalAgency.objects.exclude(agency__in=excluded_agencies), queryset=FederalAgency.objects.exclude(agency__in=excluded_agencies),
empty_label="--Select--", widget=ComboboxWidget,
) )
organization_name = forms.CharField( organization_name = forms.CharField(
label="Organization name", label="Organization name",
@ -294,10 +342,11 @@ class OrganizationContactForm(RegistrarForm):
) )
state_territory = forms.ChoiceField( state_territory = forms.ChoiceField(
label="State, territory, or military post", label="State, territory, or military post",
choices=[("", "--Select--")] + DomainRequest.StateTerritoryChoices.choices, choices=DomainRequest.StateTerritoryChoices.choices,
error_messages={ error_messages={
"required": ("Select the state, territory, or military post where your organization is located.") "required": ("Select the state, territory, or military post where your organization is located.")
}, },
widget=ComboboxWidget,
) )
zipcode = forms.CharField( zipcode = forms.CharField(
label="Zip code", label="Zip code",

View file

@ -6,6 +6,7 @@ from django.core.validators import RegexValidator
from django.core.validators import MaxLengthValidator from django.core.validators import MaxLengthValidator
from django.utils.safestring import mark_safe from django.utils.safestring import mark_safe
from registrar.forms.utility.combobox import ComboboxWidget
from registrar.models import ( from registrar.models import (
PortfolioInvitation, PortfolioInvitation,
UserPortfolioPermission, UserPortfolioPermission,
@ -33,6 +34,15 @@ class PortfolioOrgAddressForm(forms.ModelForm):
"required": "Enter a 5-digit or 9-digit zip code, like 12345 or 12345-6789.", "required": "Enter a 5-digit or 9-digit zip code, like 12345 or 12345-6789.",
}, },
) )
state_territory = forms.ChoiceField(
label="State, territory, or military post",
required=True,
choices=DomainInformation.StateTerritoryChoices.choices,
error_messages={
"required": ("Select the state, territory, or military post where your organization is located.")
},
widget=ComboboxWidget(attrs={"required": True}),
)
class Meta: class Meta:
model = Portfolio model = Portfolio
@ -47,25 +57,12 @@ class PortfolioOrgAddressForm(forms.ModelForm):
error_messages = { error_messages = {
"address_line1": {"required": "Enter the street address of your organization."}, "address_line1": {"required": "Enter the street address of your organization."},
"city": {"required": "Enter the city where your organization is located."}, "city": {"required": "Enter the city where your organization is located."},
"state_territory": {
"required": "Select the state, territory, or military post where your organization is located."
},
"zipcode": {"required": "Enter a 5-digit or 9-digit zip code, like 12345 or 12345-6789."}, "zipcode": {"required": "Enter a 5-digit or 9-digit zip code, like 12345 or 12345-6789."},
} }
widgets = { widgets = {
# We need to set the required attributed for State/territory
# because for this fields we are creating an individual
# instance of the Select. For the other fields we use the for loop to set
# the class's required attribute to true.
"address_line1": forms.TextInput, "address_line1": forms.TextInput,
"address_line2": forms.TextInput, "address_line2": forms.TextInput,
"city": forms.TextInput, "city": forms.TextInput,
"state_territory": forms.Select(
attrs={
"required": True,
},
choices=DomainInformation.StateTerritoryChoices.choices,
),
# "urbanization": forms.TextInput, # "urbanization": forms.TextInput,
} }

View file

@ -0,0 +1,5 @@
from django.forms import Select
class ComboboxWidget(Select):
template_name = "django/forms/widgets/combobox.html"

View file

@ -4,9 +4,9 @@ import ipaddress
import re import re
from datetime import date, timedelta from datetime import date, timedelta
from typing import Optional from typing import Optional
from django.db import transaction
from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore
from django.db import models, IntegrityError
from django.db import models
from django.utils import timezone from django.utils import timezone
from typing import Any from typing import Any
from registrar.models.host import Host from registrar.models.host import Host
@ -1329,14 +1329,14 @@ class Domain(TimeStampedModel, DomainHelper):
def get_default_administrative_contact(self): def get_default_administrative_contact(self):
"""Gets the default administrative contact.""" """Gets the default administrative contact."""
logger.info("get_default_security_contact() -> Adding administrative security contact") logger.info("get_default_administrative_contact() -> Adding default administrative contact")
contact = PublicContact.get_default_administrative() contact = PublicContact.get_default_administrative()
contact.domain = self contact.domain = self
return contact return contact
def get_default_technical_contact(self): def get_default_technical_contact(self):
"""Gets the default technical contact.""" """Gets the default technical contact."""
logger.info("get_default_security_contact() -> Adding technical security contact") logger.info("get_default_security_contact() -> Adding default technical contact")
contact = PublicContact.get_default_technical() contact = PublicContact.get_default_technical()
contact.domain = self contact.domain = self
return contact return contact
@ -1678,9 +1678,11 @@ class Domain(TimeStampedModel, DomainHelper):
for domainContact in contact_data: for domainContact in contact_data:
req = commands.InfoContact(id=domainContact.contact) req = commands.InfoContact(id=domainContact.contact)
data = registry.send(req, cleaned=True).res_data[0] data = registry.send(req, cleaned=True).res_data[0]
logger.info(f"_fetch_contacts => this is the data: {data}")
# Map the object we recieved from EPP to a PublicContact # Map the object we recieved from EPP to a PublicContact
mapped_object = self.map_epp_contact_to_public_contact(data, domainContact.contact, domainContact.type) mapped_object = self.map_epp_contact_to_public_contact(data, domainContact.contact, domainContact.type)
logger.info(f"_fetch_contacts => mapped_object: {mapped_object}")
# Find/create it in the DB # Find/create it in the DB
in_db = self._get_or_create_public_contact(mapped_object) in_db = self._get_or_create_public_contact(mapped_object)
@ -1871,8 +1873,9 @@ class Domain(TimeStampedModel, DomainHelper):
missingSecurity = True missingSecurity = True
missingTech = True missingTech = True
if len(cleaned.get("_contacts")) < 3: contacts = cleaned.get("_contacts", [])
for contact in cleaned.get("_contacts"): if len(contacts) < 3:
for contact in contacts:
if contact.type == PublicContact.ContactTypeChoices.ADMINISTRATIVE: if contact.type == PublicContact.ContactTypeChoices.ADMINISTRATIVE:
missingAdmin = False missingAdmin = False
if contact.type == PublicContact.ContactTypeChoices.SECURITY: if contact.type == PublicContact.ContactTypeChoices.SECURITY:
@ -1891,6 +1894,11 @@ class Domain(TimeStampedModel, DomainHelper):
technical_contact = self.get_default_technical_contact() technical_contact = self.get_default_technical_contact()
technical_contact.save() technical_contact.save()
logger.info(
"_add_missing_contacts_if_unknown => Adding contacts. Values are "
f"missingAdmin: {missingAdmin}, missingSecurity: {missingSecurity}, missingTech: {missingTech}"
)
def _fetch_cache(self, fetch_hosts=False, fetch_contacts=False): def _fetch_cache(self, fetch_hosts=False, fetch_contacts=False):
"""Contact registry for info about a domain.""" """Contact registry for info about a domain."""
try: try:
@ -2104,8 +2112,21 @@ class Domain(TimeStampedModel, DomainHelper):
# Save to DB if it doesn't exist already. # Save to DB if it doesn't exist already.
if db_contact.count() == 0: if db_contact.count() == 0:
# Doesn't run custom save logic, just saves to DB # Doesn't run custom save logic, just saves to DB
public_contact.save(skip_epp_save=True) try:
logger.info(f"Created a new PublicContact: {public_contact}") with transaction.atomic():
public_contact.save(skip_epp_save=True)
logger.info(f"Created a new PublicContact: {public_contact}")
except IntegrityError as err:
logger.error(
f"_get_or_create_public_contact() => tried to create a duplicate public contact: {err}",
exc_info=True,
)
return PublicContact.objects.get(
registry_id=public_contact.registry_id,
contact_type=public_contact.contact_type,
domain=self,
)
# Append the item we just created # Append the item we just created
return public_contact return public_contact
@ -2115,7 +2136,7 @@ class Domain(TimeStampedModel, DomainHelper):
if existing_contact.email != public_contact.email or existing_contact.registry_id != public_contact.registry_id: if existing_contact.email != public_contact.email or existing_contact.registry_id != public_contact.registry_id:
existing_contact.delete() existing_contact.delete()
public_contact.save() public_contact.save()
logger.warning("Requested PublicContact is out of sync " "with DB.") logger.warning("Requested PublicContact is out of sync with DB.")
return public_contact return public_contact
# If it already exists, we can assume that the DB instance was updated during set, so we should just use that. # If it already exists, we can assume that the DB instance was updated during set, so we should just use that.

View file

@ -0,0 +1,14 @@
{% extends 'django/admin/email_clipboard_change_form.html' %}
{% load custom_filters %}
{% load i18n static %}
{% block content_subtitle %}
<div class="usa-alert usa-alert--info usa-alert--slim">
<div class="usa-alert__body margin-left-1 maxw-none">
<p class="usa-alert__text maxw-none">
If you add someone to a domain here, it will trigger emails to the invitee and all managers of the domain when you click "save." If you don't want to trigger those emails, use the <a class="usa-link" href="{% url 'admin:registrar_userdomainrole_changelist' %}">User domain roles permissions table</a> instead.
</p>
</div>
</div>
{{ block.super }}
{% endblock %}

View file

@ -0,0 +1,14 @@
{% extends 'django/admin/email_clipboard_change_form.html' %}
{% load custom_filters %}
{% load i18n static %}
{% block content_subtitle %}
<div class="usa-alert usa-alert--info usa-alert--slim">
<div class="usa-alert__body margin-left-1 maxw-none">
<p class="usa-alert__text maxw-none">
If you add someone to a domain here, it will not trigger any emails. To trigger emails, use the <a class="usa-link" href="{% url 'admin:registrar_domaininvitation_changelist' %}">User Domain Role invitations table</a> instead.
</p>
</div>
</div>
{{ block.super }}
{% endblock %}

View file

@ -11,6 +11,7 @@ for now we just carry the attribute to both the parent element and the select.
{{ name }}="{{ value }}" {{ name }}="{{ value }}"
{% endif %} {% endif %}
{% endfor %} {% endfor %}
data-default-value="{% for group_name, group_choices, group_index in widget.optgroups %}{% for option in group_choices %}{% if option.selected %}{{ option.value }}{% endif %}{% endfor %}{% endfor %}"
> >
{% include "django/forms/widgets/select.html" %} {% include "django/forms/widgets/select.html" with is_combobox=True %}
</div> </div>

View file

@ -3,6 +3,9 @@
{# hint: spacing in the class string matters #} {# hint: spacing in the class string matters #}
class="usa-select{% if classes %} {{ classes }}{% endif %}" class="usa-select{% if classes %} {{ classes }}{% endif %}"
{% include "django/forms/widgets/attrs.html" %} {% include "django/forms/widgets/attrs.html" %}
{% if is_combobox %}
data-default-value="{% for group_name, group_choices, group_index in widget.optgroups %}{% for option in group_choices %}{% if option.selected %}{{ option.value }}{% endif %}{% endfor %}{% endfor %}"
{% endif %}
> >
{% for group, options, index in widget.optgroups %} {% for group, options, index in widget.optgroups %}
{% if group %}<optgroup label="{{ group }}">{% endif %} {% if group %}<optgroup label="{{ group }}">{% endif %}

View file

@ -0,0 +1,43 @@
{% autoescape off %}{# In a text file, we don't want to have HTML entities escaped #}
Hi,{% if domain_manager and domain_manager.first_name %} {{ domain_manager.first_name }}.{% endif %}
A domain manager was invited to {{ domain.name }}.
DOMAIN: {{ domain.name }}
INVITED BY: {{ requestor_email }}
INVITED ON: {{date}}
MANAGER INVITED: {{ invited_email_address }}
----------------------------------------------------------------
NEXT STEPS
The person who received the invitation will become a domain manager once they log in to the
.gov registrar. They'll need to access the registrar using a Login.gov account that's
associated with the invited email address.
If you need to cancel this invitation or remove the domain manager (because they've already
logged in), you can do that by going to this domain in the .gov registrar <https://manage.get.gov/>.
WHY DID YOU RECEIVE THIS EMAIL?
Youre listed as a domain manager for {{ domain.name }}, so youll receive a notification whenever
someone is invited to manage that domain.
If you have questions or concerns, reach out to the person who sent the invitation or reply to this email.
THANK YOU
.Gov helps the public identify official, trusted information. Thank you for using a .gov domain.
----------------------------------------------------------------
The .gov team
Contact us: <https://get.gov/contact/>
Learn about .gov <https://get.gov>
The .gov registry is a part of the Cybersecurity and Infrastructure Security Agency
(CISA) <https://cisa.gov/>
{% endautoescape %}

View file

@ -0,0 +1 @@
A domain manager was invited to {{ domain.name }}

View file

@ -578,6 +578,13 @@ class MockDb(TestCase):
creator=cls.custom_superuser, federal_agency=cls.federal_agency_3, organization_type="federal" creator=cls.custom_superuser, federal_agency=cls.federal_agency_3, organization_type="federal"
) )
cls.suborganization_1, _ = Suborganization.objects.get_or_create(
name="SubOrg 1",
portfolio=cls.portfolio_1,
city="Nashville",
state_territory="TN",
)
current_date = get_time_aware_date(datetime(2024, 4, 2)) current_date = get_time_aware_date(datetime(2024, 4, 2))
# Create start and end dates using timedelta # Create start and end dates using timedelta
@ -848,6 +855,7 @@ class MockDb(TestCase):
status=DomainRequest.DomainRequestStatus.IN_REVIEW, status=DomainRequest.DomainRequestStatus.IN_REVIEW,
name="city2.gov", name="city2.gov",
portfolio=cls.portfolio_1, portfolio=cls.portfolio_1,
sub_organization=cls.suborganization_1,
) )
cls.domain_request_3 = completed_domain_request( cls.domain_request_3 = completed_domain_request(
status=DomainRequest.DomainRequestStatus.STARTED, status=DomainRequest.DomainRequestStatus.STARTED,
@ -863,6 +871,9 @@ class MockDb(TestCase):
cls.domain_request_5 = completed_domain_request( cls.domain_request_5 = completed_domain_request(
status=DomainRequest.DomainRequestStatus.APPROVED, status=DomainRequest.DomainRequestStatus.APPROVED,
name="city5.gov", name="city5.gov",
requested_suborganization="requested_suborg",
suborganization_city="SanFran",
suborganization_state_territory="CA",
) )
cls.domain_request_6 = completed_domain_request( cls.domain_request_6 = completed_domain_request(
status=DomainRequest.DomainRequestStatus.STARTED, status=DomainRequest.DomainRequestStatus.STARTED,

View file

@ -166,6 +166,29 @@ class TestDomainInvitationAdmin(TestCase):
) )
self.assertContains(response, "Show more") self.assertContains(response, "Show more")
@less_console_noise_decorator
def test_has_change_form_description(self):
"""Tests if this model has a model description on the change form view"""
self.client.force_login(self.superuser)
domain, _ = Domain.objects.get_or_create(name="systemofadown.com")
domain_invitation, _ = DomainInvitation.objects.get_or_create(email="toxicity@systemofadown.com", domain=domain)
response = self.client.get(
"/admin/registrar/domaininvitation/{}/change/".format(domain_invitation.pk),
follow=True,
)
# Make sure that the page is loaded correctly
self.assertEqual(response.status_code, 200)
# Test for a description snippet
self.assertContains(
response,
"If you add someone to a domain here, it will trigger emails to the invitee and all managers of the domain",
)
@less_console_noise_decorator @less_console_noise_decorator
def test_get_filters(self): def test_get_filters(self):
"""Ensures that our filters are displaying correctly""" """Ensures that our filters are displaying correctly"""
@ -1957,6 +1980,31 @@ class TestUserDomainRoleAdmin(TestCase):
) )
self.assertContains(response, "Show more") self.assertContains(response, "Show more")
@less_console_noise_decorator
def test_has_change_form_description(self):
"""Tests if this model has a model description on the change form view"""
self.client.force_login(self.superuser)
domain, _ = Domain.objects.get_or_create(name="systemofadown.com")
user_domain_role, _ = UserDomainRole.objects.get_or_create(
user=self.superuser, domain=domain, role=[UserDomainRole.Roles.MANAGER]
)
response = self.client.get(
"/admin/registrar/userdomainrole/{}/change/".format(user_domain_role.pk),
follow=True,
)
# Make sure that the page is loaded correctly
self.assertEqual(response.status_code, 200)
# Test for a description snippet
self.assertContains(
response,
"If you add someone to a domain here, it will not trigger any emails.",
)
def test_domain_sortable(self): def test_domain_sortable(self):
"""Tests if the UserDomainrole sorts by domain correctly""" """Tests if the UserDomainrole sorts by domain correctly"""
with less_console_noise(): with less_console_noise():
@ -3442,7 +3490,7 @@ class TestTransferUser(WebTest):
@less_console_noise_decorator @less_console_noise_decorator
def test_transfer_user_transfers_user_portfolio_roles_no_error_when_duplicates(self): def test_transfer_user_transfers_user_portfolio_roles_no_error_when_duplicates(self):
"""Assert that duplicate portfolio user roles do not throw errorsd""" """Assert that duplicate portfolio user roles do not throw errors"""
portfolio1 = Portfolio.objects.create(organization_name="Hotel California", creator=self.user2) portfolio1 = Portfolio.objects.create(organization_name="Hotel California", creator=self.user2)
UserPortfolioPermission.objects.create( UserPortfolioPermission.objects.create(
user=self.user1, portfolio=portfolio1, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] user=self.user1, portfolio=portfolio1, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN]
@ -3574,7 +3622,7 @@ class TestTransferUser(WebTest):
with self.assertRaises(User.DoesNotExist): with self.assertRaises(User.DoesNotExist):
self.user2.refresh_from_db() self.user2.refresh_from_db()
@less_console_noise_decorator # @less_console_noise_decorator
def test_transfer_user_throws_transfer_and_delete_success_messages(self): def test_transfer_user_throws_transfer_and_delete_success_messages(self):
"""Test that success messages for data transfer and user deletion are displayed.""" """Test that success messages for data transfer and user deletion are displayed."""
# Ensure the setup for VerifiedByStaff # Ensure the setup for VerifiedByStaff
@ -3592,11 +3640,13 @@ class TestTransferUser(WebTest):
self.assertContains(after_submit, "<h1>Change user</h1>") self.assertContains(after_submit, "<h1>Change user</h1>")
print(mock_success_message.call_args_list)
mock_success_message.assert_any_call( mock_success_message.assert_any_call(
ANY, ANY,
( (
"Data transferred successfully for the following objects: ['Changed requestor " "Data transferred successfully for the following objects: ['Changed requestor "
+ 'from "Furiosa Jabassa " to "Max Rokatanski " on immortan.joe@citadel.com\']' + "from Furiosa Jabassa to Max Rokatanski on immortan.joe@citadel.com']"
), ),
) )
@ -3606,7 +3656,7 @@ class TestTransferUser(WebTest):
def test_transfer_user_throws_error_message(self): def test_transfer_user_throws_error_message(self):
"""Test that an error message is thrown if the transfer fails.""" """Test that an error message is thrown if the transfer fails."""
with patch( with patch(
"registrar.views.TransferUserView.transfer_user_fields_and_log", side_effect=Exception("Simulated Error") "registrar.views.TransferUserView.transfer_related_fields_and_log", side_effect=Exception("Simulated Error")
): ):
with patch("django.contrib.messages.error") as mock_error: with patch("django.contrib.messages.error") as mock_error:
# Access the transfer user page # Access the transfer user page

View file

@ -0,0 +1,311 @@
import unittest
from unittest.mock import patch, MagicMock
from datetime import date
from registrar.utility.email import EmailSendingError
from registrar.utility.email_invitations import send_domain_invitation_email
from api.tests.common import less_console_noise_decorator
class DomainInvitationEmail(unittest.TestCase):
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.send_templated_email")
@patch("registrar.utility.email_invitations.UserDomainRole.objects.filter")
@patch("registrar.utility.email_invitations.validate_invitation")
@patch("registrar.utility.email_invitations.get_requestor_email")
@patch("registrar.utility.email_invitations.send_invitation_email")
@patch("registrar.utility.email_invitations.normalize_domains")
def test_send_domain_invitation_email(
self,
mock_normalize_domains,
mock_send_invitation_email,
mock_get_requestor_email,
mock_validate_invitation,
mock_user_domain_role_filter,
mock_send_templated_email,
):
"""Test sending domain invitation email for one domain.
Should also send emails to manager of that domain.
"""
# Setup
mock_domain = MagicMock(name="domain1")
mock_domain.name = "example.com"
mock_normalize_domains.return_value = [mock_domain]
mock_requestor = MagicMock()
mock_requestor_email = "requestor@example.com"
mock_get_requestor_email.return_value = mock_requestor_email
mock_user1 = MagicMock()
mock_user1.email = "manager1@example.com"
mock_user_domain_role_filter.return_value = [MagicMock(user=mock_user1)]
email = "invitee@example.com"
is_member_of_different_org = False
# Call the function
send_domain_invitation_email(
email=email,
requestor=mock_requestor,
domains=mock_domain,
is_member_of_different_org=is_member_of_different_org,
)
# Assertions
mock_normalize_domains.assert_called_once_with(mock_domain)
mock_get_requestor_email.assert_called_once_with(mock_requestor, [mock_domain])
mock_validate_invitation.assert_called_once_with(
email, [mock_domain], mock_requestor, is_member_of_different_org
)
mock_send_invitation_email.assert_called_once_with(email, mock_requestor_email, [mock_domain], None)
mock_user_domain_role_filter.assert_called_once_with(domain=mock_domain)
mock_send_templated_email.assert_called_once_with(
"emails/domain_manager_notification.txt",
"emails/domain_manager_notification_subject.txt",
to_address=mock_user1.email,
context={
"domain": mock_domain,
"requestor_email": mock_requestor_email,
"invited_email_address": email,
"domain_manager": mock_user1,
"date": date.today(),
},
)
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.send_templated_email")
@patch("registrar.utility.email_invitations.UserDomainRole.objects.filter")
@patch("registrar.utility.email_invitations.validate_invitation")
@patch("registrar.utility.email_invitations.get_requestor_email")
@patch("registrar.utility.email_invitations.send_invitation_email")
@patch("registrar.utility.email_invitations.normalize_domains")
def test_send_domain_invitation_email_multiple_domains(
self,
mock_normalize_domains,
mock_send_invitation_email,
mock_get_requestor_email,
mock_validate_invitation,
mock_user_domain_role_filter,
mock_send_templated_email,
):
"""Test sending domain invitation email for multiple domains.
Should also send emails to managers of each domain.
"""
# Setup
# Create multiple mock domains
mock_domain1 = MagicMock(name="domain1")
mock_domain1.name = "example.com"
mock_domain2 = MagicMock(name="domain2")
mock_domain2.name = "example.org"
mock_normalize_domains.return_value = [mock_domain1, mock_domain2]
mock_requestor = MagicMock()
mock_requestor_email = "requestor@example.com"
mock_get_requestor_email.return_value = mock_requestor_email
mock_user1 = MagicMock()
mock_user1.email = "manager1@example.com"
mock_user2 = MagicMock()
mock_user2.email = "manager2@example.com"
# Configure domain roles for each domain
def filter_side_effect(domain):
if domain == mock_domain1:
return [MagicMock(user=mock_user1)]
elif domain == mock_domain2:
return [MagicMock(user=mock_user2)]
return []
mock_user_domain_role_filter.side_effect = filter_side_effect
email = "invitee@example.com"
is_member_of_different_org = False
# Call the function
send_domain_invitation_email(
email=email,
requestor=mock_requestor,
domains=[mock_domain1, mock_domain2],
is_member_of_different_org=is_member_of_different_org,
)
# Assertions
mock_normalize_domains.assert_called_once_with([mock_domain1, mock_domain2])
mock_get_requestor_email.assert_called_once_with(mock_requestor, [mock_domain1, mock_domain2])
mock_validate_invitation.assert_called_once_with(
email, [mock_domain1, mock_domain2], mock_requestor, is_member_of_different_org
)
mock_send_invitation_email.assert_called_once_with(
email, mock_requestor_email, [mock_domain1, mock_domain2], None
)
# Check that domain manager emails were sent for both domains
mock_user_domain_role_filter.assert_any_call(domain=mock_domain1)
mock_user_domain_role_filter.assert_any_call(domain=mock_domain2)
mock_send_templated_email.assert_any_call(
"emails/domain_manager_notification.txt",
"emails/domain_manager_notification_subject.txt",
to_address=mock_user1.email,
context={
"domain": mock_domain1,
"requestor_email": mock_requestor_email,
"invited_email_address": email,
"domain_manager": mock_user1,
"date": date.today(),
},
)
mock_send_templated_email.assert_any_call(
"emails/domain_manager_notification.txt",
"emails/domain_manager_notification_subject.txt",
to_address=mock_user2.email,
context={
"domain": mock_domain2,
"requestor_email": mock_requestor_email,
"invited_email_address": email,
"domain_manager": mock_user2,
"date": date.today(),
},
)
# Verify the total number of calls to send_templated_email
self.assertEqual(mock_send_templated_email.call_count, 2)
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.validate_invitation")
def test_send_domain_invitation_email_raises_invite_validation_exception(self, mock_validate_invitation):
"""Test sending domain invitation email for one domain and assert exception
when invite validation fails.
"""
# Setup
mock_validate_invitation.side_effect = ValueError("Validation failed")
email = "invitee@example.com"
requestor = MagicMock()
domain = MagicMock()
# Call and assert exception
with self.assertRaises(ValueError) as context:
send_domain_invitation_email(email, requestor, domain, is_member_of_different_org=False)
self.assertEqual(str(context.exception), "Validation failed")
mock_validate_invitation.assert_called_once()
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.get_requestor_email")
def test_send_domain_invitation_email_raises_get_requestor_email_exception(self, mock_get_requestor_email):
"""Test sending domain invitation email for one domain and assert exception
when get_requestor_email fails.
"""
# Setup
mock_get_requestor_email.side_effect = ValueError("Validation failed")
email = "invitee@example.com"
requestor = MagicMock()
domain = MagicMock()
# Call and assert exception
with self.assertRaises(ValueError) as context:
send_domain_invitation_email(email, requestor, domain, is_member_of_different_org=False)
self.assertEqual(str(context.exception), "Validation failed")
mock_get_requestor_email.assert_called_once()
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.validate_invitation")
@patch("registrar.utility.email_invitations.get_requestor_email")
@patch("registrar.utility.email_invitations.send_invitation_email")
@patch("registrar.utility.email_invitations.normalize_domains")
def test_send_domain_invitation_email_raises_sending_email_exception(
self,
mock_normalize_domains,
mock_send_invitation_email,
mock_get_requestor_email,
mock_validate_invitation,
):
"""Test sending domain invitation email for one domain and assert exception
when send_invitation_email fails.
"""
# Setup
mock_domain = MagicMock(name="domain1")
mock_domain.name = "example.com"
mock_normalize_domains.return_value = [mock_domain]
mock_requestor = MagicMock()
mock_requestor_email = "requestor@example.com"
mock_get_requestor_email.return_value = mock_requestor_email
mock_user1 = MagicMock()
mock_user1.email = "manager1@example.com"
email = "invitee@example.com"
is_member_of_different_org = False
mock_send_invitation_email.side_effect = EmailSendingError("Error sending email")
# Call and assert exception
with self.assertRaises(EmailSendingError) as context:
send_domain_invitation_email(
email=email,
requestor=mock_requestor,
domains=mock_domain,
is_member_of_different_org=is_member_of_different_org,
)
# Assertions
mock_normalize_domains.assert_called_once_with(mock_domain)
mock_get_requestor_email.assert_called_once_with(mock_requestor, [mock_domain])
mock_validate_invitation.assert_called_once_with(
email, [mock_domain], mock_requestor, is_member_of_different_org
)
self.assertEqual(str(context.exception), "Error sending email")
@less_console_noise_decorator
@patch("registrar.utility.email_invitations.send_emails_to_domain_managers")
@patch("registrar.utility.email_invitations.validate_invitation")
@patch("registrar.utility.email_invitations.get_requestor_email")
@patch("registrar.utility.email_invitations.send_invitation_email")
@patch("registrar.utility.email_invitations.normalize_domains")
def test_send_domain_invitation_email_manager_emails_send_mail_exception(
self,
mock_normalize_domains,
mock_send_invitation_email,
mock_get_requestor_email,
mock_validate_invitation,
mock_send_domain_manager_emails,
):
"""Test sending domain invitation email for one domain and assert exception
when send_emails_to_domain_managers fails.
"""
# Setup
mock_domain = MagicMock(name="domain1")
mock_domain.name = "example.com"
mock_normalize_domains.return_value = [mock_domain]
mock_requestor = MagicMock()
mock_requestor_email = "requestor@example.com"
mock_get_requestor_email.return_value = mock_requestor_email
email = "invitee@example.com"
is_member_of_different_org = False
mock_send_domain_manager_emails.side_effect = EmailSendingError("Error sending email")
# Call and assert exception
with self.assertRaises(EmailSendingError) as context:
send_domain_invitation_email(
email=email,
requestor=mock_requestor,
domains=mock_domain,
is_member_of_different_org=is_member_of_different_org,
)
# Assertions
mock_normalize_domains.assert_called_once_with(mock_domain)
mock_get_requestor_email.assert_called_once_with(mock_requestor, [mock_domain])
mock_validate_invitation.assert_called_once_with(
email, [mock_domain], mock_requestor, is_member_of_different_org
)
mock_send_invitation_email.assert_called_once_with(email, mock_requestor_email, [mock_domain], None)
self.assertEqual(str(context.exception), "Error sending email")

View file

@ -2101,6 +2101,10 @@ class TestPatchSuborganizations(MockDbForIndividualTests):
1. Fewest spaces 1. Fewest spaces
2. Most leading capitals 2. Most leading capitals
""" """
# Delete any other suborganizations defined in the initial test dataset
DomainRequest.objects.all().delete()
Suborganization.objects.all().delete()
Suborganization.objects.create(name="Test Organization ", portfolio=self.portfolio_1) Suborganization.objects.create(name="Test Organization ", portfolio=self.portfolio_1)
Suborganization.objects.create(name="test organization", portfolio=self.portfolio_1) Suborganization.objects.create(name="test organization", portfolio=self.portfolio_1)
Suborganization.objects.create(name="Test Organization", portfolio=self.portfolio_1) Suborganization.objects.create(name="Test Organization", portfolio=self.portfolio_1)
@ -2114,6 +2118,10 @@ class TestPatchSuborganizations(MockDbForIndividualTests):
@less_console_noise_decorator @less_console_noise_decorator
def test_hardcoded_record(self): def test_hardcoded_record(self):
"""Tests that our hardcoded records update as we expect them to""" """Tests that our hardcoded records update as we expect them to"""
# Delete any other suborganizations defined in the initial test dataset
DomainRequest.objects.all().delete()
Suborganization.objects.all().delete()
# Create orgs with old and new name formats # Create orgs with old and new name formats
old_name = "USDA/OC" old_name = "USDA/OC"
new_name = "USDA, Office of Communications" new_name = "USDA, Office of Communications"
@ -2123,7 +2131,7 @@ class TestPatchSuborganizations(MockDbForIndividualTests):
self.run_patch_suborganizations() self.run_patch_suborganizations()
# Verify only the new one remains # Verify only the new one of the two remains
self.assertEqual(Suborganization.objects.count(), 1) self.assertEqual(Suborganization.objects.count(), 1)
remaining = Suborganization.objects.first() remaining = Suborganization.objects.first()
self.assertEqual(remaining.name, new_name) self.assertEqual(remaining.name, new_name)

View file

@ -349,6 +349,70 @@ class TestDomainCache(MockEppLib):
class TestDomainCreation(MockEppLib): class TestDomainCreation(MockEppLib):
"""Rule: An approved domain request must result in a domain""" """Rule: An approved domain request must result in a domain"""
@less_console_noise_decorator
def test_get_or_create_public_contact_race_condition(self):
"""
Scenario: Two processes try to create the same security contact simultaneously
Given a domain in UNKNOWN state
When a race condition occurs during contact creation
Then no IntegrityError is raised
And only one security contact exists in database
And the correct public contact is returned
CONTEXT: We ran into an intermittent but somewhat rare issue where IntegrityError
was raised when creating PublicContact.
Per our logs, this seemed to appear during periods of high app activity.
"""
domain, _ = Domain.objects.get_or_create(name="defaultsecurity.gov")
self.first_call = True
def mock_filter(*args, **kwargs):
"""Simulates a race condition by creating a
duplicate contact between the first filter and save.
"""
# Return an empty queryset for the first call. Otherwise just proceed as normal.
if self.first_call:
self.first_call = False
duplicate = PublicContact(
domain=domain,
contact_type=PublicContact.ContactTypeChoices.SECURITY,
registry_id="defaultSec",
email="dotgov@cisa.dhs.gov",
name="Registry Customer Service",
)
duplicate.save(skip_epp_save=True)
return PublicContact.objects.none()
return PublicContact.objects.filter(*args, **kwargs)
with patch.object(PublicContact.objects, "filter", side_effect=mock_filter):
try:
public_contact = PublicContact(
domain=domain,
contact_type=PublicContact.ContactTypeChoices.SECURITY,
registry_id="defaultSec",
email="dotgov@cisa.dhs.gov",
name="Registry Customer Service",
)
returned_public_contact = domain._get_or_create_public_contact(public_contact)
except IntegrityError:
self.fail(
"IntegrityError was raised during contact creation due to a race condition. "
"This indicates that concurrent contact creation is not working in some cases. "
"The error occurs when two processes try to create the same contact simultaneously. "
"Expected behavior: gracefully handle duplicate creation and return existing contact."
)
# Verify that only one contact exists and its correctness
security_contacts = PublicContact.objects.filter(
domain=domain, contact_type=PublicContact.ContactTypeChoices.SECURITY
)
self.assertEqual(security_contacts.count(), 1)
self.assertEqual(returned_public_contact, security_contacts.get())
self.assertEqual(returned_public_contact.registry_id, "defaultSec")
self.assertEqual(returned_public_contact.email, "dotgov@cisa.dhs.gov")
@boto3_mocking.patching @boto3_mocking.patching
def test_approved_domain_request_creates_domain_locally(self): def test_approved_domain_request_creates_domain_locally(self):
""" """

View file

@ -729,6 +729,7 @@ class ExportDataTest(MockDbForIndividualTests, MockEppLib):
# "Submitted at", # "Submitted at",
"Status", "Status",
"Domain type", "Domain type",
"Portfolio",
"Federal type", "Federal type",
"Federal agency", "Federal agency",
"Organization name", "Organization name",
@ -736,6 +737,10 @@ class ExportDataTest(MockDbForIndividualTests, MockEppLib):
"City", "City",
"State/territory", "State/territory",
"Region", "Region",
"Suborganization",
"Requested suborg",
"Suborg city",
"Suborg state/territory",
"Creator first name", "Creator first name",
"Creator last name", "Creator last name",
"Creator email", "Creator email",
@ -765,28 +770,30 @@ class ExportDataTest(MockDbForIndividualTests, MockEppLib):
expected_content = ( expected_content = (
# Header # Header
"Domain request,Status,Domain type,Federal type,Federal agency,Organization name,Election office," "Domain request,Status,Domain type,Portfolio,Federal type,Federal agency,Organization name,"
"City,State/territory,Region,Creator first name,Creator last name,Creator email," "Election office,City,State/territory,Region,Suborganization,Requested suborg,Suborg city,"
"Suborg state/territory,Creator first name,Creator last name,Creator email,"
"Creator approved domains count,Creator active requests count,Alternative domains,SO first name," "Creator approved domains count,Creator active requests count,Alternative domains,SO first name,"
"SO last name,SO email,SO title/role,Request purpose,Request additional details,Other contacts," "SO last name,SO email,SO title/role,Request purpose,Request additional details,Other contacts,"
"CISA regional representative,Current websites,Investigator\n" "CISA regional representative,Current websites,Investigator\n"
# Content # Content
"city5.gov,Approved,Federal,Executive,,Testorg,N/A,,NY,2,,,,1,0,city1.gov,Testy,Tester,testy@town.com," "city5.gov,Approved,Federal,No,Executive,,Testorg,N/A,,NY,2,requested_suborg,SanFran,CA,,,,,1,0,"
"Chief Tester,Purpose of the site,There is more,Testy Tester testy2@town.com,,city.com,\n" "city1.gov,Testy,Tester,testy@town.com,Chief Tester,Purpose of the site,There is more,"
"city2.gov,In review,Federal,Executive,Portfolio 1 Federal Agency,,N/A,,NY,2,,,,0,1,city1.gov,,,,," "Testy Tester testy2@town.com,,city.com,\n"
"Purpose of the site,There is more,Testy Tester testy2@town.com,,city.com,\n" "city2.gov,In review,Federal,Yes,Executive,Portfolio 1 Federal Agency,,N/A,,,2,SubOrg 1,,,,,,,0,"
"city3.gov,Submitted,Federal,Executive,Portfolio 1 Federal Agency,,N/A,,NY,2,,,,0,1," "1,city1.gov,,,,,Purpose of the site,There is more,Testy Tester testy2@town.com,,city.com,\n"
"city3.gov,Submitted,Federal,Yes,Executive,Portfolio 1 Federal Agency,,N/A,,,2,,,,,,,,0,1,"
'"cheeseville.gov, city1.gov, igorville.gov",,,,,Purpose of the site,CISA-first-name CISA-last-name | ' '"cheeseville.gov, city1.gov, igorville.gov",,,,,Purpose of the site,CISA-first-name CISA-last-name | '
'There is more,"Meow Tester24 te2@town.com, Testy1232 Tester24 te2@town.com, ' 'There is more,"Meow Tester24 te2@town.com, Testy1232 Tester24 te2@town.com, '
'Testy Tester testy2@town.com",' 'Testy Tester testy2@town.com",'
'test@igorville.com,"city.com, https://www.example2.com, https://www.example.com",\n' 'test@igorville.com,"city.com, https://www.example2.com, https://www.example.com",\n'
"city4.gov,Submitted,City,Executive,,Testorg,Yes,,NY,2,,,,0,1,city1.gov,Testy," "city4.gov,Submitted,City,No,Executive,,Testorg,Yes,,NY,2,,,,,,,,0,1,city1.gov,Testy,"
"Tester,testy@town.com," "Tester,testy@town.com,"
"Chief Tester,Purpose of the site,CISA-first-name CISA-last-name | There is more," "Chief Tester,Purpose of the site,CISA-first-name CISA-last-name | There is more,"
"Testy Tester testy2@town.com," "Testy Tester testy2@town.com,"
"cisaRep@igorville.gov,city.com,\n" "cisaRep@igorville.gov,city.com,\n"
"city6.gov,Submitted,Federal,Executive,Portfolio 1 Federal Agency,,N/A,,NY,2,,,,0,1,city1.gov,,,,," "city6.gov,Submitted,Federal,Yes,Executive,Portfolio 1 Federal Agency,,N/A,,,2,,,,,,,,0,1,city1.gov,"
"Purpose of the site,CISA-first-name CISA-last-name | There is more,Testy Tester testy2@town.com," ",,,,Purpose of the site,CISA-first-name CISA-last-name | There is more,Testy Tester testy2@town.com,"
"cisaRep@igorville.gov,city.com,\n" "cisaRep@igorville.gov,city.com,\n"
) )

View file

@ -9,7 +9,7 @@ from registrar.utility.email import EmailSendingError
from waffle.testutils import override_flag from waffle.testutils import override_flag
from api.tests.common import less_console_noise_decorator from api.tests.common import less_console_noise_decorator
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from .common import MockEppLib, MockSESClient, create_user # type: ignore from .common import MockEppLib, create_user # type: ignore
from django_webtest import WebTest # type: ignore from django_webtest import WebTest # type: ignore
import boto3_mocking # type: ignore import boto3_mocking # type: ignore
@ -750,11 +750,12 @@ class TestDomainManagers(TestDomainOverview):
response = self.client.get(reverse("domain-users-add", kwargs={"pk": self.domain.id})) response = self.client.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
self.assertContains(response, "Add a domain manager") self.assertContains(response, "Add a domain manager")
@boto3_mocking.patching
@less_console_noise_decorator @less_console_noise_decorator
def test_domain_user_add_form(self): @patch("registrar.views.domain.send_domain_invitation_email")
def test_domain_user_add_form(self, mock_send_domain_email):
"""Adding an existing user works.""" """Adding an existing user works."""
get_user_model().objects.get_or_create(email="mayor@igorville.gov") get_user_model().objects.get_or_create(email="mayor@igorville.gov")
user = User.objects.filter(email="mayor@igorville.gov").first()
add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id})) add_page = self.app.get(reverse("domain-users-add", kwargs={"pk": self.domain.id}))
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME] session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
@ -762,10 +763,15 @@ class TestDomainManagers(TestDomainOverview):
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
mock_client = MockSESClient() success_result = add_page.form.submit()
with boto3_mocking.clients.handler_for("sesv2", mock_client):
with less_console_noise(): mock_send_domain_email.assert_called_once_with(
success_result = add_page.form.submit() email="mayor@igorville.gov",
requestor=self.user,
domains=self.domain,
is_member_of_different_org=None,
requested_user=user,
)
self.assertEqual(success_result.status_code, 302) self.assertEqual(success_result.status_code, 302)
self.assertEqual( self.assertEqual(
@ -974,13 +980,13 @@ class TestDomainManagers(TestDomainOverview):
success_page = success_result.follow() success_page = success_result.follow()
self.assertContains(success_page, "Failed to send email.") self.assertContains(success_page, "Failed to send email.")
@boto3_mocking.patching
@less_console_noise_decorator @less_console_noise_decorator
def test_domain_invitation_created(self): @patch("registrar.views.domain.send_domain_invitation_email")
def test_domain_invitation_created(self, mock_send_domain_email):
"""Add user on a nonexistent email creates an invitation. """Add user on a nonexistent email creates an invitation.
Adding a non-existent user sends an email as a side-effect, so mock Adding a non-existent user sends an email as a side-effect, so mock
out the boto3 SES email sending here. out send_domain_invitation_email here.
""" """
# make sure there is no user with this email # make sure there is no user with this email
email_address = "mayor@igorville.gov" email_address = "mayor@igorville.gov"
@ -993,10 +999,11 @@ class TestDomainManagers(TestDomainOverview):
add_page.form["email"] = email_address add_page.form["email"] = email_address
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
mock_client = MockSESClient() success_result = add_page.form.submit()
with boto3_mocking.clients.handler_for("sesv2", mock_client):
with less_console_noise(): mock_send_domain_email.assert_called_once_with(
success_result = add_page.form.submit() email="mayor@igorville.gov", requestor=self.user, domains=self.domain, is_member_of_different_org=None
)
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = success_result.follow() success_page = success_result.follow()
@ -1005,13 +1012,13 @@ class TestDomainManagers(TestDomainOverview):
self.assertContains(success_page, "Cancel") # link to cancel invitation self.assertContains(success_page, "Cancel") # link to cancel invitation
self.assertTrue(DomainInvitation.objects.filter(email=email_address).exists()) self.assertTrue(DomainInvitation.objects.filter(email=email_address).exists())
@boto3_mocking.patching
@less_console_noise_decorator @less_console_noise_decorator
def test_domain_invitation_created_for_caps_email(self): @patch("registrar.views.domain.send_domain_invitation_email")
def test_domain_invitation_created_for_caps_email(self, mock_send_domain_email):
"""Add user on a nonexistent email with CAPS creates an invitation to lowercase email. """Add user on a nonexistent email with CAPS creates an invitation to lowercase email.
Adding a non-existent user sends an email as a side-effect, so mock Adding a non-existent user sends an email as a side-effect, so mock
out the boto3 SES email sending here. out send_domain_invitation_email here.
""" """
# make sure there is no user with this email # make sure there is no user with this email
email_address = "mayor@igorville.gov" email_address = "mayor@igorville.gov"
@ -1025,9 +1032,11 @@ class TestDomainManagers(TestDomainOverview):
add_page.form["email"] = caps_email_address add_page.form["email"] = caps_email_address
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
mock_client = MockSESClient() success_result = add_page.form.submit()
with boto3_mocking.clients.handler_for("sesv2", mock_client):
success_result = add_page.form.submit() mock_send_domain_email.assert_called_once_with(
email="mayor@igorville.gov", requestor=self.user, domains=self.domain, is_member_of_different_org=None
)
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = success_result.follow() success_page = success_result.follow()

View file

@ -2879,7 +2879,7 @@ class TestRequestingEntity(WebTest):
form["portfolio_requesting_entity-requesting_entity_is_suborganization"] = True form["portfolio_requesting_entity-requesting_entity_is_suborganization"] = True
form["portfolio_requesting_entity-is_requesting_new_suborganization"] = True form["portfolio_requesting_entity-is_requesting_new_suborganization"] = True
form["portfolio_requesting_entity-sub_organization"] = "" form["portfolio_requesting_entity-sub_organization"] = "other"
form["portfolio_requesting_entity-requested_suborganization"] = "moon" form["portfolio_requesting_entity-requested_suborganization"] = "moon"
form["portfolio_requesting_entity-suborganization_city"] = "kepler" form["portfolio_requesting_entity-suborganization_city"] = "kepler"
@ -2942,18 +2942,34 @@ class TestRequestingEntity(WebTest):
session_id = self.app.cookies[settings.SESSION_COOKIE_NAME] session_id = self.app.cookies[settings.SESSION_COOKIE_NAME]
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
# For 2 the tests below, it is required to submit a form without submitting a value
# for the select/combobox. WebTest will not do this; by default, WebTest will submit
# the first choice in a select. So, need to manipulate the form to remove the
# particular select/combobox that will not be submitted, and then post the form.
form_action = f"/request/{domain_request.pk}/portfolio_requesting_entity/"
# Test missing suborganization selection # Test missing suborganization selection
form["portfolio_requesting_entity-requesting_entity_is_suborganization"] = True form["portfolio_requesting_entity-requesting_entity_is_suborganization"] = True
form["portfolio_requesting_entity-sub_organization"] = "" form["portfolio_requesting_entity-is_requesting_new_suborganization"] = False
# remove sub_organization from the form submission
response = form.submit() form_data = form.submit_fields()
form_data = [(key, value) for key, value in form_data if key != "portfolio_requesting_entity-sub_organization"]
response = self.app.post(form_action, dict(form_data))
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
self.assertContains(response, "Suborganization is required.", status_code=200) self.assertContains(response, "Suborganization is required.", status_code=200)
# Test missing custom suborganization details # Test missing custom suborganization details
form["portfolio_requesting_entity-requesting_entity_is_suborganization"] = True
form["portfolio_requesting_entity-is_requesting_new_suborganization"] = True form["portfolio_requesting_entity-is_requesting_new_suborganization"] = True
response = form.submit() form["portfolio_requesting_entity-sub_organization"] = "other"
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) # remove suborganization_state_territory from the form submission
form_data = form.submit_fields()
form_data = [
(key, value)
for key, value in form_data
if key != "portfolio_requesting_entity-suborganization_state_territory"
]
response = self.app.post(form_action, dict(form_data))
self.assertContains(response, "Enter the name of your suborganization.", status_code=200) self.assertContains(response, "Enter the name of your suborganization.", status_code=200)
self.assertContains(response, "Enter the city where your suborganization is located.", status_code=200) self.assertContains(response, "Enter the city where your suborganization is located.", status_code=200)
self.assertContains( self.assertContains(

View file

@ -1660,6 +1660,27 @@ class DomainRequestExport(BaseExport):
default=F("organization_name"), default=F("organization_name"),
output_field=CharField(), output_field=CharField(),
), ),
"converted_city": Case(
# When portfolio is present, use its value instead
When(portfolio__isnull=False, then=F("portfolio__city")),
# Otherwise, return the natively assigned value
default=F("city"),
output_field=CharField(),
),
"converted_state_territory": Case(
# When portfolio is present, use its value instead
When(portfolio__isnull=False, then=F("portfolio__state_territory")),
# Otherwise, return the natively assigned value
default=F("state_territory"),
output_field=CharField(),
),
"converted_suborganization_name": Case(
# When sub_organization is present, use its name
When(sub_organization__isnull=False, then=F("sub_organization__name")),
# Otherwise, return empty string
default=Value(""),
output_field=CharField(),
),
"converted_so_email": Case( "converted_so_email": Case(
# When portfolio is present, use its value instead # When portfolio is present, use its value instead
When(portfolio__isnull=False, then=F("portfolio__senior_official__email")), When(portfolio__isnull=False, then=F("portfolio__senior_official__email")),
@ -1786,6 +1807,10 @@ class DomainRequestExport(BaseExport):
status = model.get("status") status = model.get("status")
status_display = DomainRequest.DomainRequestStatus.get_status_label(status) if status else None status_display = DomainRequest.DomainRequestStatus.get_status_label(status) if status else None
# Handle the portfolio field. Display as a Yes/No
portfolio = model.get("portfolio")
portfolio_display = "Yes" if portfolio is not None else "No"
# Handle the region field. # Handle the region field.
state_territory = model.get("state_territory") state_territory = model.get("state_territory")
region = get_region(state_territory) if state_territory else None region = get_region(state_territory) if state_territory else None
@ -1819,6 +1844,7 @@ class DomainRequestExport(BaseExport):
"Election office": human_readable_election_board, "Election office": human_readable_election_board,
"Federal type": human_readable_federal_type, "Federal type": human_readable_federal_type,
"Domain type": human_readable_org_type, "Domain type": human_readable_org_type,
"Portfolio": portfolio_display,
"Request additional details": additional_details, "Request additional details": additional_details,
# Annotated fields - passed into the request dict. # Annotated fields - passed into the request dict.
"Creator approved domains count": model.get("creator_approved_domains_count", 0), "Creator approved domains count": model.get("creator_approved_domains_count", 0),
@ -1827,6 +1853,10 @@ class DomainRequestExport(BaseExport):
"Other contacts": model.get("all_other_contacts"), "Other contacts": model.get("all_other_contacts"),
"Current websites": model.get("all_current_websites"), "Current websites": model.get("all_current_websites"),
# Untouched FK fields - passed into the request dict. # Untouched FK fields - passed into the request dict.
"Suborganization": model.get("converted_suborganization_name"),
"Requested suborg": model.get("requested_suborganization"),
"Suborg city": model.get("suborganization_city"),
"Suborg state/territory": model.get("suborganization_state_territory"),
"Federal agency": model.get("converted_federal_agency"), "Federal agency": model.get("converted_federal_agency"),
"SO first name": model.get("converted_senior_official_first_name"), "SO first name": model.get("converted_senior_official_first_name"),
"SO last name": model.get("converted_senior_official_last_name"), "SO last name": model.get("converted_senior_official_last_name"),
@ -1838,8 +1868,8 @@ class DomainRequestExport(BaseExport):
"Investigator": model.get("investigator__email"), "Investigator": model.get("investigator__email"),
# Untouched fields # Untouched fields
"Organization name": model.get("converted_organization_name"), "Organization name": model.get("converted_organization_name"),
"City": model.get("city"), "City": model.get("converted_city"),
"State/territory": model.get("state_territory"), "State/territory": model.get("converted_state_territory"),
"Request purpose": model.get("purpose"), "Request purpose": model.get("purpose"),
"CISA regional representative": model.get("cisa_representative_email"), "CISA regional representative": model.get("cisa_representative_email"),
"Last submitted date": model.get("last_submitted_date"), "Last submitted date": model.get("last_submitted_date"),
@ -2006,6 +2036,7 @@ class DomainRequestDataFull(DomainRequestExport):
"Last status update", "Last status update",
"Status", "Status",
"Domain type", "Domain type",
"Portfolio",
"Federal type", "Federal type",
"Federal agency", "Federal agency",
"Organization name", "Organization name",
@ -2013,6 +2044,10 @@ class DomainRequestDataFull(DomainRequestExport):
"City", "City",
"State/territory", "State/territory",
"Region", "Region",
"Suborganization",
"Requested suborg",
"Suborg city",
"Suborg state/territory",
"Creator first name", "Creator first name",
"Creator last name", "Creator last name",
"Creator email", "Creator email",

View file

@ -0,0 +1,20 @@
from contextlib import contextmanager
from django.db import transaction, IntegrityError
from psycopg2 import errorcodes
@contextmanager
def ignore_unique_violation():
"""
Execute within an atomic transaction so that if a unique constraint violation occurs,
the individual transaction is rolled back without invalidating any larger transaction.
"""
with transaction.atomic():
try:
yield
except IntegrityError as e:
if e.__cause__.pgcode == errorcodes.UNIQUE_VIOLATION:
# roll back to the savepoint, effectively ignoring this transaction
pass
else:
raise e

View file

@ -1,6 +1,6 @@
from datetime import date
from django.conf import settings from django.conf import settings
from registrar.models import DomainInvitation from registrar.models import Domain, DomainInvitation, UserDomainRole
from registrar.models.domain import Domain
from registrar.utility.errors import ( from registrar.utility.errors import (
AlreadyDomainInvitedError, AlreadyDomainInvitedError,
AlreadyDomainManagerError, AlreadyDomainManagerError,
@ -41,8 +41,47 @@ def send_domain_invitation_email(
send_invitation_email(email, requestor_email, domains, requested_user) send_invitation_email(email, requestor_email, domains, requested_user)
# send emails to domain managers
for domain in domains:
send_emails_to_domain_managers(
email=email,
requestor_email=requestor_email,
domain=domain,
requested_user=requested_user,
)
def normalize_domains(domains):
def send_emails_to_domain_managers(email: str, requestor_email, domain: Domain, requested_user=None):
"""
Notifies all domain managers of the provided domain of a change
Raises:
EmailSendingError
"""
# Get each domain manager from list
user_domain_roles = UserDomainRole.objects.filter(domain=domain)
for user_domain_role in user_domain_roles:
# Send email to each domain manager
user = user_domain_role.user
try:
send_templated_email(
"emails/domain_manager_notification.txt",
"emails/domain_manager_notification_subject.txt",
to_address=user.email,
context={
"domain": domain,
"requestor_email": requestor_email,
"invited_email_address": email,
"domain_manager": user,
"date": date.today(),
},
)
except EmailSendingError as err:
raise EmailSendingError(
f"Could not send email manager notification to {user.email} for domain: {domain.name}"
) from err
def normalize_domains(domains: Domain | list[Domain]) -> list[Domain]:
"""Ensures domains is always a list.""" """Ensures domains is always a list."""
return [domains] if isinstance(domains, Domain) else domains return [domains] if isinstance(domains, Domain) else domains
@ -69,6 +108,8 @@ def validate_invitation(email, domains, requestor, is_member_of_different_org):
for domain in domains: for domain in domains:
validate_existing_invitation(email, domain) validate_existing_invitation(email, domain)
# NOTE: should we also be validating against existing user_domain_roles
def check_outside_org_membership(email, requestor, is_member_of_different_org): def check_outside_org_membership(email, requestor, is_member_of_different_org):
"""Raise an error if the email belongs to a different organization.""" """Raise an error if the email belongs to a different organization."""

View file

@ -1,19 +1,19 @@
import logging import logging
from django.db import transaction
from django.db.models import ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel, ManyToManyRel, OneToOneRel
from django.shortcuts import render, get_object_or_404, redirect from django.shortcuts import render, get_object_or_404, redirect
from django.views import View from django.views import View
from registrar.models.domain import Domain from registrar.models.domain import Domain
from registrar.models.domain_information import DomainInformation
from registrar.models.domain_request import DomainRequest from registrar.models.domain_request import DomainRequest
from registrar.models.portfolio import Portfolio
from registrar.models.user import User from registrar.models.user import User
from django.contrib.admin import site from django.contrib.admin import site
from django.contrib import messages from django.contrib import messages
from registrar.models.user_domain_role import UserDomainRole from registrar.models.user_domain_role import UserDomainRole
from registrar.models.user_portfolio_permission import UserPortfolioPermission from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.verified_by_staff import VerifiedByStaff
from typing import Any, List from registrar.utility.db_helpers import ignore_unique_violation
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -21,22 +21,8 @@ logger = logging.getLogger(__name__)
class TransferUserView(View): class TransferUserView(View):
"""Transfer user methods that set up the transfer_user template and handle the forms on it.""" """Transfer user methods that set up the transfer_user template and handle the forms on it."""
JOINS = [
(DomainRequest, "creator"),
(DomainInformation, "creator"),
(Portfolio, "creator"),
(DomainRequest, "investigator"),
(UserDomainRole, "user"),
(VerifiedByStaff, "requestor"),
(UserPortfolioPermission, "user"),
]
# Future-proofing in case joined fields get added on the user model side
# This was tested in the first portfolio model iteration and works
USER_FIELDS: List[Any] = []
def get(self, request, user_id): def get(self, request, user_id):
"""current_user referes to the 'source' user where the button that redirects to this view was clicked. """current_user refers to the 'source' user where the button that redirects to this view was clicked.
other_users exclude current_user and populate a dropdown, selected_user is the selection in the dropdown. other_users exclude current_user and populate a dropdown, selected_user is the selection in the dropdown.
This also querries the relevant domains and domain requests, and the admin context needed for the sidenav.""" This also querries the relevant domains and domain requests, and the admin context needed for the sidenav."""
@ -70,86 +56,122 @@ class TransferUserView(View):
return render(request, "admin/transfer_user.html", context) return render(request, "admin/transfer_user.html", context)
def post(self, request, user_id): def post(self, request, user_id):
"""This handles the transfer from selected_user to current_user then deletes selected_user. """This handles the transfer from selected_user to current_user then deletes selected_user."""
NOTE: We have a ticket to refactor this into a more solid lookup for related fields in #2645"""
current_user = get_object_or_404(User, pk=user_id) current_user = get_object_or_404(User, pk=user_id)
selected_user_id = request.POST.get("selected_user") selected_user_id = request.POST.get("selected_user")
selected_user = get_object_or_404(User, pk=selected_user_id) selected_user = get_object_or_404(User, pk=selected_user_id)
try: try:
change_logs = [] # Make this atomic so that we don't get any partial transfers
with transaction.atomic():
change_logs = []
# Transfer specific fields # Dynamically handle related fields
self.transfer_user_fields_and_log(selected_user, current_user, change_logs) self.transfer_related_fields_and_log(selected_user, current_user, change_logs)
# Perform the updates and log the changes # Success message if any related objects were updated
for model_class, field_name in self.JOINS: if change_logs:
self.update_joins_and_log(model_class, field_name, selected_user, current_user, change_logs) success_message = f"Data transferred successfully for the following objects: {change_logs}"
messages.success(request, success_message)
# Success message if any related objects were updated
if change_logs:
success_message = f"Data transferred successfully for the following objects: {change_logs}"
messages.success(request, success_message)
selected_user.delete()
messages.success(request, f"Deleted {selected_user} {selected_user.username}")
selected_user.delete()
messages.success(request, f"Deleted {selected_user} {selected_user.username}")
except Exception as e: except Exception as e:
messages.error(request, f"An error occurred during the transfer: {e}") messages.error(request, f"An error occurred during the transfer: {e}")
logger.error(f"An error occurred during the transfer: {e}", exc_info=True)
return redirect("admin:registrar_user_change", object_id=user_id) return redirect("admin:registrar_user_change", object_id=user_id)
@classmethod def transfer_related_fields_and_log(self, selected_user, current_user, change_logs):
def update_joins_and_log(cls, model_class, field_name, selected_user, current_user, change_logs):
""" """
Helper function to update the user join fields for a given model and log the changes. Dynamically find all related fields to the User model and transfer them from selected_user to current_user.
Handles ForeignKey, OneToOneField, ManyToManyField, and ManyToOneRel relationships.
""" """
user_model = User
filter_kwargs = {field_name: selected_user} for related_field in user_model._meta.get_fields():
updated_objects = model_class.objects.filter(**filter_kwargs) if related_field.is_relation:
# Field objects represent forward relationships
if isinstance(related_field, OneToOneField):
self._handle_one_to_one(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, ManyToManyField):
self._handle_many_to_many(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, ForeignKey):
self._handle_foreign_key(related_field, selected_user, current_user, change_logs)
# Relationship objects represent reverse relationships
elif isinstance(related_field, ManyToOneRel):
# ManyToOneRel is a reverse ForeignKey
self._handle_foreign_key_reverse(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, OneToOneRel):
self._handle_one_to_one_reverse(related_field, selected_user, current_user, change_logs)
elif isinstance(related_field, ManyToManyRel):
self._handle_many_to_many_reverse(related_field, selected_user, current_user, change_logs)
else:
logger.error(f"Unknown relationship type for field {related_field}")
raise ValueError(f"Unknown relationship type for field {related_field}")
for obj in updated_objects: def _handle_foreign_key_reverse(self, related_field: ManyToOneRel, selected_user, current_user, change_logs):
# Check for duplicate UserDomainRole before updating # Handle reverse ForeignKey relationships
if model_class == UserDomainRole: related_manager = getattr(selected_user, related_field.get_accessor_name(), None)
if model_class.objects.filter(user=current_user, domain=obj.domain).exists(): if related_manager and related_manager.exists():
continue # Skip the update to avoid a duplicate for related_object in related_manager.all():
with ignore_unique_violation():
setattr(related_object, related_field.field.name, current_user)
related_object.save()
self.log_change(related_object, selected_user, current_user, related_field.field.name, change_logs)
if model_class == UserPortfolioPermission: def _handle_foreign_key(self, related_field: ForeignKey, selected_user, current_user, change_logs):
if model_class.objects.filter(user=current_user, portfolio=obj.portfolio).exists(): # Handle ForeignKey relationships
continue # Skip the update to avoid a duplicate related_object = getattr(selected_user, related_field.name, None)
if related_object:
setattr(current_user, related_field.name, related_object)
current_user.save()
self.log_change(related_object, selected_user, current_user, related_field.name, change_logs)
# Update the field on the object and save it def _handle_one_to_one(self, related_field: OneToOneField, selected_user, current_user, change_logs):
setattr(obj, field_name, current_user) # Handle OneToOne relationship
obj.save() related_object = getattr(selected_user, related_field.name, None)
if related_object:
with ignore_unique_violation():
setattr(current_user, related_field.name, related_object)
current_user.save()
self.log_change(related_object, selected_user, current_user, related_field.name, change_logs)
# Log the change def _handle_many_to_many(self, related_field: ManyToManyField, selected_user, current_user, change_logs):
cls.log_change(obj, field_name, selected_user, current_user, change_logs) # Handle ManyToMany relationship
related_name = related_field.remote_field.name
related_manager = getattr(selected_user, related_name, None)
if related_manager and related_manager.exists():
for instance in related_manager.all():
with ignore_unique_violation():
getattr(instance, related_name).remove(selected_user)
getattr(instance, related_name).add(current_user)
self.log_change(instance, selected_user, current_user, related_name, change_logs)
def _handle_many_to_many_reverse(self, related_field: ManyToManyRel, selected_user, current_user, change_logs):
# Handle reverse relationship
related_name = related_field.field.name
related_manager = getattr(selected_user, related_name, None)
if related_manager and related_manager.exists():
for instance in related_manager.all():
with ignore_unique_violation():
getattr(instance, related_name).remove(selected_user)
getattr(instance, related_name).add(current_user)
self.log_change(instance, selected_user, current_user, related_name, change_logs)
def _handle_one_to_one_reverse(self, related_field: OneToOneRel, selected_user, current_user, change_logs):
# Handle reverse relationship
field_name = related_field.get_accessor_name()
related_instance = getattr(selected_user, field_name, None)
if related_instance:
setattr(related_instance, field_name, current_user)
related_instance.save()
self.log_change(related_instance, selected_user, current_user, field_name, change_logs)
@classmethod @classmethod
def transfer_user_fields_and_log(cls, selected_user, current_user, change_logs): def log_change(cls, obj, selected_user, current_user, field_name, change_logs):
""" log_entry = f"Changed {field_name} from {selected_user} to {current_user} on {obj}"
Transfers portfolio fields from the selected_user to the current_user.
Logs the changes for each transferred field.
"""
for field in cls.USER_FIELDS:
field_value = getattr(selected_user, field, None)
if field_value:
setattr(current_user, field, field_value)
cls.log_change(current_user, field, field_value, field_value, change_logs)
current_user.save()
@classmethod
def log_change(cls, obj, field_name, field_value, new_value, change_logs):
"""Logs the change for a specific field on an object"""
log_entry = f'Changed {field_name} from "{field_value}" to "{new_value}" on {obj}'
logger.info(log_entry) logger.info(log_entry)
# Collect the related object for the success message
change_logs.append(log_entry) change_logs.append(log_entry)
@classmethod @classmethod