This commit is contained in:
zandercymatics 2024-07-29 15:31:45 -06:00
parent 30b9525d4f
commit 60b8857815
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7

View file

@ -3,7 +3,7 @@ import csv
import logging
import os
from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import TerminalHelper, TerminalColors
from registrar.management.commands.utility.terminal_helper import TerminalHelper
from registrar.models import SeniorOfficial, FederalAgency
@ -34,18 +34,20 @@ class Command(BaseCommand):
For each item in this CSV, a SeniorOffical record will be added.
Note:
If the row is SO data - it will not be added.
- If the row is missing SO data - it will not be added.
- Given we can add the row, a blank first_name will be replaced with "-"
""",
prompt_title="Do you wish to load records into the SeniorOfficial table?",
)
logger.info("Updating...")
# Get all existing data.
existing_senior_officials = SeniorOfficial.objects.all().prefetch_related("federal_agency")
existing_agencies = FederalAgency.objects.all()
self.existing_senior_officials = SeniorOfficial.objects.all().prefetch_related("federal_agency")
self.existing_agencies = FederalAgency.objects.all()
# Read the CSV
added_senior_officials, skipped_rows = [], []
self.added_senior_officials = []
self.skipped_rows = []
with open(federal_cio_csv_path, "r") as requested_file:
for row in csv.DictReader(requested_file):
# Note: the csv doesn't have a phone field, but we can try to pull one anyway.
@ -65,49 +67,55 @@ class Command(BaseCommand):
# Handle the federal_agency record seperately (db call)
agency_name = row.get("Agency").strip() if row.get("Agency") else None
if agency_name:
so_kwargs["federal_agency"] = existing_agencies.filter(agency=agency_name).first()
so_kwargs["federal_agency"] = self.existing_agencies.filter(agency=agency_name).first()
# Check if at least one field has a non-empty value
if row and any(so_kwargs.values()):
# WORKAROUND: Placeholder value for first name,
# as not having these makes it impossible to access through DJA.
old_first_name = so_kwargs["first_name"]
if not so_kwargs["first_name"]:
so_kwargs["first_name"] = "-"
# Create a new SeniorOfficial object
new_so = SeniorOfficial(**so_kwargs)
# Store a variable for the console logger
if any([old_first_name, new_so.last_name]):
record_display = new_so
else:
record_display = so_kwargs
# Before adding this record, check to make sure we aren't adding a duplicate.
duplicate_field = existing_senior_officials.filter(**so_kwargs).exists()
if not duplicate_field:
added_senior_officials.append(new_so)
message = f"Creating record: {record_display}"
TerminalHelper.colorful_logger("INFO", "OKCYAN", message)
else:
# if this field is a duplicate, don't do anything
skipped_rows.append(row)
message = f"Skipping add on duplicate record: {record_display}"
TerminalHelper.colorful_logger("WARNING", "YELLOW", message)
# Split into a function: C901 'Command.handle' is too complex.
# Doesn't add it to the DB, but just inits a class of SeniorOfficial.
self.create_senior_official(so_kwargs)
else:
skipped_rows.append(row)
self.skipped_rows.append(row)
message = f"Skipping row (no data was found): {row}"
TerminalHelper.colorful_logger("WARNING", "YELLOW", message)
# Bulk create the SO fields
if len(added_senior_officials) > 0:
SeniorOfficial.objects.bulk_create(added_senior_officials)
if len(self.added_senior_officials) > 0:
SeniorOfficial.objects.bulk_create(self.added_senior_officials)
added_message = f"Added {len(added_senior_officials)} records"
added_message = f"Added {len(self.added_senior_officials)} records"
TerminalHelper.colorful_logger("INFO", "OKGREEN", added_message)
if len(skipped_rows) > 0:
skipped_message = f"Skipped {len(skipped_rows)} records"
if len(self.skipped_rows) > 0:
skipped_message = f"Skipped {len(self.skipped_rows)} records"
TerminalHelper.colorful_logger("WARNING", "MAGENTA", skipped_message)
def create_senior_official(self, so_kwargs):
"""Creates a senior official object from kwargs but does not add it to the DB"""
# WORKAROUND: Placeholder value for first name,
# as not having these makes it impossible to access through DJA.
old_first_name = so_kwargs["first_name"]
if not so_kwargs["first_name"]:
so_kwargs["first_name"] = "-"
# Create a new SeniorOfficial object
new_so = SeniorOfficial(**so_kwargs)
# Store a variable for the console logger
if any([old_first_name, new_so.last_name]):
record_display = new_so
else:
record_display = so_kwargs
# Before adding this record, check to make sure we aren't adding a duplicate.
duplicate_field = self.existing_senior_officials.filter(**so_kwargs).exists()
if not duplicate_field:
self.added_senior_officials.append(new_so)
message = f"Creating record: {record_display}"
TerminalHelper.colorful_logger("INFO", "OKCYAN", message)
else:
# if this field is a duplicate, don't do anything
self.skipped_rows.append(new_so)
message = f"Skipping add on duplicate record: {record_display}"
TerminalHelper.colorful_logger("WARNING", "YELLOW", message)