fix handleRequestingEntityFieldset jas and re-merge main

This commit is contained in:
Rachid Mrad 2024-11-26 16:14:04 -05:00
commit 772c29bb5f
No known key found for this signature in database
15 changed files with 885 additions and 160 deletions

50
.github/workflows/load-fixtures.yaml vendored Normal file
View file

@ -0,0 +1,50 @@
# Manually load fixtures to an environment of choice.
name: Load fixtures
run-name: Manually load fixtures to sandbox of choice
on:
workflow_dispatch:
inputs:
environment:
description: Which environment should we load data for?
type: 'choice'
options:
- ab
- backup
- el
- cb
- dk
- es
- gd
- ko
- ky
- nl
- rb
- rh
- rjm
- meoward
- bob
- hotgov
- litterbox
- ms
- ad
- ag
jobs:
load-fixtures:
runs-on: ubuntu-latest
env:
CF_USERNAME: CF_${{ github.event.inputs.environment }}_USERNAME
CF_PASSWORD: CF_${{ github.event.inputs.environment }}_PASSWORD
steps:
- uses: GitHubSecurityLab/actions-permissions/monitor@v1
- name: Load fake data for ${{ github.event.inputs.environment }}
uses: cloud-gov/cg-cli-tools@main
with:
cf_username: ${{ secrets[env.CF_USERNAME] }}
cf_password: ${{ secrets[env.CF_PASSWORD] }}
cf_org: cisa-dotgov
cf_space: ${{ github.event.inputs.environment }}
cf_command: "run-task getgov-${{ github.event.inputs.environment }} --command 'python manage.py load' --name loaddata"

View file

@ -6,45 +6,49 @@ import { hideElement, showElement } from './helpers.js';
*/ */
export function handleRequestingEntityFieldset() { export function handleRequestingEntityFieldset() {
// Sadly, these ugly ids are the auto generated with this prefix // Sadly, these ugly ids are the auto generated with this prefix
const formPrefix = "portfolio_requesting_entity" const formPrefix = "portfolio_requesting_entity";
const radioFieldset = document.getElementById(`id_${formPrefix}-requesting_entity_is_suborganization__fieldset`); if (formPrefix) {
const radios = radioFieldset?.querySelectorAll(`input[name="${formPrefix}-requesting_entity_is_suborganization"]`); const radioFieldset = document.getElementById(`id_${formPrefix}-requesting_entity_is_suborganization__fieldset`);
const select = document.getElementById(`id_${formPrefix}-sub_organization`); const radios = radioFieldset?.querySelectorAll(`input[name="${formPrefix}-requesting_entity_is_suborganization"]`);
const selectParent = select?.parentElement; const select = document.getElementById(`id_${formPrefix}-sub_organization`);
const suborgContainer = document.getElementById("suborganization-container"); const selectParent = select?.parentElement;
const suborgDetailsContainer = document.getElementById("suborganization-container__details"); const suborgContainer = document.getElementById("suborganization-container");
const subOrgCreateNewOption = document.getElementById("option-to-add-suborg").value const suborgDetailsContainer = document.getElementById("suborganization-container__details");
// Make sure all crucial page elements exist before proceeding. let subOrgCreateNewOption;
// This more or less ensures that we are on the Requesting Entity page, and not elsewhere. if (subOrgCreateNewOption)
if (!radios || !select || !selectParent || !suborgContainer || !suborgDetailsContainer) return; subOrgCreateNewOption = document.getElementById("option-to-add-suborg").value;
// Make sure all crucial page elements exist before proceeding.
// requestingSuborganization: This just broadly determines if they're requesting a suborg at all // This more or less ensures that we are on the Requesting Entity page, and not elsewhere.
// requestingNewSuborganization: This variable determines if the user is trying to *create* a new suborganization or not. if (!radios || !select || !selectParent || !suborgContainer || !suborgDetailsContainer) return;
var requestingSuborganization = Array.from(radios).find(radio => radio.checked)?.value === "True";
var requestingNewSuborganization = document.getElementById(`id_${formPrefix}-is_requesting_new_suborganization`); // requestingSuborganization: This just broadly determines if they're requesting a suborg at all
// requestingNewSuborganization: This variable determines if the user is trying to *create* a new suborganization or not.
function toggleSuborganization(radio=null) { var requestingSuborganization = Array.from(radios).find(radio => radio.checked)?.value === "True";
if (radio != null) requestingSuborganization = radio?.checked && radio.value === "True"; var requestingNewSuborganization = document.getElementById(`id_${formPrefix}-is_requesting_new_suborganization`);
requestingSuborganization ? showElement(suborgContainer) : hideElement(suborgContainer);
requestingNewSuborganization.value = requestingSuborganization && select.value === "other" ? "True" : "False"; function toggleSuborganization(radio=null) {
requestingNewSuborganization.value === "True" ? showElement(suborgDetailsContainer) : hideElement(suborgDetailsContainer); if (radio != null) requestingSuborganization = radio?.checked && radio.value === "True";
requestingSuborganization ? showElement(suborgContainer) : hideElement(suborgContainer);
requestingNewSuborganization.value = requestingSuborganization && select.value === "other" ? "True" : "False";
requestingNewSuborganization.value === "True" ? showElement(suborgDetailsContainer) : hideElement(suborgDetailsContainer);
}
// Add fake "other" option to sub_organization select
if (select && !Array.from(select.options).some(option => option.value === "other")) {
select.add(new Option(subOrgCreateNewOption, "other"));
}
if (requestingNewSuborganization.value === "True") {
select.value = "other";
}
// Add event listener to is_suborganization radio buttons, and run for initial display
toggleSuborganization();
radios.forEach(radio => {
radio.addEventListener("click", () => toggleSuborganization(radio));
});
// Add event listener to the suborg dropdown to show/hide the suborg details section
select.addEventListener("change", () => toggleSuborganization());
} }
// Add fake "other" option to sub_organization select
if (select && !Array.from(select.options).some(option => option.value === "other")) {
select.add(new Option(subOrgCreateNewOption, "other"));
}
if (requestingNewSuborganization.value === "True") {
select.value = "other";
}
// Add event listener to is_suborganization radio buttons, and run for initial display
toggleSuborganization();
radios.forEach(radio => {
radio.addEventListener("click", () => toggleSuborganization(radio));
});
// Add event listener to the suborg dropdown to show/hide the suborg details section
select.addEventListener("change", () => toggleSuborganization());
} }

View file

