#3761 : Update createfederalportfolio script match fed agency [dg] (#3941)

* Bringing over changes from original PR

* Added the rapidfuzz lib to pip

* Added the lib to requirements

* Refactored the fuzzy matcher out to a generic util, updated the create federal portfolio.

* linter fixes

* lint fixes

* Adjusting loop to skip index (correct testing)

* Created a test for the fuzzy string matcher and fixed the issues that were found. Set the setuptools version back to what it should be.

* Linter and Black changes.

* cleaning up updates

* Added root user to the owasp sec scan to fix the perm issue.

* More updates to fix owasp.

* linting fix

* Removed the person name fuzzy matcher.

* lint fix

* Refactored the domains and requests for loop for dry run

* lint fix

* Cleaning up lint and test after removing the persongenerator

* cleaning a test

* forgot a file

* fixed lint issue
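
The fuzzy matching described in the list above can be sketched as follows. This is a minimal illustration only: it uses the standard library's `difflib` as a stand-in for `rapidfuzz`, and the names `similarity` and `find_matches` are hypothetical, not the signatures of the util added in this commit. Scores from `difflib` will differ from those `rapidfuzz` produces.

```python
from difflib import SequenceMatcher


def similarity(a: str, b: str) -> float:
    """Return a 0-100 similarity score between two strings, ignoring case."""
    return 100.0 * SequenceMatcher(None, a.lower(), b.lower()).ratio()


def find_matches(target: str, candidates: list[str], threshold: float = 85) -> set[str]:
    """Return every candidate whose score against the target meets the threshold."""
    return {name for name in candidates if similarity(target, name) >= threshold}


# Illustrative organization names (assumed data, not taken from the database).
names = ["Department of State", "department of state ", "Department of Energy"]
matches = find_matches("Department of State", names, threshold=85)
```

A higher threshold reduces false positives between similarly named agencies at the cost of missing looser variations, which is presumably why the command exposes the threshold as an option.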

---------

Co-authored-by: Abraham Alam <abraham.alam@ecstech.com>
Daisy Guti 2025-07-21 14:51:59 -07:00 committed by GitHub
parent ce910c2c68
commit b4b2ac6e63
16 changed files with 2037 additions and 937 deletions
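
The dry-run behavior this commit adds to the command follows a common pattern: always record the intended change, but only persist it when dry-run mode is off. A minimal sketch of that pattern, assuming a hypothetical `ChangeLog` class in place of the command's `ChangeTracker` and a list standing in for the database:

```python
from dataclasses import dataclass, field


@dataclass
class ChangeLog:
    """Hypothetical stand-in for a change tracker: holds pending creates."""

    create: list = field(default_factory=list)
    saved: list = field(default_factory=list)  # stands in for the database

    def bulk_create(self) -> None:
        # The real command would write to the database here.
        self.saved.extend(self.create)


def run(names: list[str], dry_run: bool) -> ChangeLog:
    changes = ChangeLog()
    for name in names:
        # Always record the intended change so it can be reported.
        changes.create.append(name)
    if not dry_run:
        # Persist only when not in dry-run mode.
        changes.bulk_create()
    return changes


preview = run(["Department of State"], dry_run=True)
applied = run(["Department of State"], dry_run=False)
```

Because the pending list is filled in both modes, the same summary code can report what was changed or what would be changed.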

View file

@ -10,9 +10,9 @@ If you're new to Django, see [Getting Started with Django](https://www.djangopro
```shell
cd src
docker-compose build
docker compose build
```
* Run the server: `docker-compose up`
* Run the server: `docker compose up`
Press Ctrl-c when you'd like to exit or pass `-d` to run in detached mode.
@ -50,7 +50,7 @@ Resources:
## Setting Vars
Non-secret environment variables for local development are set in [src/docker-compose.yml](../../src/docker-compose.yml).
Non-secret environment variables for local development are set in [src/docker-compose.yml](../../src/docker-compose.yml).
Secrets (for example, if you'd like to have a working Login.gov authentication) go in `.env` in [src/](../../src/) with contents like this:
@ -159,15 +159,15 @@ The CODEOWNERS file sets the tagged individuals as default reviewers on any Pull
## Viewing Logs
If you run via `docker-compose up`, you'll see the logs in your terminal.
If you run via `docker compose up`, you'll see the logs in your terminal.
If you run via `docker-compose up -d`, you can get logs with `docker-compose logs -f`.
If you run via `docker compose up -d`, you can get logs with `docker compose logs -f`.
You can change the logging verbosity, if needed. Do a web search for "django log level".
## Mock data
[load.py](../../src/registrar/management/commands/load.py) called from docker-compose (locally) and reset-db.yml (upper) loads the fixtures from [fixtures_user.py](../../src/registrar/fixtures/fixtures_users.py) and the rest of the data-loading fixtures in that fixtures folder, giving you some test data to play with while developing.
[load.py](../../src/registrar/management/commands/load.py) called from docker compose (locally) and reset-db.yml (upper) loads the fixtures from [fixtures_user.py](../../src/registrar/fixtures/fixtures_users.py) and the rest of the data-loading fixtures in that fixtures folder, giving you some test data to play with while developing.
See the [database-access README](./database-access.md) for information on how to pull data to update these fixtures.
@ -179,26 +179,26 @@ To get a container running:
```shell
cd src
docker-compose build
docker-compose up -d
docker compose build
docker compose up -d
```
Django's test suite:
```shell
docker-compose exec app ./manage.py test
docker compose exec app ./manage.py test
```
OR
```shell
docker-compose exec app python -Wa ./manage.py test # view deprecation warnings
docker compose exec app python -Wa ./manage.py test # view deprecation warnings
```
Linters:
```shell
docker-compose exec app ./manage.py lint
docker compose exec app ./manage.py lint
```
### Get availability for domain requests to work locally
@ -266,7 +266,7 @@ accessibility rules. The scan runs as part of our CI setup (see
type
```shell
docker-compose run pa11y npm run pa11y-ci
docker compose run pa11y npm run pa11y-ci
```
The URLs that `pa11y-ci` will scan are configured in `src/.pa11yci`. When new
@ -280,7 +280,7 @@ security rules. The scan runs as part of our CI setup (see
type
```shell
docker-compose run owasp
docker compose run owasp
```
## Images, stylesheets, and JavaScript
@ -297,7 +297,7 @@ We utilize the [uswds-compile tool](https://designsystem.digital.gov/documentati
### Making and viewing style changes
When you run `docker-compose up` the `node` service in the container will begin to watch for changes in the `registrar/assets` folder, and will recompile once any changes are made.
When you run `docker compose up` the `node` service in the container will begin to watch for changes in the `registrar/assets` folder, and will recompile once any changes are made.
Within the `registrar/assets` folder, the `_theme` folder contains three files initially generated by `uswds-compile`:
1. `_uswds-theme-custom-styles` contains all the custom styles created for this application

View file

@ -35,6 +35,7 @@ django-admin-multiple-choice-list-filter = "*"
django-import-export = "*"
django-waffle = "*"
cryptography = "*"
rapidfuzz = "*"
[dev-packages]
django-debug-toolbar = "*"

1600
src/Pipfile.lock generated

File diff suppressed because it is too large

View file

@ -1,4 +1,5 @@
# coding: utf-8
# flake8: noqa: F824
import logging

View file

@ -125,6 +125,7 @@ services:
owasp:
image: ghcr.io/zaproxy/zaproxy:stable
user: "root"
command: zap-baseline.py -t http://app:8080 -c zap.conf -I -r zap_report.html
volumes:
- .:/zap/wrk/

View file

@ -162,6 +162,7 @@ INSTALLED_APPS = [
"import_export",
# Waffle feature flags
"waffle",
"csp",
]
# Middleware are routines for processing web requests.
@ -178,6 +179,8 @@ MIDDLEWARE = [
"whitenoise.middleware.WhiteNoiseMiddleware",
# provide security enhancements to the request/response cycle
"django.middleware.security.SecurityMiddleware",
# django-csp: enable use of Content-Security-Policy header
"csp.middleware.CSPMiddleware",
# store and retrieve arbitrary data on a per-site-visitor basis
"django.contrib.sessions.middleware.SessionMiddleware",
# add a few conveniences for perfectionists, see documentation
@ -193,8 +196,6 @@ MIDDLEWARE = [
"django.contrib.messages.middleware.MessageMiddleware",
# provide clickjacking protection via the X-Frame-Options header
"django.middleware.clickjacking.XFrameOptionsMiddleware",
# django-csp: enable use of Content-Security-Policy header
"csp.middleware.CSPMiddleware",
# django-auditlog: obtain the request User for use in logging
"auditlog.middleware.AuditlogMiddleware",
# Used for waffle feature flags
@ -360,33 +361,35 @@ WAFFLE_FLAG_MODEL = "registrar.WaffleFlag"
# Content-Security-Policy configuration
# this can be restrictive because we have few external scripts
allowed_sources = ("'self'",)
CSP_DEFAULT_SRC = allowed_sources
# Most things fall back to default-src, but the following do not and should be
# explicitly set
CSP_FRAME_ANCESTORS = allowed_sources
CSP_FORM_ACTION = allowed_sources
# Google analytics requires that we relax our otherwise
# strict CSP by allowing scripts to run from their domain
# and inline with a nonce, as well as allowing connections back to their domain.
# Note: If needed, we can embed chart.js instead of using the CDN
CSP_DEFAULT_SRC = ("'self'",)
CSP_STYLE_SRC = [
"'self'",
"https://www.ssa.gov/accessibility/andi/andi.css",
]
CSP_SCRIPT_SRC_ELEM = [
"'self'",
"https://www.googletagmanager.com/",
"https://cdn.jsdelivr.net/npm/chart.js",
"https://www.ssa.gov",
"https://ajax.googleapis.com",
]
CSP_CONNECT_SRC = ["'self'", "https://www.google-analytics.com/", "https://www.ssa.gov/accessibility/andi/andi.js"]
# Content-Security-Policy configuration for django-csp 4.0+ New format required
CONTENT_SECURITY_POLICY = {
"DIRECTIVES": {
"connect-src": [
"'self'",
"https://www.google-analytics.com/",
"https://www.ssa.gov/accessibility/andi/andi.js",
],
"default-src": ("'self'",),
"form-action": ("'self'",),
"frame-ancestors": ("'self'",),
"img-src": ["'self'", "https://www.ssa.gov/accessibility/andi/icons/"],
"script-src-elem": [
"'self'",
"https://www.googletagmanager.com/",
"https://cdn.jsdelivr.net/npm/chart.js",
"https://www.ssa.gov",
"https://ajax.googleapis.com",
],
"style-src": ["'self'", "https://www.ssa.gov/accessibility/andi/andi.css"],
}
}
CSP_INCLUDE_NONCE_IN = ["script-src-elem", "style-src"]
CSP_IMG_SRC = ["'self'", "https://www.ssa.gov/accessibility/andi/icons/"]
# Cross-Origin Resource Sharing (CORS) configuration
# Sets clients that allow access control to manage.get.gov
# TODO: remove :8080 to see if we can have all localhost access
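
To make the dictionary-style `CONTENT_SECURITY_POLICY` setting in the diff above easier to read, here is a rough sketch of how a directives mapping corresponds to a `Content-Security-Policy` header value. The `render_csp` helper is hypothetical and written for illustration; it is not django-csp's implementation and ignores features such as nonces.

```python
def render_csp(directives: dict) -> str:
    """Join each directive name with its sources, separated by semicolons."""
    parts = []
    for name in sorted(directives):
        sources = " ".join(directives[name])
        parts.append(f"{name} {sources}")
    return "; ".join(parts)


# A two-directive subset of the policy from the settings diff.
policy = {
    "default-src": ("'self'",),
    "img-src": ["'self'", "https://www.ssa.gov/accessibility/andi/icons/"],
}
header = render_csp(policy)
```

Directives without their own entry, such as `font-src`, fall back to `default-src` in the browser, while `form-action` and `frame-ancestors` do not, which is why the settings list those two explicitly.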

View file

@ -505,7 +505,7 @@ class DomainOrgNameAddressForm(forms.ModelForm):
state_territory = forms.ChoiceField(
label="State, territory, or military post",
required=True,
choices=DomainInformation.StateTerritoryChoices.choices,
choices=DomainInformation.StateTerritoryChoices.choices, # type: ignore[misc]
error_messages={
"required": ("Select the state, territory, or military post where your organization is located.")
},
@ -606,7 +606,7 @@ class DomainOrgNameAddressForm(forms.ModelForm):
except field.queryset.model.DoesNotExist:
pass # Handle the case where the object does not exist
elif hasattr(new_value, "id"):
elif hasattr(new_value, "id") and new_value is not None:
# If new_value is a model instance, compare by ID.
new_value = new_value.id

View file

@ -46,7 +46,7 @@ class PortfolioOrgAddressForm(forms.ModelForm):
state_territory = forms.ChoiceField(
label="State, territory, or military post",
required=True,
choices=DomainInformation.StateTerritoryChoices.choices,
choices=DomainInformation.StateTerritoryChoices.choices, # type: ignore[misc]
error_messages={
"required": ("Select the state, territory, or military post where your organization is located.")
},

View file

@ -1,4 +1,23 @@
"""Loads files from /tmp into our sandboxes"""
"""
This command creates and organizes federal agency portfolios by:
1. Creates a Portfolio record for the specified agencies
2. Uses fuzzy string matching to find domain requests and domain information records
that belong to the agency (handles name variations like "Department of State" vs "State Dept" vs "DOS")
3. Automatically creates Suborganization records from the different sub-units/departments found within
the discovered domains/requests (e.g., "IT Department", "Communications Office")
4. Associates / Links domains and requests to their proper portfolio and suborganization hierarchy
Usage Examples:
# Create portfolio for specific agency
./manage.py create_federal_portfolio --agency_name "Department of State" --parse_requests --parse_domains
# Create portfolios for entire branch
./manage.py create_federal_portfolio --branch "executive" --parse_requests --parse_domains
# Dry run to see what would change
./manage.py create_federal_portfolio --agency_name "Department of Defense" --parse_requests --dry_run
"""
import argparse
import logging
@ -14,7 +33,7 @@ from registrar.models.utility.generic_helper import count_capitals, normalize_st
from django.db.models import F, Q
from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices
from registrar.management.commands.utility.fuzzy_string_matcher import create_federal_agency_matcher
logger = logging.getLogger(__name__)
@ -72,6 +91,34 @@ class Command(BaseCommand):
self.domain_request_changes = self.ChangeTracker(model_class=DomainRequest)
self.user_portfolio_perm_changes = self.ChangeTracker(model_class=UserPortfolioPermission)
self.portfolio_invitation_changes = self.ChangeTracker(model_class=PortfolioInvitation)
self.fuzzy_matcher = None
self.fuzzy_threshold = 85
self.dry_run = False
def _create_fuzzy_organization_filter(self, federal_agency, all_org_names=None):
"""
Create a Q filter that includes both direct federal agency matches
and fuzzy organization name matches.
"""
# Direct federal agency relationship (existing logic)
base_filter = Q(federal_agency=federal_agency)
# Fuzzy organization name matching
if all_org_names and self.fuzzy_matcher:
# The fuzzy matcher returns a MatchResult object, not a set
match_result = self.fuzzy_matcher.find_matches(federal_agency.agency, all_org_names)
# Extract the matched_strings from the MatchResult
matched_org_names = match_result.matched_strings
# Create Q objects for organization name matching
org_name_filters = Q()
for name in matched_org_names:
org_name_filters |= Q(organization_name__iexact=name)
return base_filter | org_name_filters
return base_filter
def add_arguments(self, parser):
"""Add command line arguments to create federal portfolios.
@ -88,6 +135,8 @@ class Command(BaseCommand):
Optional:
--skip_existing_portfolios: Does not perform substeps on a portfolio if it already exists.
--dry_run: Show what would be changed without making any database modifications
--fuzzy_threshold: Similarity threshold for fuzzy matching (default: 85)
--debug: Increases log verbosity
"""
group = parser.add_mutually_exclusive_group(required=True)
@ -118,7 +167,18 @@ class Command(BaseCommand):
parser.add_argument(
"--skip_existing_portfolios",
action=argparse.BooleanOptionalAction,
help="Only parses newly created portfolios, skippubg existing ones.",
help="Only parses newly created portfolios, skipping existing ones.",
)
parser.add_argument(
"--dry_run",
action=argparse.BooleanOptionalAction,
help="Show what would be changed without making any database modifications.",
)
parser.add_argument(
"--fuzzy_threshold",
type=int,
default=85,
help="Similarity threshold for fuzzy matching (0-100, default: 85).",
)
parser.add_argument(
"--debug",
@ -133,7 +193,10 @@ class Command(BaseCommand):
parse_domains = options.get("parse_domains")
parse_managers = options.get("parse_managers")
skip_existing_portfolios = options.get("skip_existing_portfolios")
dry_run = options.get("dry_run")
debug = options.get("debug")
fuzzy_threshold = options.get("fuzzy_threshold", 85)
self.dry_run = dry_run
# Parse script params
if not (parse_requests or parse_domains or parse_managers):
@ -141,6 +204,15 @@ class Command(BaseCommand):
"You must specify at least one of --parse_requests, --parse_domains, or --parse_managers."
)
# Show dry run
if dry_run:
logger.info(f"{TerminalColors.BOLD}{TerminalColors.OKBLUE}")
logger.info("=" * 60)
logger.info(" DRY RUN MODE")
logger.info(" NO DATABASE CHANGES WILL BE MADE")
logger.info("=" * 60)
logger.info(f"{TerminalColors.ENDC}")
# Get agencies
federal_agency_filter = {"agency__iexact": agency_name} if agency_name else {"federal_type": branch}
agencies = FederalAgency.objects.filter(agency__isnull=False, **federal_agency_filter).distinct()
@ -154,6 +226,8 @@ class Command(BaseCommand):
else:
raise CommandError(f"Cannot find '{branch}' federal agencies in our database.")
self.fuzzy_matcher = create_federal_agency_matcher(threshold=fuzzy_threshold)
# Store all portfolios and agencies in a dict to avoid extra db calls
existing_portfolios = Portfolio.objects.filter(
organization_name__in=agencies.values_list("agency", flat=True), organization_name__isnull=False
@ -181,19 +255,25 @@ class Command(BaseCommand):
senior_official=federal_agency.so_federal_agency.first(),
)
self.portfolio_changes.create.append(portfolio)
logger.info(f"{TerminalColors.OKGREEN}Created portfolio '{portfolio}'.{TerminalColors.ENDC}")
self._log_action("CREATE", f"portfolio '{portfolio}'")
elif skip_existing_portfolios:
message = f"Portfolio '{portfolio}' already exists. Skipped."
logger.info(f"{TerminalColors.YELLOW}{message}{TerminalColors.ENDC}")
self.portfolio_changes.skip.append(portfolio)
# Create portfolios
self.portfolio_changes.bulk_create()
if not self.dry_run:
self.portfolio_changes.bulk_create()
# After create, get the list of all portfolios to use
portfolios_to_use = set(self.portfolio_changes.create)
if not skip_existing_portfolios:
portfolios_to_use.update(set(existing_portfolios))
if self.dry_run:
portfolios_to_use = list(self.portfolio_changes.create)
if not skip_existing_portfolios:
portfolios_to_use.extend(list(existing_portfolios))
else:
# After create, get the list of all portfolios to use
portfolios_to_use = set(self.portfolio_changes.create)
if not skip_existing_portfolios:
portfolios_to_use.update(set(existing_portfolios))
portfolios_to_use_dict = {normalize_string(p.organization_name): p for p in portfolios_to_use}
@ -201,14 +281,13 @@ class Command(BaseCommand):
created_suborgs = self.create_suborganizations(portfolios_to_use_dict, agencies_dict)
if created_suborgs:
self.suborganization_changes.create.extend(created_suborgs.values())
self.suborganization_changes.bulk_create()
if not self.dry_run:
self.suborganization_changes.bulk_create()
# == Handle domains and requests == #
for portfolio_org_name, portfolio in portfolios_to_use_dict.items():
federal_agency = agencies_dict.get(portfolio_org_name)
suborgs = {}
for suborg in portfolio.portfolio_suborganizations.all():
suborgs[suborg.name] = suborg
suborgs = self._get_suborgs_for_portfolio(portfolio, created_suborgs)
if parse_domains:
updated_domains = self.update_domains(portfolio, federal_agency, suborgs, debug)
@ -219,30 +298,31 @@ class Command(BaseCommand):
self.domain_request_changes.update.extend(updated_domain_requests)
# Update DomainInformation
try:
self.domain_info_changes.bulk_update(["portfolio", "sub_organization"])
except Exception as err:
logger.error(f"{TerminalColors.FAIL}Could not bulk update domain infos.{TerminalColors.ENDC}")
logger.error(err, exc_info=True)
if not self.dry_run:
try:
self.domain_info_changes.bulk_update(["portfolio", "sub_organization"])
except Exception as err:
logger.error(f"{TerminalColors.FAIL}Could not bulk update domain infos.{TerminalColors.ENDC}")
logger.error(err, exc_info=True)
# Update DomainRequest
try:
self.domain_request_changes.bulk_update(
[
"portfolio",
"sub_organization",
"requested_suborganization",
"suborganization_city",
"suborganization_state_territory",
"federal_agency",
]
)
except Exception as err:
logger.error(f"{TerminalColors.FAIL}Could not bulk update domain requests.{TerminalColors.ENDC}")
logger.error(err, exc_info=True)
# Update DomainRequest
try:
self.domain_request_changes.bulk_update(
[
"portfolio",
"sub_organization",
"requested_suborganization",
"suborganization_city",
"suborganization_state_territory",
"federal_agency",
]
)
except Exception as err:
logger.error(f"{TerminalColors.FAIL}Could not bulk update domain requests.{TerminalColors.ENDC}")
logger.error(err, exc_info=True)
# == Handle managers (no bulk_create) == #
if parse_managers:
if parse_managers and not self.dry_run:
domain_infos = DomainInformation.objects.filter(portfolio__in=portfolios_to_use)
domains = Domain.objects.filter(domain_info__in=domain_infos)
@ -256,22 +336,29 @@ class Command(BaseCommand):
self.print_final_run_summary(parse_domains, parse_requests, parse_managers, debug)
def print_final_run_summary(self, parse_domains, parse_requests, parse_managers, debug):
action_prefix = "WOULD BE " if self.dry_run else ""
self.portfolio_changes.print_script_run_summary(
no_changes_message="||============= No portfolios changed. =============||",
log_header="============= PORTFOLIOS =============",
skipped_header="----- SOME PORTFOLIOS WERENT CREATED (BUT OTHER RECORDS ARE STILL PROCESSED) -----",
no_changes_message=(f"||============= No portfolios {action_prefix.lower()}changed. =============||"),
log_header=f"============= PORTFOLIOS {action_prefix}=============",
skipped_header=(
f"----- SOME PORTFOLIOS {action_prefix}WERENT CREATED " f"(BUT OTHER RECORDS ARE STILL PROCESSED) -----"
),
detailed_prompt_title=(
"PORTFOLIOS: Do you wish to see the full list of failed, skipped and updated records?"
f"PORTFOLIOS: Do you wish to see the full list of "
f"{action_prefix.lower()}failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
)
self.suborganization_changes.print_script_run_summary(
no_changes_message="||============= No suborganizations changed. =============||",
log_header="============= SUBORGANIZATIONS =============",
skipped_header="----- SUBORGANIZATIONS SKIPPED (SAME NAME AS PORTFOLIO NAME) -----",
no_changes_message=(f"||============= No suborganizations {action_prefix.lower()}changed. =============||"),
log_header=f"============= SUBORGANIZATIONS {action_prefix}=============",
skipped_header=(f"----- SUBORGANIZATIONS {action_prefix}SKIPPED (SAME NAME AS PORTFOLIO NAME) -----"),
detailed_prompt_title=(
"SUBORGANIZATIONS: Do you wish to see the full list of failed, skipped and updated records?"
f"SUBORGANIZATIONS: Do you wish to see the full list of "
f"{action_prefix.lower()}failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
@ -279,10 +366,11 @@ class Command(BaseCommand):
if parse_domains:
self.domain_info_changes.print_script_run_summary(
no_changes_message="||============= No domains changed. =============||",
log_header="============= DOMAINS =============",
no_changes_message=(f"||============= No domains {action_prefix.lower()}changed. =============||"),
log_header=f"============= DOMAINS {action_prefix}=============",
detailed_prompt_title=(
"DOMAINS: Do you wish to see the full list of failed, skipped and updated records?"
f"DOMAINS: Do you wish to see the full list of "
f"{action_prefix.lower()}failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
@ -290,10 +378,13 @@ class Command(BaseCommand):
if parse_requests:
self.domain_request_changes.print_script_run_summary(
no_changes_message="||============= No domain requests changed. =============||",
log_header="============= DOMAIN REQUESTS =============",
no_changes_message=(
f"||============= No domain requests {action_prefix.lower()}changed. =============||"
),
log_header=f"============= DOMAIN REQUESTS {action_prefix}=============",
detailed_prompt_title=(
"DOMAIN REQUESTS: Do you wish to see the full list of failed, skipped and updated records?"
f"DOMAIN REQUESTS: Do you wish to see the full list of "
f"{action_prefix.lower()}failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
@ -301,102 +392,240 @@ class Command(BaseCommand):
if parse_managers:
self.user_portfolio_perm_changes.print_script_run_summary(
no_changes_message="||============= No managers changed. =============||",
log_header="============= MANAGERS =============",
skipped_header="----- MANAGERS SKIPPED (ALREADY EXISTED) -----",
no_changes_message=(f"||============= No managers {action_prefix.lower()}changed. =============||"),
log_header=f"============= MANAGERS {action_prefix}=============",
skipped_header=f"----- MANAGERS {action_prefix}SKIPPED (ALREADY EXISTED) -----",
detailed_prompt_title=(
"MANAGERS: Do you wish to see the full list of failed, skipped and updated records?"
f"MANAGERS: Do you wish to see the full list of "
f"{action_prefix.lower()}failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
)
self.portfolio_invitation_changes.print_script_run_summary(
no_changes_message="||============= No manager invitations changed. =============||",
log_header="============= MANAGER INVITATIONS =============",
skipped_header="----- INVITATIONS SKIPPED (ALREADY EXISTED) -----",
no_changes_message=(
f"||============= No manager invitations {action_prefix.lower()}changed. =============||"
),
log_header=f"============= MANAGER INVITATIONS {action_prefix}=============",
skipped_header=f"----- INVITATIONS {action_prefix}SKIPPED (ALREADY EXISTED) -----",
detailed_prompt_title=(
"MANAGER INVITATIONS: Do you wish to see the full list of failed, skipped and updated records?"
f"MANAGER INVITATIONS: Do you wish to see the full list of "
f"{action_prefix.lower()}failed, skipped and updated records?"
),
display_as_str=True,
debug=debug,
)
# Add dry run summary at the end
if self.dry_run:
self._print_dry_run_summary()
def _print_dry_run_summary(self):
"""Print a summary of what would be changed in dry run mode."""
logger.info(f"\n{TerminalColors.BOLD}{TerminalColors.OKBLUE}")
logger.info("=" * 60)
logger.info(" DRY RUN SUMMARY")
logger.info("=" * 60)
logger.info(f"{TerminalColors.ENDC}")
total_changes = (
len(self.portfolio_changes.create)
+ len(self.suborganization_changes.create)
+ len(self.domain_info_changes.update)
+ len(self.domain_request_changes.update)
+ len(self.user_portfolio_perm_changes.create)
+ len(self.portfolio_invitation_changes.create)
)
logger.info(f"Total records that would be modified: {total_changes}")
logger.info(f" • Portfolios created: {len(self.portfolio_changes.create)}")
logger.info(f" • Suborganizations created: {len(self.suborganization_changes.create)}")
logger.info(f" • Domain infos updated: {len(self.domain_info_changes.update)}")
logger.info(f" • Domain requests updated: {len(self.domain_request_changes.update)}")
logger.info(f" • User permissions created: {len(self.user_portfolio_perm_changes.create)}")
logger.info(f" • Portfolio invitations created: {len(self.portfolio_invitation_changes.create)}")
logger.info(
f"\n{TerminalColors.BOLD}To apply these changes, run the command without --dry_run{TerminalColors.ENDC}"
)
def create_suborganizations(self, portfolio_dict, agency_dict):
"""Create Suborganizations tied to the given portfolio based on DomainInformation objects"""
created_suborgs = {}
portfolios = portfolio_dict.values()
# Get filtered domains and requests
domains_dict, requests_dict = self._get_filtered_domains_and_requests(agency_dict)
# Process each portfolio
for portfolio_name, portfolio in portfolio_dict.items():
existing_suborgs = self._get_existing_suborgs_for_portfolio(portfolio)
portfolio_created_suborgs = self._get_portfolio_created_suborgs(created_suborgs, portfolio)
# Create suborganizations for this portfolio
self._create_suborgs_for_portfolio(
portfolio_name,
portfolio,
domains_dict,
requests_dict,
existing_suborgs,
portfolio_created_suborgs,
created_suborgs,
)
return created_suborgs
def _get_filtered_domains_and_requests(self, agency_dict):
"""Get domains and requests filtered by agencies, grouped by normalized organization name."""
agencies = agency_dict.values()
domains = DomainInformation.objects.filter(
# Org name must not be null, and must not be the portfolio name
Q(
organization_name__isnull=False,
)
& ~Q(organization_name__iexact=F("portfolio__organization_name")),
# Only get relevant data to the agency/portfolio we are targeting
Q(federal_agency__in=agencies) | Q(portfolio__in=portfolios),
# Get all organization names for matching
all_org_names = self._get_all_organization_names()
# Build filters for domains and requests
domain_filters, request_filters = self._build_agency_filters(agencies, all_org_names)
# Get filtered querysets
domains = self._get_filtered_domains(domain_filters)
requests = self._get_filtered_requests(request_filters)
# Group by normalized organization name
domains_dict = self._group_by_normalized_org_name(domains, "organization_name")
requests_dict = self._group_by_normalized_org_name(requests, "organization_name")
return domains_dict, requests_dict
def _get_all_organization_names(self):
"""Get all unique organization names from domains and requests."""
domain_names = list(
DomainInformation.objects.filter(organization_name__isnull=False)
.values_list("organization_name", flat=True)
.distinct()
)
requests = DomainRequest.objects.filter(
# Org name must not be null, and must not be the portfolio name
Q(
organization_name__isnull=False,
)
& ~Q(organization_name__iexact=F("portfolio__organization_name")),
# Only get relevant data to the agency/portfolio we are targeting
Q(federal_agency__in=agencies) | Q(portfolio__in=portfolios),
request_names = list(
DomainRequest.objects.filter(organization_name__isnull=False)
.values_list("organization_name", flat=True)
.distinct()
)
return [normalize_string(name) for name in domain_names + request_names]
def _build_agency_filters(self, agencies, all_org_names):
"""Build Q filters for domains and requests based on agencies."""
domain_filters = Q()
request_filters = Q()
for agency in agencies:
agency_filter = self._create_fuzzy_organization_filter(agency, all_org_names)
domain_filters |= agency_filter
request_filters |= agency_filter
return domain_filters, request_filters
def _get_filtered_domains(self, domain_filters):
"""Get filtered domain information objects."""
return DomainInformation.objects.filter(
Q(organization_name__isnull=False) & ~Q(organization_name__iexact=F("portfolio__organization_name")),
domain_filters,
)
# First: get all existing suborgs
# NOTE: .all() is a heavy query, but unavoidable as we need to check for duplicate names.
# This is not quite as heavy as just using a for loop and .get_or_create, but worth noting.
# Change this if you can find a way to avoid doing this.
# This won't scale great for 10k+ records.
existing_suborgs = Suborganization.objects.all()
suborg_dict = {normalize_string(org.name): org for org in existing_suborgs}
def _get_filtered_requests(self, request_filters):
"""Get filtered domain request objects."""
return DomainRequest.objects.filter(
Q(organization_name__isnull=False) & ~Q(organization_name__iexact=F("portfolio__organization_name")),
request_filters,
)
# Second: Group domains and requests by normalized organization name.
domains_dict = {}
requests_dict = {}
for domain in domains:
normalized_name = normalize_string(domain.organization_name)
domains_dict.setdefault(normalized_name, []).append(domain)
def _group_by_normalized_org_name(self, queryset, org_name_field):
"""Group queryset objects by normalized organization name."""
grouped_dict = {}
for obj in queryset:
org_name = getattr(obj, org_name_field)
normalized_name = normalize_string(org_name)
grouped_dict.setdefault(normalized_name, []).append(obj)
return grouped_dict
for request in requests:
normalized_name = normalize_string(request.organization_name)
requests_dict.setdefault(normalized_name, []).append(request)
def _get_existing_suborgs_for_portfolio(self, portfolio):
"""Get existing suborganizations for a portfolio."""
if not portfolio.pk:
return {}
# Third: Parse through each group of domains that have the same organization names,
# then create *one* suborg record from it.
# Normalize all suborg names so we don't add duplicate data unintentionally.
for portfolio_name, portfolio in portfolio_dict.items():
# For a given agency, find all domains that list suborg info for it.
for norm_org_name, domains in domains_dict.items():
# Don't add the record if the suborg name would equal the portfolio name
if norm_org_name == portfolio_name:
continue
existing_suborgs = portfolio.portfolio_suborganizations.all()
return {normalize_string(org.name): org for org in existing_suborgs}
new_suborg_name = None
if len(domains) == 1:
new_suborg_name = normalize_string(domains[0].organization_name, lowercase=False)
elif len(domains) > 1:
# Pick the best record for a suborg name (fewest spaces, most leading capitals)
best_record = max(
domains,
key=lambda rank: (
-domain.organization_name.count(" "),
count_capitals(domain.organization_name, leading_only=True),
),
)
new_suborg_name = normalize_string(best_record.organization_name, lowercase=False)
def _get_portfolio_created_suborgs(self, created_suborgs, portfolio):
"""Get suborganizations created in this batch for the given portfolio."""
portfolio_created_suborgs = {}
for comp_key, suborg in created_suborgs.items():
if suborg.portfolio == portfolio and ":" in comp_key:
norm_name = comp_key.split(":", 1)[1]
portfolio_created_suborgs[norm_name] = suborg
return portfolio_created_suborgs
# If the suborg already exists, don't add it again.
if norm_org_name not in suborg_dict and norm_org_name not in created_suborgs:
requests = requests_dict.get(norm_org_name)
suborg = Suborganization(name=new_suborg_name, portfolio=portfolio)
self.set_suborganization_location(suborg, domains, requests)
created_suborgs[norm_org_name] = suborg
return created_suborgs
def _create_suborgs_for_portfolio(
self,
portfolio_name,
portfolio,
domains_dict,
requests_dict,
existing_suborgs,
portfolio_created_suborgs,
created_suborgs,
):
"""Create suborganizations for a specific portfolio."""
for norm_org_name, domains in domains_dict.items():
# Skip if suborg name would equal portfolio name
if norm_org_name == portfolio_name:
continue
# Skip if suborg already exists
if self._suborg_already_exists(norm_org_name, existing_suborgs, portfolio_created_suborgs):
continue
# Create new suborganization
suborg = self._create_new_suborganization(norm_org_name, domains, requests_dict, portfolio)
# Add to created suborgs with composite key
portfolio_identifier = portfolio.pk if portfolio.pk else id(portfolio)
composite_key = f"{portfolio_identifier}:{norm_org_name}"
created_suborgs[composite_key] = suborg
self._log_action("CREATE", f"suborganization '{suborg}' for portfolio '{portfolio}'")
def _suborg_already_exists(self, norm_org_name, existing_suborgs, portfolio_created_suborgs):
"""Check if suborganization already exists in portfolio."""
if norm_org_name in existing_suborgs:
existing_suborg = existing_suborgs[norm_org_name]
self._log_action(
"SKIP", f"suborganization '{existing_suborg}' already exists in portfolio '{existing_suborg.portfolio}'"
)
return True
return norm_org_name in portfolio_created_suborgs
def _create_new_suborganization(self, norm_org_name, domains, requests_dict, portfolio):
"""Create a new suborganization object."""
suborg_name = self._determine_best_suborg_name(domains)
requests = requests_dict.get(norm_org_name)
suborg = Suborganization(name=suborg_name, portfolio=portfolio)
self.set_suborganization_location(suborg, domains, requests)
return suborg
def _determine_best_suborg_name(self, domains):
"""Determine the best name for a suborganization from domain records."""
if len(domains) == 1:
return normalize_string(domains[0].organization_name, lowercase=False)
# Pick the best record (fewest spaces, most leading capitals)
best_record = max(
domains,
key=lambda domain: (
-domain.organization_name.count(" "),
count_capitals(domain.organization_name, leading_only=True),
),
)
return normalize_string(best_record.organization_name, lowercase=False)
def set_suborganization_location(self, suborg, domains, requests):
"""Updates a single suborganization's location data if valid.
@ -476,11 +705,46 @@ class Command(BaseCommand):
Returns a queryset of DomainInformation objects, or None if nothing changed.
"""
updated_domains = set()
domain_infos = federal_agency.domaininformation_set.all()
# Get all domain organization names
all_domain_org_names = list(DomainInformation.objects.values_list("organization_name", flat=True).distinct())
# Use fuzzy matching to find domain information records that belong to this agency
# This creates a filter that matches domains in two ways:
# 1. Direct relationship: domains already linked to this federal agency
# 2. Fuzzy name matching: domains with organization names that are similar
# to this agency's name (handles abbreviations, variations, etc.)
#
# e.g., if federal_agency is "Department of Defense", this will find:
# - Domains already linked to DoD (direct relationship)
# - Domains with org names like "DoD", "Defense Dept", "US Dept of Defense" (fuzzy matching)
# - This helps capture domains that should belong to this agency but weren't
# properly linked due to name variations in the organization_name field
domain_filter = self._create_fuzzy_organization_filter(
federal_agency, [normalize_string(name) for name in all_domain_org_names if name]
)
domain_infos = DomainInformation.objects.filter(domain_filter)
if debug:
logger.info(
f"Fuzzy matching found {domain_infos.count()} domain information records for '{federal_agency.agency}'"
)
for domain_info in domain_infos:
org_name = normalize_string(domain_info.organization_name, lowercase=False)
org_name = normalize_string(domain_info.organization_name)
new_suborg = suborgs.get(org_name, None)
# ADD DRY RUN CHANGE TRACKING:
changes = []
if domain_info.portfolio != portfolio:
changes.append(f"portfolio: {domain_info.portfolio} → {portfolio}")
if domain_info.sub_organization != new_suborg:
changes.append(f"sub_organization: {domain_info.sub_organization} → {new_suborg}")
# Log changes in dry run mode
self._log_changes(f"domain '{domain_info.domain}'", changes)
# Apply changes (these will still be tracked but not saved in dry run)
domain_info.portfolio = portfolio
domain_info.sub_organization = suborgs.get(org_name, None)
domain_info.sub_organization = new_suborg
updated_domains.add(domain_info)
if not updated_domains and debug:
@ -489,13 +753,7 @@ class Command(BaseCommand):
return updated_domains
def update_requests(
self,
portfolio,
federal_agency,
suborgs,
debug,
):
def update_requests(self, portfolio, federal_agency, suborgs, debug):
"""
Associate portfolio with domain requests for a federal agency.
Updates all relevant domain request records.
@ -505,28 +763,29 @@ class Command(BaseCommand):
DomainRequest.DomainRequestStatus.INELIGIBLE,
DomainRequest.DomainRequestStatus.REJECTED,
]
domain_requests = federal_agency.domainrequest_set.exclude(status__in=invalid_states)
# Add portfolio, sub_org, requested_suborg, suborg_city, and suborg_state_territory.
# For started domain requests, set the federal agency to None if not on a portfolio.
# Get all request organization names for fuzzy matching
all_request_org_names = list(
DomainRequest.objects.exclude(status__in=invalid_states)
.values_list("organization_name", flat=True)
.distinct()
)
# Use fuzzy matching to find domain requests that belong to this agency
request_filter = self._create_fuzzy_organization_filter(
federal_agency, [normalize_string(name) for name in all_request_org_names if name]
)
domain_requests = DomainRequest.objects.filter(request_filter).exclude(status__in=invalid_states)
if debug:
logger.info(f"Fuzzy matching found {domain_requests.count()} domain requests for '{federal_agency.agency}'")
# Process each domain request
for domain_request in domain_requests:
if domain_request.status != DomainRequest.DomainRequestStatus.STARTED:
org_name = normalize_string(domain_request.organization_name, lowercase=False)
domain_request.portfolio = portfolio
domain_request.sub_organization = suborgs.get(org_name, None)
if domain_request.sub_organization is None:
domain_request.requested_suborganization = normalize_string(
domain_request.organization_name, lowercase=False
)
domain_request.suborganization_city = normalize_string(domain_request.city, lowercase=False)
domain_request.suborganization_state_territory = domain_request.state_territory
self._update_active_request(domain_request, portfolio, suborgs)
else:
# Clear the federal agency for started domain requests
agency_name = normalize_string(domain_request.federal_agency.agency)
portfolio_name = normalize_string(portfolio.organization_name)
if agency_name == portfolio_name:
domain_request.federal_agency = None
logger.info(f"Set federal agency on started domain request '{domain_request}' to None.")
self._handle_started_request(domain_request, portfolio)
updated_domain_requests.add(domain_request)
if not updated_domain_requests and debug:
@ -535,6 +794,47 @@ class Command(BaseCommand):
return updated_domain_requests
def _update_active_request(self, domain_request, portfolio, suborgs):
"""Update an active (non-started) domain request."""
org_name = normalize_string(domain_request.organization_name)
new_suborg = suborgs.get(org_name, None)
# Track changes for dry run
changes = []
if domain_request.portfolio != portfolio:
changes.append(f"portfolio: {domain_request.portfolio} → {portfolio}")
if domain_request.sub_organization != new_suborg:
changes.append(f"sub_organization: {domain_request.sub_organization} → {new_suborg}")
# Log changes in dry run mode
self._log_changes(f"request '{domain_request}'", changes)
# Apply changes
domain_request.portfolio = portfolio
domain_request.sub_organization = new_suborg
if domain_request.sub_organization is None:
domain_request.requested_suborganization = normalize_string(
domain_request.organization_name, lowercase=False
)
domain_request.suborganization_city = normalize_string(domain_request.city, lowercase=False)
domain_request.suborganization_state_territory = domain_request.state_territory
def _handle_started_request(self, domain_request, portfolio):
"""Handle started domain requests by clearing federal agency if needed."""
if not domain_request.federal_agency:
return
agency_name = normalize_string(domain_request.federal_agency.agency)
portfolio_name = normalize_string(portfolio.organization_name)
if agency_name == portfolio_name:
if self.dry_run:
logger.info(f"WOULD SET federal agency on started domain request '{domain_request}' to None.")
else:
domain_request.federal_agency = None
logger.info(f"Set federal agency on started domain request '{domain_request}' to None.")
def create_user_portfolio_permissions(self, domains):
user_domain_roles = UserDomainRole.objects.select_related(
"user", "domain", "domain__domain_info", "domain__domain_info__portfolio"
@ -571,3 +871,47 @@ class Command(BaseCommand):
self.portfolio_invitation_changes.create.append(invitation)
else:
self.portfolio_invitation_changes.skip.append(invitation)
def _log_action(self, action_type, obj, message=None):
"""
Log an action that would be performed, with dry run support.
Args:
action_type: Type of action ('CREATE', 'UPDATE', 'DELETE', 'SKIP')
obj: Object being acted upon
message: Optional custom message
"""
action_text = f"WOULD {action_type}" if self.dry_run else action_type.title()
obj_repr = message or str(obj)
color = TerminalColors.OKGREEN
if action_type == "UPDATE":
color = TerminalColors.YELLOW
elif action_type == "DELETE":
color = TerminalColors.FAIL
logger.info(f"{color}{action_text} {obj_repr}{TerminalColors.ENDC}")
def _log_changes(self, obj, changes):
"""Log what changes would be made to an object in dry run mode."""
if self.dry_run and changes:
logger.info(f" WOULD UPDATE {obj}: {', '.join(changes)}")
def _get_suborgs_for_portfolio(self, portfolio, created_suborgs):
"""Get all suborganizations for a portfolio"""
suborgs = {}
# Always add just-created suborganizations
if created_suborgs:
for composite_key, suborg in created_suborgs.items():
if suborg.portfolio == portfolio:
suborgs[normalize_string(suborg.name)] = suborg
# In normal execution, also add existing suborganizations from the database
if not self.dry_run:
for suborg in portfolio.portfolio_suborganizations.all():
normalized_name = normalize_string(suborg.name)
if normalized_name not in suborgs: # Don't overwrite just-created ones
suborgs[normalized_name] = suborg
return suborgs
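
The merge rule in `_get_suborgs_for_portfolio` can be sketched in isolation. The example below is only an illustration under stated assumptions: `FakeSuborg`, `normalize`, and `suborgs_for_portfolio` are hypothetical stand-ins (not the project's models or helpers), and `normalize` assumes that normalization means collapsing whitespace and lowercasing. It shows that just-created records take precedence over existing ones with the same normalized name, and that existing records are skipped entirely in a dry run.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeSuborg:
    """Hypothetical stand-in for the Suborganization model."""

    name: str
    portfolio: str
    saved: bool = False


def normalize(name: str) -> str:
    """Assumed normalization: collapse whitespace and lowercase."""
    return " ".join(name.split()).lower()


def suborgs_for_portfolio(portfolio, created, existing, dry_run):
    """Merge just-created and existing suborgs, keyed by normalized name."""
    suborgs = {}
    # Just-created suborgs are always included, filtered by portfolio.
    for suborg in created.values():
        if suborg.portfolio == portfolio:
            suborgs[normalize(suborg.name)] = suborg
    # Existing suborgs are only consulted outside of a dry run,
    # and never overwrite a just-created entry.
    if not dry_run:
        for suborg in existing:
            suborgs.setdefault(normalize(suborg.name), suborg)
    return suborgs


# Keys mimic the "<portfolio id>:<normalized name>" composite key format.
created = {
    "7:navy": FakeSuborg("Navy", "DoD"),
    "9:forest service": FakeSuborg("Forest Service", "USDA"),
}
existing = [FakeSuborg("NAVY", "DoD", saved=True), FakeSuborg("Army", "DoD", saved=True)]

live = suborgs_for_portfolio("DoD", created, existing, dry_run=False)
dry = suborgs_for_portfolio("DoD", created, existing, dry_run=True)
```

In the live run the lookup holds both `navy` (the just-created record) and `army`; in the dry run it holds only `navy`, so a dry-run report will not show matches against suborganizations that already exist in the database.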

View file

@ -0,0 +1,409 @@
"""
Generic fuzzy string matching utility for any string comparison needs.
This util provides fuzzy string matching. It handles common variations
in naming conventions, such as:
- Abbreviations (e.g. "Department of" vs "Dept of")
- Punctuation (e.g. "U.S." vs "US")
- Word order (e.g. "John Smith" vs "Smith, John")
- Case insensitivity
- Common misspellings and typos
- Variants for federal agency names
It can be configured with different matching strategies and thresholds
to suit specific use cases, and supports detailed match reporting.
It also supports batch processing of multiple target strings against a pool of candidates.
This utility is designed to be flexible and extensible for various fuzzy matching needs.
"""
import logging
from typing import Set, List, Dict, Optional, Callable, Tuple
from dataclasses import dataclass, field
from rapidfuzz import fuzz, process
from registrar.models.utility.generic_helper import normalize_string
logger = logging.getLogger(__name__)
@dataclass
class MatchingStrategy:
"""Configuration for a single fuzzy matching strategy."""
scorer: Callable
threshold: int
name: str
weight: float = 1.0 # For weighted scoring if needed
@dataclass
class MatchResult:
"""Result of a fuzzy matching operation."""
matched_strings: Set[str]
match_details: List[Tuple[str, float, str]] = field(default_factory=list)
variants_used: Set[str] = field(default_factory=set)
def get_best_matches(self, limit: int = 10) -> List[Tuple[str, float, str]]:
"""Get the top N matches sorted by score."""
return sorted(self.match_details, key=lambda x: x[1], reverse=True)[:limit]
class StringVariantGenerator:
"""Base class for generating string variants."""
def generate_variants(self, input_string: str) -> Set[str]:
"""Generate variants of the input string."""
raise NotImplementedError("Subclasses must implement generate_variants")
class FederalAgencyVariantGenerator(StringVariantGenerator):
"""Generates variants specific to federal agency names."""
# Common abbreviation mappings for federal agencies
ABBREVIATION_MAPPINGS = [
("Department of", "Dept of", "Dept. of"),
("Administration", "Admin"),
("Agency", "Agcy"),
("United States", "US", "U.S."),
("Federal", "Fed"),
("National", "Nat'l", "Natl"),
]
def generate_variants(self, agency_name: str) -> Set[str]:
"""Generate federal agency name variants."""
variants = {normalize_string(agency_name)}
variants.update(self._get_us_prefix_variants(agency_name))
variants.update(self._get_the_prefix_variants(agency_name))
variants.update(self._get_abbreviation_variants(agency_name))
variants.update(self._get_punctuation_variants(agency_name))
return variants
def _get_us_prefix_variants(self, agency_name: str) -> Set[str]:
"""Generate U.S./US prefix variations."""
variants = set()
if agency_name.startswith("U.S. "):
variants.add(normalize_string(agency_name[4:]))
variants.add(normalize_string("US " + agency_name[4:]))
variants.add(normalize_string("United States " + agency_name[4:]))
elif agency_name.startswith("US "):
variants.add(normalize_string(agency_name[3:]))
variants.add(normalize_string("U.S. " + agency_name[3:]))
variants.add(normalize_string("United States " + agency_name[3:]))
elif agency_name.startswith("United States "):
variants.add(normalize_string(agency_name[14:]))
variants.add(normalize_string("U.S. " + agency_name[14:]))
variants.add(normalize_string("US " + agency_name[14:]))
else:
variants.add(normalize_string("U.S. " + agency_name))
variants.add(normalize_string("US " + agency_name))
variants.add(normalize_string("United States " + agency_name))
return variants
def _get_the_prefix_variants(self, agency_name: str) -> Set[str]:
"""Generate 'The' prefix variations."""
variants = set()
if agency_name.startswith("The "):
variants.add(normalize_string(agency_name[4:]))
else:
variants.add(normalize_string("The " + agency_name))
return variants
def _get_abbreviation_variants(self, agency_name: str) -> Set[str]:
"""Generate common abbreviation variants."""
variants = set()
for full_form, *abbreviations in self.ABBREVIATION_MAPPINGS:
if full_form in agency_name:
for abbrev in abbreviations:
variants.add(normalize_string(agency_name.replace(full_form, abbrev)))
else:
# Try reverse mapping (abbrev -> full form)
for abbrev in abbreviations:
if abbrev in agency_name:
variants.add(normalize_string(agency_name.replace(abbrev, full_form)))
return variants
def _get_punctuation_variants(self, agency_name: str) -> Set[str]:
"""Generate punctuation variations."""
variants = set()
# Strip periods and commas, and replace hyphens with spaces
no_punct = normalize_string(agency_name.replace(".", "").replace(",", "").replace("-", " "))
variants.add(no_punct)
# Common punctuation replacements
variants.add(normalize_string(agency_name.replace("&", "and")))
variants.add(normalize_string(agency_name.replace(" and ", " & ")))
return variants
class GenericFuzzyMatcher:
"""
Generic fuzzy string matcher that can be configured for different use cases.
This class provides flexible fuzzy matching with:
- Configurable matching strategies
- Pluggable variant generators
- Detailed match reporting
- Threshold customization per strategy
"""
# Default matching strategies
DEFAULT_STRATEGIES = [
MatchingStrategy(fuzz.token_sort_ratio, 85, "token_sort"),
MatchingStrategy(fuzz.token_set_ratio, 85, "token_set"),
MatchingStrategy(fuzz.partial_ratio, 90, "partial"),
MatchingStrategy(fuzz.ratio, 90, "exact"),
]
def __init__(
self,
strategies: Optional[List[MatchingStrategy]] = None,
variant_generator: Optional[StringVariantGenerator] = None,
global_threshold: int = 85,
):
"""
Initialize the generic fuzzy matcher.
Args:
strategies: List of matching strategies to use
variant_generator: Generator for string variants
global_threshold: Default threshold for strategies that don't specify one
"""
self.strategies = strategies or self.DEFAULT_STRATEGIES
self.variant_generator = variant_generator
self.global_threshold = global_threshold
def find_matches(
self,
target_string: str,
candidate_strings: List[str],
include_variants: bool = True,
report_details: bool = False,
) -> MatchResult:
"""
Find strings that closely match the target string.
Args:
target_string: The string to match against
candidate_strings: List of strings to search through
include_variants: Whether to include generated variants in matching
report_details: Whether to include detailed match information
Returns:
MatchResult containing matched strings and optional details
"""
if not target_string or not candidate_strings:
return MatchResult(matched_strings=set())
target_variants, variants_used = self._prepare_target_variants(target_string, include_variants)
matched_strings: Set[str] = set()
all_match_details: List[Tuple[str, float, str]] = []
# Exact string matching
self._perform_exact_matching(
target_variants, candidate_strings, matched_strings, all_match_details, report_details
)
# Fuzzy matching
self._perform_fuzzy_matching(
target_variants, candidate_strings, matched_strings, all_match_details, report_details
)
return MatchResult(
matched_strings=matched_strings,
match_details=all_match_details if report_details else [],
variants_used=variants_used,
)
def _prepare_target_variants(self, target_string: str, include_variants: bool) -> Tuple[Set[str], Set[str]]:
"""Prepare target string variants for matching."""
normalized_target = normalize_string(target_string)
target_variants = {normalized_target}
variants_used = {normalized_target}
if include_variants and self.variant_generator:
generated_variants = self.variant_generator.generate_variants(target_string)
target_variants.update(generated_variants)
variants_used = target_variants.copy()
return target_variants, variants_used
def _perform_exact_matching(
self,
target_variants: Set[str],
candidate_strings: List[str],
matched_strings: Set[str],
all_match_details: List[Tuple[str, float, str]],
report_details: bool,
) -> None:
"""Perform exact string matching against target variants."""
normalized_candidates = [normalize_string(candidate) for candidate in candidate_strings]
for i, normalized_candidate in enumerate(normalized_candidates):
if normalized_candidate in target_variants:
matched_strings.add(candidate_strings[i])
if report_details:
all_match_details.append((candidate_strings[i], 100.0, "exact_string_match"))
def _perform_fuzzy_matching(
self,
target_variants: Set[str],
candidate_strings: List[str],
matched_strings: Set[str],
all_match_details: List[Tuple[str, float, str]],
report_details: bool,
) -> None:
"""Perform fuzzy matching using configured strategies."""
for target_variant in target_variants:
for strategy in self.strategies:
self._apply_matching_strategy(
target_variant, candidate_strings, strategy, matched_strings, all_match_details, report_details
)
def _apply_matching_strategy(
self,
target_variant: str,
candidate_strings: List[str],
strategy: MatchingStrategy,
matched_strings: Set[str],
all_match_details: List[Tuple[str, float, str]],
report_details: bool,
) -> None:
"""Apply a single matching strategy to find matches."""
try:
threshold = getattr(strategy, "threshold", self.global_threshold)
matches = process.extract(
target_variant,
candidate_strings,
scorer=strategy.scorer,
score_cutoff=threshold,
limit=None,
)
for match_string, score, _ in matches:
# Only add if not already found by exact matching
if match_string not in matched_strings:
matched_strings.add(match_string)
if report_details:
self._add_match_detail(all_match_details, match_string, score, strategy.name)
except Exception as e:
logger.warning(f"Error in fuzzy matching with strategy {strategy.name}: {e}")
def _add_match_detail(
self,
all_match_details: List[Tuple[str, float, str]],
match_string: str,
score: float,
strategy_name: str,
) -> None:
"""Add match detail if it doesn't already exist."""
existing_detail = next(
(detail for detail in all_match_details if detail[0] == match_string and detail[2] == strategy_name),
None,
)
if not existing_detail:
all_match_details.append((match_string, score, strategy_name))
def find_best_match(
self, target_string: str, candidate_strings: List[str], include_variants: bool = True
) -> Optional[Tuple[str, float]]:
"""
Find the single best match for the target string.
Returns:
Tuple of (best_match_string, score) or None if no matches found
"""
result = self.find_matches(target_string, candidate_strings, include_variants, report_details=True)
if not result.match_details:
return None
best_match = max(result.match_details, key=lambda x: x[1])
return (best_match[0], best_match[1])
def batch_find_matches(
self, target_strings: List[str], candidate_strings: List[str], include_variants: bool = True
) -> Dict[str, MatchResult]:
"""
Find matches for multiple target strings efficiently.
Returns:
Dictionary mapping each target string to its MatchResult
"""
results = {}
for target in target_strings:
results[target] = self.find_matches(target, candidate_strings, include_variants, report_details=True)
return results
class FuzzyMatchingTestRunner:
"""Utility for testing and reporting fuzzy matching results."""
def __init__(self, matcher: GenericFuzzyMatcher):
self.matcher = matcher
def generate_test_report(
self, target_strings: List[str], candidate_strings: List[str], max_display: int = 10
) -> str:
"""
Generate a comprehensive test report for fuzzy matching.
Args:
target_strings: Strings to match against
candidate_strings: Pool of candidates to search
max_display: Maximum matches to display per target
Returns:
Formatted report string
"""
report_lines = []
report_lines.append("=" * 70)
report_lines.append(" FUZZY MATCHING TEST REPORT")
report_lines.append("=" * 70)
for target in target_strings:
result = self.matcher.find_matches(target, candidate_strings, include_variants=True, report_details=True)
report_lines.append(f"\nTarget: '{target}'")
report_lines.append("-" * 50)
if result.variants_used:
report_lines.append(f"Variants tested: {len(result.variants_used)}")
sample_variants = list(result.variants_used)[:5]
report_lines.append(f"Sample variants: {sample_variants}")
best_matches = result.get_best_matches(max_display)
report_lines.append(f"\nTop matches found: {len(best_matches)}")
for match_string, score, strategy in best_matches:
report_lines.append(f"{match_string} (score: {score:.1f}, strategy: {strategy})")
if len(result.matched_strings) > max_display:
remaining = len(result.matched_strings) - max_display
report_lines.append(f" ... and {remaining} more matches")
return "\n".join(report_lines)
# Factory functions for common use cases
def create_federal_agency_matcher(threshold: int = 85) -> GenericFuzzyMatcher:
"""Create a fuzzy matcher optimized for federal agency names."""
# Use the default strategies. `threshold` is stored as global_threshold, which
# only applies to strategies that do not define a threshold of their own.
return GenericFuzzyMatcher(variant_generator=FederalAgencyVariantGenerator(), global_threshold=threshold)
def create_basic_string_matcher(threshold: int = 85) -> GenericFuzzyMatcher:
"""Create a basic fuzzy matcher without variant generation."""
return GenericFuzzyMatcher(global_threshold=threshold)
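
The overall flow of this utility (normalize, expand the target into variants, then score each candidate against every variant and keep those above a cutoff) can be sketched with the standard library alone. This is a simplified illustration, not the module's implementation: it swaps rapidfuzz's scorers for `difflib.SequenceMatcher`, and `normalize`, `variants`, `find_matches`, and the two-entry `ABBREVIATIONS` table are hypothetical names chosen for the example.

```python
from difflib import SequenceMatcher

# Hypothetical, shortened abbreviation table (full form, short form).
ABBREVIATIONS = [("department of", "dept of"), ("national", "natl")]


def normalize(text):
    """Assumed normalization: collapse whitespace and lowercase."""
    return " ".join(text.split()).lower()


def variants(name):
    """Expand a name into its normalized form plus abbreviation swaps."""
    base = normalize(name)
    found = {base}
    for full, short in ABBREVIATIONS:
        if full in base:
            found.add(base.replace(full, short))
        elif short in base:
            found.add(base.replace(short, full))
    return found


def find_matches(target, candidates, threshold=85.0):
    """Return candidates whose best score against any variant meets the threshold."""
    target_variants = variants(target)
    matched = set()
    for candidate in candidates:
        norm = normalize(candidate)
        # SequenceMatcher.ratio() is in [0, 1]; scale to 0-100 like rapidfuzz scores.
        best = max(SequenceMatcher(None, v, norm).ratio() * 100 for v in target_variants)
        if best >= threshold:
            matched.add(candidate)
    return matched


candidates = ["Department of Defense", "Dept of Defense", "Department of Agriculture"]
matches = find_matches("Department of Defense", candidates)
```

Here "Dept of Defense" is picked up through the generated variant rather than through fuzzy scoring, while "Department of Agriculture" shares only the "department of" prefix with the target and stays below the cutoff.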

View file

@ -88,7 +88,7 @@ class DomainInformation(TimeStampedModel):
# ##### data fields from the initial form #####
generic_org_type = models.CharField(
max_length=255,
choices=OrganizationChoices.choices,
choices=OrganizationChoices.choices, # type: ignore[misc]
null=True,
blank=True,
help_text="Type of organization",
@ -155,7 +155,7 @@ class DomainInformation(TimeStampedModel):
)
state_territory = models.CharField(
max_length=2,
choices=StateTerritoryChoices.choices,
choices=StateTerritoryChoices.choices, # type: ignore[misc]
null=True,
blank=True,
verbose_name="state, territory, or military post",

View file

@ -42,7 +42,7 @@ class Portfolio(TimeStampedModel):
organization_type = models.CharField(
max_length=255,
choices=OrganizationChoices.choices,
choices=OrganizationChoices.choices, # type: ignore[misc]
null=True,
blank=True,
)
@ -88,7 +88,7 @@ class Portfolio(TimeStampedModel):
# (imports enums from domain_request.py)
state_territory = models.CharField(
max_length=2,
choices=StateTerritoryChoices.choices,
choices=StateTerritoryChoices.choices, # type: ignore[misc]
null=True,
blank=True,
verbose_name="state, territory, or military post",

View file

@ -33,7 +33,7 @@ class TransitionDomain(TimeStampedModel):
null=False,
blank=True,
default=StatusChoices.READY,
choices=StatusChoices.choices,
choices=StatusChoices.choices, # type: ignore[misc]
verbose_name="status",
help_text="domain status during the transfer",
)

View file

@ -0,0 +1,233 @@
from django.test import TestCase
from registrar.models import User, FederalAgency
from registrar.management.commands.utility.fuzzy_string_matcher import (
create_federal_agency_matcher,
create_basic_string_matcher,
MatchResult,
FederalAgencyVariantGenerator,
GenericFuzzyMatcher,
MatchingStrategy,
)
from rapidfuzz import fuzz
class TestFuzzyStringMatcher(TestCase):
def setUp(self):
self.user = User.objects.create(username="testuser")
self.federal_agency = FederalAgency.objects.create(agency="Test Federal Agency")
def tearDown(self):
FederalAgency.objects.all().delete()
User.objects.all().delete()
def test_federal_agency_matcher_creation(self):
"""Test creating a federal agency matcher with different thresholds"""
matcher = create_federal_agency_matcher(threshold=85)
self.assertIsInstance(matcher, GenericFuzzyMatcher)
self.assertIsInstance(matcher.variant_generator, FederalAgencyVariantGenerator)
self.assertEqual(matcher.global_threshold, 85)
def test_basic_string_matcher_creation(self):
"""Test creating a basic string matcher without variants"""
matcher = create_basic_string_matcher(threshold=75)
self.assertIsInstance(matcher, GenericFuzzyMatcher)
self.assertIsNone(matcher.variant_generator)
self.assertEqual(matcher.global_threshold, 75)
def test_federal_agency_exact_match(self):
"""Test exact matching for federal agencies"""
matcher = create_federal_agency_matcher(threshold=85)
candidates = [
"Department of Defense",
"Department of Agriculture",
"Federal Bureau of Investigation",
"Central Intelligence Agency",
]
result = matcher.find_matches("Department of Defense", candidates)
self.assertIsInstance(result, MatchResult)
self.assertIn("Department of Defense", result.matched_strings)
self.assertGreater(len(result.matched_strings), 0)
def test_federal_agency_abbreviation_matching(self):
"""Test that federal agency abbreviations are matched correctly"""
matcher = create_federal_agency_matcher(threshold=80)
candidates = ["Department of Defense", "Dept of Defense", "DoD", "Department of Agriculture"]
# Should match both full name and abbreviations
result = matcher.find_matches("Department of Defense", candidates)
# Should find multiple matches due to variant generation
self.assertGreater(len(result.matched_strings), 1)
self.assertIn("Department of Defense", result.matched_strings)
def test_federal_agency_us_prefix_variants(self):
"""Test U.S. prefix variant generation"""
generator = FederalAgencyVariantGenerator()
variants = generator.generate_variants("U.S. Department of Defense")
# Should include variants without U.S. prefix
variant_strings = [v.lower() for v in variants]
self.assertTrue(any("department of defense" in v for v in variant_strings))
self.assertTrue(any("us department of defense" in v for v in variant_strings))
def test_match_result_functionality(self):
"""Test MatchResult class functionality"""
matcher = create_federal_agency_matcher(threshold=80)
candidates = ["Department of Defense", "Dept of Defense", "Defense Department", "Department of Agriculture"]
result = matcher.find_matches("Department of Defense", candidates, report_details=True)
# Test MatchResult methods
self.assertIsInstance(result.matched_strings, set)
self.assertIsInstance(result.match_details, list)
self.assertIsInstance(result.variants_used, set)
# Test get_best_matches
best_matches = result.get_best_matches(limit=2)
self.assertLessEqual(len(best_matches), 2)
# Each match detail should be a 3-tuple
for match_string, score, strategy_name in result.match_details:
self.assertIsInstance(match_string, str)
self.assertIsInstance(score, (int, float))
self.assertIsInstance(strategy_name, str)
def test_find_best_match(self):
"""Test finding the single best match"""
matcher = create_federal_agency_matcher(threshold=80)
candidates = ["Department of Defense", "Department of Agriculture", "Dept of Defense"]
best_match = matcher.find_best_match("Department of Defense", candidates)
self.assertIsNotNone(best_match)
match_string, score = best_match
self.assertEqual(match_string, "Department of Defense")
self.assertGreater(score, 95) # Should be very high for exact match
def test_batch_matching(self):
"""Test batch processing of multiple targets"""
matcher = create_federal_agency_matcher(threshold=80)
targets = ["Department of Defense", "FBI", "CIA"]
candidates = [
"Department of Defense",
"Federal Bureau of Investigation",
"Central Intelligence Agency",
"Department of Agriculture",
]
results = matcher.batch_find_matches(targets, candidates)
self.assertEqual(len(results), 3)
for target in targets:
self.assertIn(target, results)
self.assertIsInstance(results[target], MatchResult)
def test_no_matches_scenario(self):
"""Test behavior when no matches are found"""
matcher = create_federal_agency_matcher(threshold=95) # Very high threshold
candidates = ["Completely Different Agency"]
result = matcher.find_matches("Department of Defense", candidates)
self.assertEqual(len(result.matched_strings), 0)
self.assertEqual(len(result.match_details), 0)
def test_matching_with_variants_disabled(self):
"""Test matching with variant generation disabled"""
matcher = create_federal_agency_matcher(threshold=85)
candidates = ["Department of Defense", "Dept of Defense"]
# With variants disabled, should only match exact or very similar strings
result = matcher.find_matches("DoD", candidates, include_variants=False)
# Might not find matches since variants are disabled
self.assertIsInstance(result, MatchResult)
def test_custom_matching_strategies(self):
"""Test creating matcher with custom strategies"""
custom_strategies = [
MatchingStrategy(fuzz.ratio, 90, "exact_ratio"),
MatchingStrategy(fuzz.partial_ratio, 85, "partial_ratio"),
]
matcher = GenericFuzzyMatcher(
strategies=custom_strategies, variant_generator=FederalAgencyVariantGenerator(), global_threshold=80
)
candidates = ["Department of Defense", "Dept of Defense"]
result = matcher.find_matches("Department of Defense", candidates, report_details=True)
# Check that our custom strategies were used
strategy_names = [detail[2] for detail in result.match_details]
self.assertTrue(any("exact_ratio" in name for name in strategy_names))
def test_rapidfuzz_integration(self):
"""Test that rapidfuzz integration works correctly (this was the original bug)"""
from rapidfuzz import process, fuzz
query = "Test Federal Agency"
choices = ["Test Federal Agency", "Another Agency", "Test Federal Agency Subunit"]
# This should return 3-tuples and not cause ValueError
matches = process.extract(query, choices, scorer=fuzz.token_sort_ratio, score_cutoff=85, limit=None)
# Verify the format
self.assertIsInstance(matches, list)
if matches:
first_match = matches[0]
self.assertEqual(len(first_match), 3)
# Should be able to unpack as 3-tuple
match_string, score, index = first_match
self.assertIsInstance(match_string, str)
self.assertIsInstance(score, (int, float))
self.assertIsInstance(index, int)
def test_create_federal_portfolio_integration(self):
"""Test the exact scenario used in create_federal_portfolio command"""
matcher = create_federal_agency_matcher(threshold=85)
# Simulate real data from create_federal_portfolio
target_agency_name = "Test Federal Agency"
all_org_names = ["Test Federal Agency", "Testorg", "Test Federal Agency Division", "Another Organization"]
result = matcher.find_matches(target_agency_name, all_org_names)
self.assertIsInstance(result, MatchResult)
self.assertIn("Test Federal Agency", result.matched_strings)
self.assertGreater(len(result.matched_strings), 0)
def test_empty_input_handling(self):
"""Test handling of empty inputs"""
matcher = create_federal_agency_matcher(threshold=85)
# Empty candidates list
result = matcher.find_matches("Test Agency", [])
self.assertEqual(len(result.matched_strings), 0)
# Empty target string
result = matcher.find_matches("", ["Test Agency"])
self.assertIsInstance(result, MatchResult)
def test_special_characters_handling(self):
"""Test handling of special characters and punctuation"""
matcher = create_federal_agency_matcher(threshold=80)
candidates = ["U.S. Department of Defense", "Department of Veterans Affairs", "Health & Human Services"]
# Should handle punctuation variants
result = matcher.find_matches("US Department of Defense", candidates)
self.assertGreater(len(result.matched_strings), 0)
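
Several tests above pass a `threshold` (80, 85, 95) to the factory functions. Reading the matcher as shown in this diff, the cutoff is resolved with `getattr(strategy, "threshold", self.global_threshold)`, and because `threshold` is a required dataclass field, the fallback appears never to apply. The sketch below reproduces that lookup with a hypothetical `Strategy` dataclass; `resolve_with_override` is one possible alternative shown for comparison only, not something the PR implements.

```python
from dataclasses import dataclass


@dataclass
class Strategy:
    """Hypothetical stand-in for MatchingStrategy (scorer omitted)."""

    name: str
    threshold: int  # required field, so every instance carries a value


def resolve_with_getattr(strategy, global_threshold):
    """Mirror of the lookup in the diff: fallback only if the attribute is missing."""
    return getattr(strategy, "threshold", global_threshold)


def resolve_with_override(strategy, global_threshold=None):
    """Assumed alternative: an explicit global threshold wins when provided."""
    return strategy.threshold if global_threshold is None else global_threshold


token_sort = Strategy("token_sort", 85)

from_getattr = resolve_with_getattr(token_sort, 95)
from_override = resolve_with_override(token_sort, 95)
```

With the `getattr` form the strategy's own value (85) is used even though 95 was requested, so a test that relies on a "very high threshold" may be passing for a different reason than its comment suggests.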

View file

@ -67,3 +67,4 @@ urllib3==2.3.0; python_version >= '3.9'
whitenoise==6.9.0; python_version >= '3.9'
zope.event==5.0; python_version >= '3.7'
zope.interface==7.2; python_version >= '3.8'
rapidfuzz==3.4.0; python_version >= '3.8'

View file

@ -76,6 +76,7 @@
10038 OUTOFSCOPE http://app:8080/suborganization/
10038 OUTOFSCOPE http://app:8080/transfer/
10038 OUTOFSCOPE http://app:8080/prototype-dns
10038 OUTOFSCOPE http://app:8080/.*404.*
# This URL always returns 404, so include it as well.
10038 OUTOFSCOPE http://app:8080/todo
# OIDC isn't configured in the test environment and DEBUG=True so this gives a 500 without CSP headers