@ -21,6 +21,7 @@ from registrar.views.report_views import (
ExportDomainRequestDataFull, ExportDomainRequestDataFull,
ExportDataTypeUser, ExportDataTypeUser,
ExportDataTypeRequests, ExportDataTypeRequests,
ExportMembersPortfolio,
) )
# --jsons # --jsons
@ -239,6 +240,11 @@ urlpatterns = [
name="get-rejection-email-for-user-json", name="get-rejection-email-for-user-json",
), ),
path("admin/", admin.site.urls), path("admin/", admin.site.urls),
path(
"reports/export_members_portfolio/",
ExportMembersPortfolio.as_view(),
name="export_members_portfolio",
),
path( path(
"reports/export_data_type_user/", "reports/export_data_type_user/",
ExportDataTypeUser.as_view(), ExportDataTypeUser.as_view(),

View file

@ -6,8 +6,10 @@ from faker import Faker
from django.db import transaction from django.db import transaction
from registrar.fixtures.fixtures_portfolios import PortfolioFixture from registrar.fixtures.fixtures_portfolios import PortfolioFixture
from registrar.fixtures.fixtures_suborganizations import SuborganizationFixture
from registrar.fixtures.fixtures_users import UserFixture from registrar.fixtures.fixtures_users import UserFixture
from registrar.models import User, DomainRequest, DraftDomain, Contact, Website, FederalAgency from registrar.models import User, DomainRequest, DraftDomain, Contact, Website, FederalAgency
from registrar.models.domain import Domain
from registrar.models.portfolio import Portfolio from registrar.models.portfolio import Portfolio
from registrar.models.suborganization import Suborganization from registrar.models.suborganization import Suborganization
@ -101,8 +103,13 @@ class DomainRequestFixture:
} }
@classmethod @classmethod
def fake_dot_gov(cls): def fake_dot_gov(cls, max_attempts=100):
return f"{fake.slug()}.gov" """Generate a unique .gov domain name without using an infinite loop."""
for _ in range(max_attempts):
fake_name = f"{fake.slug()}.gov"
if not Domain.objects.filter(name=fake_name).exists():
return DraftDomain.objects.create(name=fake_name)
raise RuntimeError(f"Failed to generate a unique .gov domain after {max_attempts} attempts")
@classmethod @classmethod
def fake_expiration_date(cls): def fake_expiration_date(cls):
@ -189,7 +196,9 @@ class DomainRequestFixture:
if not request.requested_domain: if not request.requested_domain:
if "requested_domain" in request_dict and request_dict["requested_domain"] is not None: if "requested_domain" in request_dict and request_dict["requested_domain"] is not None:
return DraftDomain.objects.get_or_create(name=request_dict["requested_domain"])[0] return DraftDomain.objects.get_or_create(name=request_dict["requested_domain"])[0]
return DraftDomain.objects.create(name=cls.fake_dot_gov())
# Generate a unique fake domain
return cls.fake_dot_gov()
return request.requested_domain return request.requested_domain
@classmethod @classmethod
@ -213,7 +222,7 @@ class DomainRequestFixture:
if not request.sub_organization: if not request.sub_organization:
if "sub_organization" in request_dict and request_dict["sub_organization"] is not None: if "sub_organization" in request_dict and request_dict["sub_organization"] is not None:
return Suborganization.objects.get_or_create(name=request_dict["sub_organization"])[0] return Suborganization.objects.get_or_create(name=request_dict["sub_organization"])[0]
return cls._get_random_sub_organization() return cls._get_random_sub_organization(request)
return request.sub_organization return request.sub_organization
@classmethod @classmethod
@ -228,10 +237,19 @@ class DomainRequestFixture:
return None return None
@classmethod @classmethod
def _get_random_sub_organization(cls): def _get_random_sub_organization(cls, request):
try: try:
suborg_options = [Suborganization.objects.first(), Suborganization.objects.last()] # Filter Suborganizations by the request's portfolio
return random.choice(suborg_options) # nosec portfolio_suborganizations = Suborganization.objects.filter(portfolio=request.portfolio)
# Select a suborg that's defined in the fixtures
suborganization_names = [suborg["name"] for suborg in SuborganizationFixture.SUBORGS]
# Further filter by names in suborganization_names
suborganization_options = portfolio_suborganizations.filter(name__in=suborganization_names)
# Randomly choose one if any exist
return random.choice(suborganization_options) if suborganization_options.exists() else None # nosec
except Exception as e: except Exception as e:
logger.warning(f"Expected fixture sub_organization, did not find it: {e}") logger.warning(f"Expected fixture sub_organization, did not find it: {e}")
return None return None
@ -273,6 +291,9 @@ class DomainRequestFixture:
# Lumped under .atomic to ensure we don't make redundant DB calls. # Lumped under .atomic to ensure we don't make redundant DB calls.
# This bundles them all together, and then saves it in a single call. # This bundles them all together, and then saves it in a single call.
# The atomic block will cause the code to stop executing if one instance in the
# nested iteration fails, which will cause an early exit and make it hard to debug.
# Comment out with transaction.atomic() when debugging.
with transaction.atomic(): with transaction.atomic():
try: try:
# Get the usernames of users created in the UserFixture # Get the usernames of users created in the UserFixture

View file

@ -267,54 +267,24 @@ class UserFixture:
"""Loads the users into the database and assigns them to the specified group.""" """Loads the users into the database and assigns them to the specified group."""
logger.info(f"Going to load {len(users)} users for group {group_name}") logger.info(f"Going to load {len(users)} users for group {group_name}")
# Step 1: Fetch the group
group = UserGroup.objects.get(name=group_name) group = UserGroup.objects.get(name=group_name)
# Prepare sets of existing usernames and IDs in one query # Step 2: Identify new and existing users
user_identifiers = [(user.get("username"), user.get("id")) for user in users] existing_usernames, existing_user_ids = cls._get_existing_users(users)
existing_users = User.objects.filter( new_users = cls._prepare_new_users(users, existing_usernames, existing_user_ids, are_superusers)
username__in=[user[0] for user in user_identifiers] + [user[1] for user in user_identifiers]
).values_list("username", "id")
existing_usernames = set(user[0] for user in existing_users) # Step 3: Create new users
existing_user_ids = set(user[1] for user in existing_users) cls._create_new_users(new_users)
# Filter out users with existing IDs or usernames
new_users = [
User(
id=user_data.get("id"),
first_name=user_data.get("first_name"),
last_name=user_data.get("last_name"),
username=user_data.get("username"),
email=user_data.get("email", ""),
title=user_data.get("title", "Peon"),
phone=user_data.get("phone", "2022222222"),
is_active=user_data.get("is_active", True),
is_staff=True,
is_superuser=are_superusers,
)
for user_data in users
if user_data.get("username") not in existing_usernames and user_data.get("id") not in existing_user_ids
]
# Perform bulk creation for new users
if new_users:
try:
User.objects.bulk_create(new_users)
logger.info(f"Created {len(new_users)} new users.")
except Exception as e:
logger.error(f"Unexpected error during user bulk creation: {e}")
else:
logger.info("No new users to create.")
# Step 4: Update existing users
# Get all users to be updated (both new and existing) # Get all users to be updated (both new and existing)
created_or_existing_users = User.objects.filter(username__in=[user.get("username") for user in users]) created_or_existing_users = User.objects.filter(username__in=[user.get("username") for user in users])
users_to_update = cls._get_users_to_update(created_or_existing_users)
cls._update_existing_users(users_to_update)
# Filter out users who are already in the group # Step 5: Assign users to the group
users_not_in_group = created_or_existing_users.exclude(groups__id=group.id) cls._assign_users_to_group(group, created_or_existing_users)
# Add only users who are not already in the group
if users_not_in_group.exists():
group.user_set.add(*users_not_in_group)
logger.info(f"Users loaded for group {group_name}.") logger.info(f"Users loaded for group {group_name}.")
@ -346,6 +316,76 @@ class UserFixture:
else: else:
logger.info("No allowed emails to load") logger.info("No allowed emails to load")
@staticmethod
def _get_existing_users(users):
user_identifiers = [(user.get("username"), user.get("id")) for user in users]
existing_users = User.objects.filter(
username__in=[user[0] for user in user_identifiers] + [user[1] for user in user_identifiers]
).values_list("username", "id")
existing_usernames = set(user[0] for user in existing_users)
existing_user_ids = set(user[1] for user in existing_users)
return existing_usernames, existing_user_ids
@staticmethod
def _prepare_new_users(users, existing_usernames, existing_user_ids, are_superusers):
return [
User(
id=user_data.get("id"),
first_name=user_data.get("first_name"),
last_name=user_data.get("last_name"),
username=user_data.get("username"),
email=user_data.get("email", ""),
title=user_data.get("title", "Peon"),
phone=user_data.get("phone", "2022222222"),
is_active=user_data.get("is_active", True),
is_staff=True,
is_superuser=are_superusers,
)
for user_data in users
if user_data.get("username") not in existing_usernames and user_data.get("id") not in existing_user_ids
]
@staticmethod
def _create_new_users(new_users):
if new_users:
try:
User.objects.bulk_create(new_users)
logger.info(f"Created {len(new_users)} new users.")
except Exception as e:
logger.error(f"Unexpected error during user bulk creation: {e}")
else:
logger.info("No new users to create.")
@staticmethod
def _get_users_to_update(users):
users_to_update = []
for user in users:
updated = False
if not user.title:
user.title = "Peon"
updated = True
if not user.phone:
user.phone = "2022222222"
updated = True
if not user.is_staff:
user.is_staff = True
updated = True
if updated:
users_to_update.append(user)
return users_to_update
@staticmethod
def _update_existing_users(users_to_update):
if users_to_update:
User.objects.bulk_update(users_to_update, ["is_staff", "title", "phone"])
logger.info(f"Updated {len(users_to_update)} existing users.")
@staticmethod
def _assign_users_to_group(group, users):
users_not_in_group = users.exclude(groups__id=group.id)
if users_not_in_group.exists():
group.user_set.add(*users_not_in_group)
@classmethod @classmethod
def load(cls): def load(cls):
with transaction.atomic(): with transaction.atomic():

View file

@ -2,7 +2,7 @@ from django.db import models
from django.forms import ValidationError from django.forms import ValidationError
from registrar.models.user_domain_role import UserDomainRole from registrar.models.user_domain_role import UserDomainRole
from registrar.utility.waffle import flag_is_active_for_user from registrar.utility.waffle import flag_is_active_for_user
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices, DomainRequestPermissionDisplay, MemberPermissionDisplay
from .utility.time_stamped_model import TimeStampedModel from .utility.time_stamped_model import TimeStampedModel
from django.contrib.postgres.fields import ArrayField from django.contrib.postgres.fields import ArrayField
@ -106,6 +106,37 @@ class UserPortfolioPermission(TimeStampedModel):
portfolio_permissions.update(additional_permissions) portfolio_permissions.update(additional_permissions)
return list(portfolio_permissions) return list(portfolio_permissions)
@classmethod
def get_domain_request_permission_display(cls, roles, additional_permissions):
"""Class method to return a readable string for domain request permissions"""
# Tracks if they can view, create requests, or not do anything
all_permissions = UserPortfolioPermission.get_portfolio_permissions(roles, additional_permissions)
all_domain_perms = [
UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS,
UserPortfolioPermissionChoices.EDIT_REQUESTS,
]
if all(perm in all_permissions for perm in all_domain_perms):
return DomainRequestPermissionDisplay.VIEWER_REQUESTER
elif UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS in all_permissions:
return DomainRequestPermissionDisplay.VIEWER
else:
return DomainRequestPermissionDisplay.NONE
@classmethod
def get_member_permission_display(cls, roles, additional_permissions):
"""Class method to return a readable string for member permissions"""
# Tracks if they can view, create requests, or not do anything.
# This is different than get_domain_request_permission_display because member tracks
# permissions slightly differently.
all_permissions = UserPortfolioPermission.get_portfolio_permissions(roles, additional_permissions)
if UserPortfolioPermissionChoices.EDIT_MEMBERS in all_permissions:
return MemberPermissionDisplay.MANAGER
elif UserPortfolioPermissionChoices.VIEW_MEMBERS in all_permissions:
return MemberPermissionDisplay.VIEWER
else:
return MemberPermissionDisplay.NONE
def clean(self): def clean(self):
"""Extends clean method to perform additional validation, which can raise errors in django admin.""" """Extends clean method to perform additional validation, which can raise errors in django admin."""
super().clean() super().clean()

View file

@ -0,0 +1,8 @@
from django.db.models.expressions import Func
class ArrayRemoveNull(Func):
"""Custom Func to use array_remove to remove null values"""
function = "array_remove"
template = "%(function)s(%(expressions)s, NULL)"

View file

@ -1,3 +1,4 @@
from registrar.utility import StrEnum
from django.db import models from django.db import models
@ -40,3 +41,29 @@ class UserPortfolioPermissionChoices(models.TextChoices):
@classmethod @classmethod
def to_dict(cls): def to_dict(cls):
return {key: value.value for key, value in cls.__members__.items()} return {key: value.value for key, value in cls.__members__.items()}
class DomainRequestPermissionDisplay(StrEnum):
"""Stores display values for domain request permission combinations.
Overview of values:
- VIEWER_REQUESTER: "Viewer Requester"
- VIEWER: "Viewer"
- NONE: "None"
"""
VIEWER_REQUESTER = "Viewer Requester"
VIEWER = "Viewer"
NONE = "None"
class MemberPermissionDisplay(StrEnum):
"""Stores display values for member permission combinations.
Overview of values:
- MANAGER: "Manager"
- VIEWER: "Viewer"
- NONE: "None"
"""
MANAGER = "Manager"
VIEWER = "Viewer"
NONE = "None"

View file

@ -8,7 +8,7 @@
<section class="section-outlined members margin-top-0 section-outlined--border-base-light" id="members"> <section class="section-outlined members margin-top-0 section-outlined--border-base-light" id="members">
<div class="section-outlined__header margin-bottom-3 grid-row"> <div class="section-outlined__header margin-bottom-3 grid-row">
<!-- ---------- SEARCH ---------- --> <!-- ---------- SEARCH ---------- -->
<div class="section-outlined__search mobile:grid-col-12 desktop:grid-col-6"> <div class="section-outlined__search mobile:grid-col-12 desktop:grid-col-6 {% if is_widescreen_mode %} section-outlined__search--widescreen {% endif %}">
<section aria-label="Members search component" class="margin-top-2"> <section aria-label="Members search component" class="margin-top-2">
<form class="usa-search usa-search--small" method="POST" role="search"> <form class="usa-search usa-search--small" method="POST" role="search">
{% csrf_token %} {% csrf_token %}
@ -36,6 +36,15 @@
</form> </form>
</section> </section>
</div> </div>
<div class="section-outlined__utility-button mobile-lg:padding-right-105 {% if portfolio %} mobile:grid-col-12 desktop:grid-col-6 desktop:padding-left-3{% endif %}">
<section aria-label="Domains report component" class="margin-top-205">
<a href="{% url 'export_members_portfolio' %}" class="usa-button usa-button--unstyled usa-button--with-icon usa-button--justify-right" role="button">
<svg class="usa-icon usa-icon--big" aria-hidden="true" focusable="false" role="img" width="24" height="24">
<use xlink:href="{%static 'img/sprite.svg'%}#file_download"></use>
</svg>Export as CSV
</a>
</section>
</div>
</div> </div>
<!-- ---------- MAIN TABLE ---------- --> <!-- ---------- MAIN TABLE ---------- -->

View file

@ -1,6 +1,5 @@
import os import os
import logging import logging
from contextlib import contextmanager from contextlib import contextmanager
import random import random
from string import ascii_uppercase from string import ascii_uppercase
@ -29,6 +28,7 @@ from registrar.models import (
FederalAgency, FederalAgency,
UserPortfolioPermission, UserPortfolioPermission,
Portfolio, Portfolio,
PortfolioInvitation,
) )
from epplibwrapper import ( from epplibwrapper import (
commands, commands,
@ -39,6 +39,7 @@ from epplibwrapper import (
ErrorCode, ErrorCode,
responses, responses,
) )
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from registrar.models.user_domain_role import UserDomainRole from registrar.models.user_domain_role import UserDomainRole
from registrar.models.utility.contact_error import ContactError, ContactErrorCodes from registrar.models.utility.contact_error import ContactError, ContactErrorCodes
@ -196,6 +197,7 @@ class GenericTestHelper(TestCase):
self.assertEqual(expected_sort_order, returned_sort_order) self.assertEqual(expected_sort_order, returned_sort_order)
@classmethod
def _mock_user_request_for_factory(self, request): def _mock_user_request_for_factory(self, request):
"""Adds sessionmiddleware when using factory to associate session information""" """Adds sessionmiddleware when using factory to associate session information"""
middleware = SessionMiddleware(lambda req: req) middleware = SessionMiddleware(lambda req: req)
@ -531,6 +533,8 @@ class MockDb(TestCase):
@classmethod @classmethod
@less_console_noise_decorator @less_console_noise_decorator
def sharedSetUp(cls): def sharedSetUp(cls):
cls.mock_client_class = MagicMock()
cls.mock_client = cls.mock_client_class.return_value
username = "test_user" username = "test_user"
first_name = "First" first_name = "First"
last_name = "Last" last_name = "Last"
@ -540,6 +544,29 @@ class MockDb(TestCase):
cls.user = get_user_model().objects.create( cls.user = get_user_model().objects.create(
username=username, first_name=first_name, last_name=last_name, email=email, title=title, phone=phone username=username, first_name=first_name, last_name=last_name, email=email, title=title, phone=phone
) )
cls.meoward_user = get_user_model().objects.create(
username="meoward_username", first_name="first_meoward", last_name="last_meoward", email="meoward@rocks.com"
)
cls.lebowski_user = get_user_model().objects.create(
username="big_lebowski", first_name="big", last_name="lebowski", email="big_lebowski@dude.co"
)
cls.tired_user = get_user_model().objects.create(
username="ministry_of_bedtime", first_name="tired", last_name="sleepy", email="tired_sleepy@igorville.gov"
)
# Custom superuser and staff so that these do not conflict with what may be defined on what implements this.
cls.custom_superuser = create_superuser(
username="cold_superuser", first_name="cold", last_name="icy", email="icy_superuser@igorville.gov"
)
cls.custom_staffuser = create_user(
username="warm_staff", first_name="warm", last_name="cozy", email="cozy_staffuser@igorville.gov"
)
cls.federal_agency_1, _ = FederalAgency.objects.get_or_create(agency="World War I Centennial Commission")
cls.federal_agency_2, _ = FederalAgency.objects.get_or_create(agency="Armed Forces Retirement Home")
cls.portfolio_1, _ = Portfolio.objects.get_or_create(
creator=cls.custom_superuser, federal_agency=cls.federal_agency_1
)
current_date = get_time_aware_date(datetime(2024, 4, 2)) current_date = get_time_aware_date(datetime(2024, 4, 2))
# Create start and end dates using timedelta # Create start and end dates using timedelta
@ -547,9 +574,6 @@ class MockDb(TestCase):
cls.end_date = current_date + timedelta(days=2) cls.end_date = current_date + timedelta(days=2)
cls.start_date = current_date - timedelta(days=2) cls.start_date = current_date - timedelta(days=2)
cls.federal_agency_1, _ = FederalAgency.objects.get_or_create(agency="World War I Centennial Commission")
cls.federal_agency_2, _ = FederalAgency.objects.get_or_create(agency="Armed Forces Retirement Home")
cls.domain_1, _ = Domain.objects.get_or_create( cls.domain_1, _ = Domain.objects.get_or_create(
name="cdomain1.gov", state=Domain.State.READY, first_ready=get_time_aware_date(datetime(2024, 4, 2)) name="cdomain1.gov", state=Domain.State.READY, first_ready=get_time_aware_date(datetime(2024, 4, 2))
) )
@ -596,9 +620,14 @@ class MockDb(TestCase):
federal_agency=cls.federal_agency_1, federal_agency=cls.federal_agency_1,
federal_type="executive", federal_type="executive",
is_election_board=False, is_election_board=False,
portfolio=cls.portfolio_1,
) )
cls.domain_information_2, _ = DomainInformation.objects.get_or_create( cls.domain_information_2, _ = DomainInformation.objects.get_or_create(
creator=cls.user, domain=cls.domain_2, generic_org_type="interstate", is_election_board=True creator=cls.user,
domain=cls.domain_2,
generic_org_type="interstate",
is_election_board=True,
portfolio=cls.portfolio_1,
) )
cls.domain_information_3, _ = DomainInformation.objects.get_or_create( cls.domain_information_3, _ = DomainInformation.objects.get_or_create(
creator=cls.user, creator=cls.user,
@ -671,14 +700,6 @@ class MockDb(TestCase):
is_election_board=False, is_election_board=False,
) )
cls.meoward_user = get_user_model().objects.create(
username="meoward_username", first_name="first_meoward", last_name="last_meoward", email="meoward@rocks.com"
)
cls.lebowski_user = get_user_model().objects.create(
username="big_lebowski", first_name="big", last_name="lebowski", email="big_lebowski@dude.co"
)
_, created = UserDomainRole.objects.get_or_create( _, created = UserDomainRole.objects.get_or_create(
user=cls.meoward_user, domain=cls.domain_1, role=UserDomainRole.Roles.MANAGER user=cls.meoward_user, domain=cls.domain_1, role=UserDomainRole.Roles.MANAGER
) )
@ -709,6 +730,12 @@ class MockDb(TestCase):
status=DomainInvitation.DomainInvitationStatus.RETRIEVED, status=DomainInvitation.DomainInvitationStatus.RETRIEVED,
) )
_, created = DomainInvitation.objects.get_or_create(
email=cls.meoward_user.email,
domain=cls.domain_11,
status=DomainInvitation.DomainInvitationStatus.RETRIEVED,
)
_, created = DomainInvitation.objects.get_or_create( _, created = DomainInvitation.objects.get_or_create(
email="woofwardthethird@rocks.com", email="woofwardthethird@rocks.com",
domain=cls.domain_1, domain=cls.domain_1,
@ -723,6 +750,85 @@ class MockDb(TestCase):
email="squeaker@rocks.com", domain=cls.domain_10, status=DomainInvitation.DomainInvitationStatus.INVITED email="squeaker@rocks.com", domain=cls.domain_10, status=DomainInvitation.DomainInvitationStatus.INVITED
) )
cls.portfolio_invitation_1, _ = PortfolioInvitation.objects.get_or_create(
email=cls.meoward_user.email,
portfolio=cls.portfolio_1,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
additional_permissions=[UserPortfolioPermissionChoices.EDIT_MEMBERS],
)
cls.portfolio_invitation_2, _ = PortfolioInvitation.objects.get_or_create(
email=cls.lebowski_user.email,
portfolio=cls.portfolio_1,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
additional_permissions=[UserPortfolioPermissionChoices.VIEW_MEMBERS],
)
cls.portfolio_invitation_3, _ = PortfolioInvitation.objects.get_or_create(
email=cls.tired_user.email,
portfolio=cls.portfolio_1,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
additional_permissions=[UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS],
)
cls.portfolio_invitation_4, _ = PortfolioInvitation.objects.get_or_create(
email=cls.custom_superuser.email,
portfolio=cls.portfolio_1,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_MEMBERS,
UserPortfolioPermissionChoices.EDIT_MEMBERS,
UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS,
UserPortfolioPermissionChoices.EDIT_REQUESTS,
],
)
cls.portfolio_invitation_5, _ = PortfolioInvitation.objects.get_or_create(
email=cls.custom_staffuser.email,
portfolio=cls.portfolio_1,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
)
# Add some invitations that we never retireve
PortfolioInvitation.objects.get_or_create(
email="nonexistentmember_1@igorville.gov",
portfolio=cls.portfolio_1,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
additional_permissions=[UserPortfolioPermissionChoices.EDIT_MEMBERS],
)
PortfolioInvitation.objects.get_or_create(
email="nonexistentmember_2@igorville.gov",
portfolio=cls.portfolio_1,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
additional_permissions=[UserPortfolioPermissionChoices.VIEW_MEMBERS],
)
PortfolioInvitation.objects.get_or_create(
email="nonexistentmember_3@igorville.gov",
portfolio=cls.portfolio_1,
roles=[UserPortfolioRoleChoices.ORGANIZATION_MEMBER],
additional_permissions=[UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS],
)
PortfolioInvitation.objects.get_or_create(
email="nonexistentmember_4@igorville.gov",
portfolio=cls.portfolio_1,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
additional_permissions=[
UserPortfolioPermissionChoices.VIEW_MEMBERS,
UserPortfolioPermissionChoices.EDIT_MEMBERS,
UserPortfolioPermissionChoices.VIEW_ALL_REQUESTS,
UserPortfolioPermissionChoices.EDIT_REQUESTS,
],
)
PortfolioInvitation.objects.get_or_create(
email="nonexistentmember_5@igorville.gov",
portfolio=cls.portfolio_1,
roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN],
)
with less_console_noise(): with less_console_noise():
cls.domain_request_1 = completed_domain_request( cls.domain_request_1 = completed_domain_request(
status=DomainRequest.DomainRequestStatus.STARTED, status=DomainRequest.DomainRequestStatus.STARTED,
@ -731,10 +837,12 @@ class MockDb(TestCase):
cls.domain_request_2 = completed_domain_request( cls.domain_request_2 = completed_domain_request(
status=DomainRequest.DomainRequestStatus.IN_REVIEW, status=DomainRequest.DomainRequestStatus.IN_REVIEW,
name="city2.gov", name="city2.gov",
portfolio=cls.portfolio_1,
) )
cls.domain_request_3 = completed_domain_request( cls.domain_request_3 = completed_domain_request(
status=DomainRequest.DomainRequestStatus.STARTED, status=DomainRequest.DomainRequestStatus.STARTED,
name="city3.gov", name="city3.gov",
portfolio=cls.portfolio_1,
) )
cls.domain_request_4 = completed_domain_request( cls.domain_request_4 = completed_domain_request(
status=DomainRequest.DomainRequestStatus.STARTED, status=DomainRequest.DomainRequestStatus.STARTED,
@ -749,6 +857,7 @@ class MockDb(TestCase):
cls.domain_request_6 = completed_domain_request( cls.domain_request_6 = completed_domain_request(
status=DomainRequest.DomainRequestStatus.STARTED, status=DomainRequest.DomainRequestStatus.STARTED,
name="city6.gov", name="city6.gov",
portfolio=cls.portfolio_1,
) )
cls.domain_request_3.submit() cls.domain_request_3.submit()
cls.domain_request_4.submit() cls.domain_request_4.submit()
@ -797,6 +906,7 @@ class MockDb(TestCase):
UserPortfolioPermission.objects.all().delete() UserPortfolioPermission.objects.all().delete()
User.objects.all().delete() User.objects.all().delete()
DomainInvitation.objects.all().delete() DomainInvitation.objects.all().delete()
PortfolioInvitation.objects.all().delete()
cls.federal_agency_1.delete() cls.federal_agency_1.delete()
cls.federal_agency_2.delete() cls.federal_agency_2.delete()
@ -837,17 +947,18 @@ def mock_user():
return mock_user return mock_user
def create_superuser(): def create_superuser(**kwargs):
"""Creates a analyst user with is_staff=True and the group full_access_group"""
User = get_user_model() User = get_user_model()
p = "adminpass" p = "adminpass"
user = User.objects.create_user( user = User.objects.create_user(
username="superuser", username=kwargs.get("username", "superuser"),
email="admin@example.com", email=kwargs.get("email", "admin@example.com"),
first_name="first", first_name=kwargs.get("first_name", "first"),
last_name="last", last_name=kwargs.get("last_name", "last"),
is_staff=True, is_staff=kwargs.get("is_staff", True),
password=p, password=kwargs.get("password", p),
phone="8003111234", phone=kwargs.get("phone", "8003111234"),
) )
# Retrieve the group or create it if it doesn't exist # Retrieve the group or create it if it doesn't exist
group, _ = UserGroup.objects.get_or_create(name="full_access_group") group, _ = UserGroup.objects.get_or_create(name="full_access_group")
@ -856,18 +967,19 @@ def create_superuser():
return user return user
def create_user(): def create_user(**kwargs):
"""Creates a analyst user with is_staff=True and the group cisa_analysts_group"""
User = get_user_model() User = get_user_model()
p = "userpass" p = "userpass"
user = User.objects.create_user( user = User.objects.create_user(
username="staffuser", username=kwargs.get("username", "staffuser"),
email="staff@example.com", email=kwargs.get("email", "staff@example.com"),
first_name="first", first_name=kwargs.get("first_name", "first"),
last_name="last", last_name=kwargs.get("last_name", "last"),
is_staff=True, is_staff=kwargs.get("is_staff", True),
title="title", title=kwargs.get("title", "title"),
password=p, password=kwargs.get("password", p),
phone="8003111234", phone=kwargs.get("phone", "8003111234"),
) )
# Retrieve the group or create it if it doesn't exist # Retrieve the group or create it if it doesn't exist
group, _ = UserGroup.objects.get_or_create(name="cisa_analysts_group") group, _ = UserGroup.objects.get_or_create(name="cisa_analysts_group")

View file

@ -5,6 +5,8 @@ from registrar.models import (
DomainRequest, DomainRequest,
Domain, Domain,
UserDomainRole, UserDomainRole,
PortfolioInvitation,
User,
) )
from registrar.models import Portfolio, DraftDomain from registrar.models import Portfolio, DraftDomain
from registrar.models.user_portfolio_permission import UserPortfolioPermission from registrar.models.user_portfolio_permission import UserPortfolioPermission
@ -22,6 +24,7 @@ from registrar.utility.csv_export import (
DomainRequestExport, DomainRequestExport,
DomainRequestGrowth, DomainRequestGrowth,
DomainRequestDataFull, DomainRequestDataFull,
MemberExport,
get_default_start_date, get_default_start_date,
get_default_end_date, get_default_end_date,
) )
@ -42,9 +45,14 @@ from .common import (
get_wsgi_request_object, get_wsgi_request_object,
less_console_noise, less_console_noise,
get_time_aware_date, get_time_aware_date,
GenericTestHelper,
) )
from waffle.testutils import override_flag from waffle.testutils import override_flag
from datetime import datetime
from django.contrib.admin.models import LogEntry, ADDITION
from django.contrib.contenttypes.models import ContentType
class CsvReportsTest(MockDbForSharedTests): class CsvReportsTest(MockDbForSharedTests):
"""Tests to determine if we are uploading our reports correctly.""" """Tests to determine if we are uploading our reports correctly."""
@ -794,6 +802,104 @@ class ExportDataTest(MockDbForIndividualTests, MockEppLib):
self.assertEqual(csv_content, expected_content) self.assertEqual(csv_content, expected_content)
class MemberExportTest(MockDbForIndividualTests, MockEppLib):
def setUp(self):
"""Override of the base setUp to add a request factory"""
super().setUp()
self.factory = RequestFactory()
@override_flag("organization_feature", active=True)
@override_flag("organization_members", active=True)
@less_console_noise_decorator
def test_member_export(self):
"""Tests the member export report by comparing the csv output."""
# == Data setup == #
# Set last_login for some users
active_date = timezone.make_aware(datetime(2024, 2, 1))
User.objects.filter(id__in=[self.custom_superuser.id, self.custom_staffuser.id]).update(last_login=active_date)
# Create a logentry for meoward, created by lebowski to test invited_by.
content_type = ContentType.objects.get_for_model(PortfolioInvitation)
LogEntry.objects.create(
user=self.lebowski_user,
content_type=content_type,
object_id=self.portfolio_invitation_1.id,
object_repr=str(self.portfolio_invitation_1),
action_flag=ADDITION,
change_message="Created invitation",
action_time=timezone.make_aware(datetime(2023, 4, 12)),
)
# Create log entries for each remaining invitation. Exclude meoward and tired_user.
for invitation in PortfolioInvitation.objects.exclude(
id__in=[self.portfolio_invitation_1.id, self.portfolio_invitation_3.id]
):
LogEntry.objects.create(
user=self.custom_staffuser,
content_type=content_type,
object_id=invitation.id,
object_repr=str(invitation),
action_flag=ADDITION,
change_message="Created invitation",
action_time=timezone.make_aware(datetime(2024, 1, 15)),
)
# Retrieve invitations
with boto3_mocking.clients.handler_for("sesv2", self.mock_client_class):
self.meoward_user.check_portfolio_invitations_on_login()
self.lebowski_user.check_portfolio_invitations_on_login()
self.tired_user.check_portfolio_invitations_on_login()
self.custom_superuser.check_portfolio_invitations_on_login()
self.custom_staffuser.check_portfolio_invitations_on_login()
# Update the created at date on UserPortfolioPermission, so we can test a consistent date.
UserPortfolioPermission.objects.filter(portfolio=self.portfolio_1).update(
created_at=timezone.make_aware(datetime(2022, 4, 1))
)
# == End of data setup == #
# Create a request and add the user to the request
request = self.factory.get("/")
request.user = self.user
self.maxDiff = None
# Add portfolio to session
request = GenericTestHelper._mock_user_request_for_factory(request)
request.session["portfolio"] = self.portfolio_1
# Create a CSV file in memory
csv_file = StringIO()
# Call the export function
MemberExport.export_data_to_csv(csv_file, request=request)
# Reset the CSV file's position to the beginning
csv_file.seek(0)
# Read the content into a variable
csv_content = csv_file.read()
expected_content = (
# Header
"Email,Organization admin,Invited by,Joined date,Last active,Domain requests,"
"Member management,Domain management,Number of domains,Domains\n"
# Content
"meoward@rocks.com,False,big_lebowski@dude.co,2022-04-01,Invalid date,None,"
'Manager,True,2,"adomain2.gov,cdomain1.gov"\n'
"big_lebowski@dude.co,False,help@get.gov,2022-04-01,Invalid date,None,Viewer,True,1,cdomain1.gov\n"
"tired_sleepy@igorville.gov,False,System,2022-04-01,Invalid date,Viewer,None,False,0,\n"
"icy_superuser@igorville.gov,True,help@get.gov,2022-04-01,2024-02-01,Viewer Requester,Manager,False,0,\n"
"cozy_staffuser@igorville.gov,True,help@get.gov,2022-04-01,2024-02-01,Viewer Requester,None,False,0,\n"
"nonexistentmember_1@igorville.gov,False,help@get.gov,Unretrieved,Invited,None,Manager,False,0,\n"
"nonexistentmember_2@igorville.gov,False,help@get.gov,Unretrieved,Invited,None,Viewer,False,0,\n"
"nonexistentmember_3@igorville.gov,False,help@get.gov,Unretrieved,Invited,Viewer,None,False,0,\n"
"nonexistentmember_4@igorville.gov,True,help@get.gov,Unretrieved,"
"Invited,Viewer Requester,Manager,False,0,\n"
"nonexistentmember_5@igorville.gov,True,help@get.gov,Unretrieved,Invited,Viewer Requester,None,False,0,\n"
)
# Normalize line endings and remove commas,
# spaces and leading/trailing whitespace
csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip()
expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip()
self.assertEqual(csv_content, expected_content)
class HelperFunctions(MockDbForSharedTests): class HelperFunctions(MockDbForSharedTests):
"""This asserts that 1=1. Its limited usefulness lies in making sure the helper methods stay healthy.""" """This asserts that 1=1. Its limited usefulness lies in making sure the helper methods stay healthy."""

View file

@ -10,16 +10,38 @@ from registrar.models import (
DomainInformation, DomainInformation,
PublicContact, PublicContact,
UserDomainRole, UserDomainRole,
PortfolioInvitation,
UserGroup,
UserPortfolioPermission,
)
from django.db.models import (
Case,
CharField,
Count,
DateField,
F,
ManyToManyField,
Q,
QuerySet,
TextField,
Value,
When,
OuterRef,
Subquery,
Exists,
Func,
) )
from django.db.models import Case, CharField, Count, DateField, F, ManyToManyField, Q, QuerySet, Value, When
from django.utils import timezone from django.utils import timezone
from django.db.models.functions import Concat, Coalesce from django.db.models.functions import Concat, Coalesce, Cast
from django.contrib.postgres.aggregates import StringAgg from django.contrib.postgres.aggregates import ArrayAgg, StringAgg
from django.contrib.admin.models import LogEntry, ADDITION
from django.contrib.contenttypes.models import ContentType
from registrar.models.utility.generic_helper import convert_queryset_to_dict from registrar.models.utility.generic_helper import convert_queryset_to_dict
from registrar.models.utility.orm_helper import ArrayRemoveNull
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
from registrar.templatetags.custom_filters import get_region from registrar.templatetags.custom_filters import get_region
from registrar.utility.constants import BranchChoices from registrar.utility.constants import BranchChoices
from registrar.utility.enums import DefaultEmail from registrar.utility.enums import DefaultEmail, DefaultUserValues
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -109,14 +131,14 @@ class BaseExport(ABC):
return Q() return Q()
@classmethod @classmethod
def get_filter_conditions(cls, **export_kwargs): def get_filter_conditions(cls, **kwargs):
""" """
Get a Q object of filter conditions to filter when building queryset. Get a Q object of filter conditions to filter when building queryset.
""" """
return Q() return Q()
@classmethod @classmethod
def get_computed_fields(cls): def get_computed_fields(cls, **kwargs):
""" """
Get a dict of computed fields. These are fields that do not exist on the model normally Get a dict of computed fields. These are fields that do not exist on the model normally
and will be passed to .annotate() when building a queryset. and will be passed to .annotate() when building a queryset.
@ -145,7 +167,7 @@ class BaseExport(ABC):
return queryset return queryset
@classmethod @classmethod
def write_csv_before(cls, csv_writer, **export_kwargs): def write_csv_before(cls, csv_writer, **kwargs):
""" """
Write to csv file before the write_csv method. Write to csv file before the write_csv method.
Override in subclasses where needed. Override in subclasses where needed.
@ -162,7 +184,7 @@ class BaseExport(ABC):
Parameters: Parameters:
initial_queryset (QuerySet): Initial queryset. initial_queryset (QuerySet): Initial queryset.
computed_fields (dict, optional): Fields to compute {field_name: expression}. computed_fields (dict, optional): Fields to compute {field_name: expression}.
related_table_fields (list, optional): Extra fields to retrieve; defaults to annotation keys if None. related_table_fields (list, optional): Extra fields to retrieve; defaults to annotation keys if None.
include_many_to_many (bool, optional): Determines if we should include many to many fields or not include_many_to_many (bool, optional): Determines if we should include many to many fields or not
**kwargs: Additional keyword arguments for specific parameters (e.g., public_contacts, domain_invitations, **kwargs: Additional keyword arguments for specific parameters (e.g., public_contacts, domain_invitations,
@ -176,8 +198,8 @@ class BaseExport(ABC):
# We can infer that if we're passing in annotations, # We can infer that if we're passing in annotations,
# we want to grab the result of said annotation. # we want to grab the result of said annotation.
if computed_fields: if computed_fields :
related_table_fields.extend(computed_fields.keys()) related_table_fields.extend(computed_fields .keys())
# Get prexisting fields on the model # Get prexisting fields on the model
model_fields = set() model_fields = set()
@ -192,21 +214,37 @@ class BaseExport(ABC):
return cls.update_queryset(queryset, **kwargs) return cls.update_queryset(queryset, **kwargs)
@classmethod @classmethod
def export_data_to_csv(cls, csv_file, **export_kwargs): def export_data_to_csv(cls, csv_file, **kwargs):
""" """
All domain metadata: All domain metadata:
Exports domains of all statuses plus domain managers. Exports domains of all statuses plus domain managers.
""" """
writer = csv.writer(csv_file) writer = csv.writer(csv_file)
columns = cls.get_columns() columns = cls.get_columns()
models_dict = cls.get_model_annotation_dict(**kwargs)
# Write to csv file before the write_csv
cls.write_csv_before(writer, **kwargs)
# Write the csv file
rows = cls.write_csv(writer, columns, models_dict)
# Return rows that for easier parsing and testing
return rows
@classmethod
def get_annotated_queryset(cls, **kwargs):
"""Returns an annotated queryset based off of all query conditions."""
sort_fields = cls.get_sort_fields() sort_fields = cls.get_sort_fields()
kwargs = cls.get_additional_args() # Get additional args and merge with incoming kwargs
additional_args = cls.get_additional_args()
kwargs.update(additional_args)
select_related = cls.get_select_related() select_related = cls.get_select_related()
prefetch_related = cls.get_prefetch_related() prefetch_related = cls.get_prefetch_related()
exclusions = cls.get_exclusions() exclusions = cls.get_exclusions()
annotations_for_sort = cls.get_annotations_for_sort() annotations_for_sort = cls.get_annotations_for_sort()
filter_conditions = cls.get_filter_conditions(**export_kwargs) filter_conditions = cls.get_filter_conditions(**kwargs)
computed_fields = cls.get_computed_fields() computed_fields = cls.get_computed_fields(**kwargs)
related_table_fields = cls.get_related_table_fields() related_table_fields = cls.get_related_table_fields()
model_queryset = ( model_queryset = (
@ -219,15 +257,24 @@ class BaseExport(ABC):
.order_by(*sort_fields) .order_by(*sort_fields)
.distinct() .distinct()
) )
return cls.annotate_and_retrieve_fields(model_queryset, computed_fields, related_table_fields, **kwargs)
# Convert the queryset to a dictionary (including annotated fields) @classmethod
annotated_queryset = cls.annotate_and_retrieve_fields( def get_model_annotation_dict(cls, **kwargs):
model_queryset, computed_fields, related_table_fields, **kwargs return convert_queryset_to_dict(cls.get_annotated_queryset(**kwargs), is_model=False)
)
models_dict = convert_queryset_to_dict(annotated_queryset, is_model=False) @classmethod
def export_data_to_csv(cls, csv_file, **kwargs):
"""
All domain metadata:
Exports domains of all statuses plus domain managers.
"""
writer = csv.writer(csv_file)
columns = cls.get_columns()
models_dict = cls.get_model_annotation_dict(**kwargs)
# Write to csv file before the write_csv # Write to csv file before the write_csv
cls.write_csv_before(writer, **export_kwargs) cls.write_csv_before(writer, **kwargs)
# Write the csv file # Write the csv file
rows = cls.write_csv(writer, columns, models_dict) rows = cls.write_csv(writer, columns, models_dict)
@ -273,6 +320,218 @@ class BaseExport(ABC):
pass pass
class MemberExport(BaseExport):
"""CSV export for the MembersTable. The members table combines the content
of three tables: PortfolioInvitation, UserPortfolioPermission, and DomainInvitation."""
@classmethod
def model(self):
"""
No model is defined for the member report as it is a combination of multiple fields.
This is a special edge case, but the base report requires this to be defined.
"""
return None
@classmethod
def get_model_annotation_dict(cls, request=None, **kwargs):
"""Combines the permissions and invitation model annotations for
the final returned csv export which combines both of these contexts.
Returns a dictionary of a union between:
- UserPortfolioPermissionModelAnnotation.get_annotated_queryset(portfolio, csv_report=True)
- PortfolioInvitationModelAnnotation.get_annotated_queryset(portfolio, csv_report=True)
"""
portfolio = request.session.get("portfolio")
if not portfolio:
return {}
# Union the two querysets to combine UserPortfolioPermission + invites.
# Unions cannot have a col mismatch, so we must clamp what is returned here.
shared_columns = [
"id",
"first_name",
"last_name",
"email_display",
"last_active",
"roles",
"additional_permissions_display",
"member_display",
"domain_info",
"type",
"joined_date",
"invited_by",
]
# Permissions
permissions = (
UserPortfolioPermission.objects.filter(portfolio=portfolio)
.select_related("user")
.annotate(
first_name=F("user__first_name"),
last_name=F("user__last_name"),
email_display=F("user__email"),
last_active=Coalesce(
Func(F("user__last_login"), Value("YYYY-MM-DD"), function="to_char", output_field=TextField()),
Value("Invalid date"),
output_field=CharField(),
),
additional_permissions_display=F("additional_permissions"),
member_display=Case(
# If email is present and not blank, use email
When(Q(user__email__isnull=False) & ~Q(user__email=""), then=F("user__email")),
# If first name or last name is present, use concatenation of first_name + " " + last_name
When(
Q(user__first_name__isnull=False) | Q(user__last_name__isnull=False),
then=Concat(
Coalesce(F("user__first_name"), Value("")),
Value(" "),
Coalesce(F("user__last_name"), Value("")),
),
),
# If neither, use an empty string
default=Value(""),
output_field=CharField(),
),
domain_info=ArrayAgg(
F("user__permissions__domain__name"),
distinct=True,
# only include domains in portfolio
filter=Q(user__permissions__domain__isnull=False)
& Q(user__permissions__domain__domain_info__portfolio=portfolio),
),
type=Value("member", output_field=CharField()),
joined_date=Func(F("created_at"), Value("YYYY-MM-DD"), function="to_char", output_field=CharField()),
invited_by=cls.get_invited_by_query(object_id_query=cls.get_portfolio_invitation_id_query()),
)
.values(*shared_columns)
)
# Invitations
domain_invitations = DomainInvitation.objects.filter(
email=OuterRef("email"), # Check if email matches the OuterRef("email")
domain__domain_info__portfolio=portfolio, # Check if the domain's portfolio matches the given portfolio
).annotate(domain_info=F("domain__name"))
invitations = (
PortfolioInvitation.objects.exclude(status=PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED)
.filter(portfolio=portfolio)
.annotate(
first_name=Value(None, output_field=CharField()),
last_name=Value(None, output_field=CharField()),
email_display=F("email"),
last_active=Value("Invited", output_field=CharField()),
additional_permissions_display=F("additional_permissions"),
member_display=F("email"),
# Use ArrayRemove to return an empty list when no domain invitations are found
domain_info=ArrayRemoveNull(
ArrayAgg(
Subquery(domain_invitations.values("domain_info")),
distinct=True,
)
),
type=Value("invitedmember", output_field=CharField()),
joined_date=Value("Unretrieved", output_field=CharField()),
invited_by=cls.get_invited_by_query(object_id_query=Cast(OuterRef("id"), output_field=CharField())),
)
.values(*shared_columns)
)
return convert_queryset_to_dict(permissions.union(invitations), is_model=False)
@classmethod
def get_invited_by_query(cls, object_id_query):
"""Returns the user that created the given portfolio invitation.
Grabs this data from the audit log, given that a portfolio invitation object
is specified via object_id_query."""
return Coalesce(
Subquery(
LogEntry.objects.filter(
content_type=ContentType.objects.get_for_model(PortfolioInvitation),
object_id=object_id_query,
action_flag=ADDITION,
)
.annotate(
display_email=Case(
When(
Exists(
UserGroup.objects.filter(
name__in=["cisa_analysts_group", "full_access_group"],
user=OuterRef("user"),
)
),
then=Value(DefaultUserValues.HELP_EMAIL.value),
),
default=F("user__email"),
output_field=CharField(),
)
)
.order_by("action_time")
.values("display_email")[:1]
),
Value(DefaultUserValues.SYSTEM.value),
output_field=CharField(),
)
@classmethod
def get_portfolio_invitation_id_query(cls):
"""Gets the id of the portfolio invitation that created this UserPortfolioPermission.
This makes the assumption that if an invitation is retrieved, it must have created the given
UserPortfolioPermission object."""
return Cast(
Subquery(
PortfolioInvitation.objects.filter(
status=PortfolioInvitation.PortfolioInvitationStatus.RETRIEVED,
# Double outer ref because we first go into the LogEntry query,
# then into the parent UserPortfolioPermission.
email=OuterRef(OuterRef("user__email")),
portfolio=OuterRef(OuterRef("portfolio")),
).values("id")[:1]
),
output_field=CharField(),
)
@classmethod
def get_columns(cls):
"""
Returns the list of column string names for CSV export. Override in subclasses as needed.
"""
return [
"Email",
"Organization admin",
"Invited by",
"Joined date",
"Last active",
"Domain requests",
"Member management",
"Domain management",
"Number of domains",
"Domains",
]
@classmethod
@abstractmethod
def parse_row(cls, columns, model):
"""
Given a set of columns and a model dictionary, generate a new row from cleaned column data.
Must be implemented by subclasses
"""
roles = model.get("roles", [])
permissions = model.get("additional_permissions_display")
user_managed_domains = model.get("domain_info", [])
length_user_managed_domains = len(user_managed_domains)
FIELDS = {
"Email": model.get("email_display"),
"Organization admin": bool(UserPortfolioRoleChoices.ORGANIZATION_ADMIN in roles),
"Invited by": model.get("invited_by"),
"Joined date": model.get("joined_date"),
"Last active": model.get("last_active"),
"Domain requests": UserPortfolioPermission.get_domain_request_permission_display(roles, permissions),
"Member management": UserPortfolioPermission.get_member_permission_display(roles, permissions),
"Domain management": bool(length_user_managed_domains > 0),
"Number of domains": length_user_managed_domains,
"Domains": ",".join(user_managed_domains),
}
return [FIELDS.get(column, "") for column in columns]
class DomainExport(BaseExport): class DomainExport(BaseExport):
""" """
A collection of functions which return csv files regarding Domains. Although class is A collection of functions which return csv files regarding Domains. Although class is
@ -531,10 +790,10 @@ class DomainDataType(DomainExport):
""" """
Get a list of tables to pass to prefetch_related when building queryset. Get a list of tables to pass to prefetch_related when building queryset.
""" """
return ["permissions"] return ["domain__permissions"]
@classmethod @classmethod
def get_computed_fields(cls, delimiter=", "): def get_computed_fields(cls, delimiter=", ", **kwargs):
""" """
Get a dict of computed fields. Get a dict of computed fields.
""" """
@ -571,7 +830,7 @@ class DomainDataTypeUser(DomainDataType):
""" """
@classmethod @classmethod
def get_filter_conditions(cls, request=None): def get_filter_conditions(cls, request=None, **kwargs):
""" """
Get a Q object of filter conditions to filter when building queryset. Get a Q object of filter conditions to filter when building queryset.
""" """
@ -589,7 +848,7 @@ class DomainRequestsDataType:
""" """
@classmethod @classmethod
def get_filter_conditions(cls, request=None): def get_filter_conditions(cls, request=None, **kwargs):
if request is None or not hasattr(request, "user") or not request.user.is_authenticated: if request is None or not hasattr(request, "user") or not request.user.is_authenticated:
return Q(id__in=[]) return Q(id__in=[])
@ -739,7 +998,7 @@ class DomainDataFull(DomainExport):
return ["domain"] return ["domain"]
@classmethod @classmethod
def get_filter_conditions(cls): def get_filter_conditions(cls, **kwargs):
""" """
Get a Q object of filter conditions to filter when building queryset. Get a Q object of filter conditions to filter when building queryset.
""" """
@ -751,7 +1010,7 @@ class DomainDataFull(DomainExport):
) )
@classmethod @classmethod
def get_computed_fields(cls, delimiter=", "): def get_computed_fields(cls, delimiter=", ", **kwargs):
""" """
Get a dict of computed fields. Get a dict of computed fields.
""" """
@ -833,7 +1092,7 @@ class DomainDataFederal(DomainExport):
return ["domain"] return ["domain"]
@classmethod @classmethod
def get_filter_conditions(cls): def get_filter_conditions(cls, **kwargs):
""" """
Get a Q object of filter conditions to filter when building queryset. Get a Q object of filter conditions to filter when building queryset.
""" """
@ -846,7 +1105,7 @@ class DomainDataFederal(DomainExport):
) )
@classmethod @classmethod
def get_computed_fields(cls, delimiter=", "): def get_computed_fields(cls, delimiter=", ", **kwargs):
""" """
Get a dict of computed fields. Get a dict of computed fields.
""" """
@ -930,10 +1189,14 @@ class DomainGrowth(DomainExport):
return ["domain"] return ["domain"]
@classmethod @classmethod
def get_filter_conditions(cls, start_date=None, end_date=None): def get_filter_conditions(cls, start_date=None, end_date=None, **kwargs):
""" """
Get a Q object of filter conditions to filter when building queryset. Get a Q object of filter conditions to filter when building queryset.
""" """
if not start_date or not end_date:
# Return nothing
return Q(id__in=[])
filter_ready = Q( filter_ready = Q(
domain__state__in=[Domain.State.READY], domain__state__in=[Domain.State.READY],
domain__first_ready__gte=start_date, domain__first_ready__gte=start_date,
@ -1002,10 +1265,14 @@ class DomainManaged(DomainExport):
return ["permissions"] return ["permissions"]
@classmethod @classmethod
def get_filter_conditions(cls, start_date=None, end_date=None): def get_filter_conditions(cls, end_date=None, **kwargs):
""" """
Get a Q object of filter conditions to filter when building queryset. Get a Q object of filter conditions to filter when building queryset.
""" """
if not end_date:
# Return nothing
return Q(id__in=[])
end_date_formatted = format_end_date(end_date) end_date_formatted = format_end_date(end_date)
return Q( return Q(
domain__permissions__isnull=False, domain__permissions__isnull=False,
@ -1137,10 +1404,14 @@ class DomainUnmanaged(DomainExport):
return ["permissions"] return ["permissions"]
@classmethod @classmethod
def get_filter_conditions(cls, start_date=None, end_date=None): def get_filter_conditions(cls, end_date=None, **kwargs):
""" """
Get a Q object of filter conditions to filter when building queryset. Get a Q object of filter conditions to filter when building queryset.
""" """
if not end_date:
# Return nothing
return Q(id__in=[])
end_date_formatted = format_end_date(end_date) end_date_formatted = format_end_date(end_date)
return Q( return Q(
domain__permissions__isnull=True, domain__permissions__isnull=True,
@ -1369,10 +1640,13 @@ class DomainRequestGrowth(DomainRequestExport):
] ]
@classmethod @classmethod
def get_filter_conditions(cls, start_date=None, end_date=None): def get_filter_conditions(cls, start_date=None, end_date=None, **kwargs):
""" """
Get a Q object of filter conditions to filter when building queryset. Get a Q object of filter conditions to filter when building queryset.
""" """
if not start_date or not end_date:
# Return nothing
return Q(id__in=[])
start_date_formatted = format_start_date(start_date) start_date_formatted = format_start_date(start_date)
end_date_formatted = format_end_date(end_date) end_date_formatted = format_end_date(end_date)
@ -1465,7 +1739,7 @@ class DomainRequestDataFull(DomainRequestExport):
] ]
@classmethod @classmethod
def get_computed_fields(cls, delimiter=", "): def get_computed_fields(cls, delimiter=", ", **kwargs):
""" """
Get a dict of computed fields. Get a dict of computed fields.
""" """

View file

@ -35,12 +35,25 @@ class DefaultEmail(Enum):
Overview of emails: Overview of emails:
- PUBLIC_CONTACT_DEFAULT: "dotgov@cisa.dhs.gov" - PUBLIC_CONTACT_DEFAULT: "dotgov@cisa.dhs.gov"
- LEGACY_DEFAULT: "registrar@dotgov.gov" - LEGACY_DEFAULT: "registrar@dotgov.gov"
- HELP_EMAIL: "help@get.gov"
""" """
PUBLIC_CONTACT_DEFAULT = "dotgov@cisa.dhs.gov" PUBLIC_CONTACT_DEFAULT = "dotgov@cisa.dhs.gov"
LEGACY_DEFAULT = "registrar@dotgov.gov" LEGACY_DEFAULT = "registrar@dotgov.gov"
class DefaultUserValues(StrEnum):
"""Stores default values for a default user.
Overview of defaults:
- SYSTEM: "System" <= Default username
- UNRETRIEVED: "Unretrieved" <= Default email state
"""
HELP_EMAIL = "help@get.gov"
SYSTEM = "System"
UNRETRIEVED = "Unretrieved"
class Step(StrEnum): class Step(StrEnum):
""" """
Names for each page of the domain request wizard. Names for each page of the domain request wizard.

View file

@ -12,6 +12,7 @@ from registrar.models.portfolio_invitation import PortfolioInvitation
from registrar.models.user_portfolio_permission import UserPortfolioPermission from registrar.models.user_portfolio_permission import UserPortfolioPermission
from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices
from registrar.views.utility.mixins import PortfolioMembersPermission from registrar.views.utility.mixins import PortfolioMembersPermission
from registrar.models.utility.orm_helper import ArrayRemoveNull
class PortfolioMembersJson(PortfolioMembersPermission, View): class PortfolioMembersJson(PortfolioMembersPermission, View):
@ -134,7 +135,7 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
additional_permissions_display=F("additional_permissions"), additional_permissions_display=F("additional_permissions"),
member_display=F("email"), member_display=F("email"),
# Use ArrayRemove to return an empty list when no domain invitations are found # Use ArrayRemove to return an empty list when no domain invitations are found
domain_info=ArrayRemove( domain_info=ArrayRemoveNull(
ArrayAgg( ArrayAgg(
Subquery(domain_invitations.values("domain_info")), Subquery(domain_invitations.values("domain_info")),
distinct=True, distinct=True,
@ -214,8 +215,3 @@ class PortfolioMembersJson(PortfolioMembersPermission, View):
} }
return member_json return member_json
# Custom Func to use array_remove to remove null values
class ArrayRemove(Func):
function = "array_remove"
template = "%(function)s(%(expressions)s, NULL)"

View file

@ -169,6 +169,34 @@ class ExportDataTypeUser(View):
return response return response
class ExportMembersPortfolio(View):
"""Returns a members report for a given portfolio"""
def get(self, request, *args, **kwargs):
"""Returns the members report"""
portfolio = request.session.get("portfolio")
# Check if the user has organization access
if not request.user.is_org_user(request):
return render(request, "403.html", status=403)
# Check if the user has member permissions
if not request.user.has_view_members_portfolio_permission(
portfolio
) and not request.user.has_edit_members_portfolio_permission(portfolio):
return render(request, "403.html", status=403)
# Swap the spaces for dashes to make the formatted name look prettier
portfolio_display = "organization"
if portfolio:
portfolio_display = str(portfolio).lower().replace(" ", "-")
response = HttpResponse(content_type="text/csv")
response["Content-Disposition"] = f'attachment; filename="members-for-{portfolio_display}.csv"'
csv_export.MemberExport.export_data_to_csv(response, request=request)
return response
class ExportDataTypeRequests(View): class ExportDataTypeRequests(View):
"""Returns a domain requests report for a given user on the request""" """Returns a domain requests report for a given user on the request"""