Simplify script further

This commit is contained in:
zandercymatics 2024-07-26 14:07:06 -06:00
parent 217ef53a52
commit bda5296b71
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
5 changed files with 108 additions and 89 deletions

View file

@ -0,0 +1,82 @@
import argparse
import csv
import logging
import os
from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper
from registrar.models import SeniorOfficial, FederalAgency
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = """Populates the SeniorOfficial table based off of a given csv"""
def add_arguments(self, parser):
"""Add command line arguments."""
parser.add_argument("senior_official_csv_path", help="A csv containing information about the SeniorOfficials")
def handle(self, senior_official_csv_path, **kwargs):
"""Loops through each valid DomainRequest object and updates its senior official field"""
# Check if the provided file path is valid.
if not os.path.isfile(senior_official_csv_path):
raise argparse.ArgumentTypeError(f"Invalid file path '{senior_official_csv_path}'")
# Get all ao data.
added_senior_officials = []
skipped_rows = []
with open(senior_official_csv_path, "r") as requested_file:
reader = csv.DictReader(requested_file)
existing_senior_officials = SeniorOfficial.objects.all().prefetch_related("federal_agency")
existing_fed_agencies = FederalAgency.objects.all()
for row in reader:
# Note: the csv doesn't have a phone field, but we can try to pull one anyway.
so_kwargs = {
"first_name": row.get("First Name"),
"last_name": row.get("Last Name"),
"title": row.get("Role/Position"),
"email": row.get("Email"),
"phone": row.get("Phone"),
}
# Only first_name, last_name, and title are required
required_fields = ["first_name", "last_name", "title"]
if row and all(so_kwargs[field] for field in required_fields):
_agency = row.get("Agency")
if _agency:
_federal_agency = existing_fed_agencies.filter(agency=_agency.strip()).first()
so_kwargs["federal_agency"] = _federal_agency
new_so = SeniorOfficial(**so_kwargs)
# Before adding this record, check to make sure we aren't adding a duplicate.
is_duplicate = existing_senior_officials.filter(
# Check on every field that we're adding
**{key: value for key, value in so_kwargs.items()}
).exists()
if not is_duplicate:
added_senior_officials.append(new_so)
message = f"Added record: {new_so}"
TerminalHelper.colorful_logger("INFO", "OKBLUE", message)
else:
skipped_rows.append(new_so)
message = f"Skipping add on duplicate record: {new_so}"
TerminalHelper.colorful_logger("WARNING", "YELLOW", message)
else:
skipped_rows.append(row)
message = f"Skipping row: {row}"
TerminalHelper.colorful_logger("WARNING", "YELLOW", message)
added_message = f"Added {len(added_senior_officials)} records"
TerminalHelper.colorful_logger("INFO", "OKGREEN", added_message)
if len(skipped_rows) > 0:
skipped_message = f"Skipped {len(skipped_rows)} records"
TerminalHelper.colorful_logger("WARNING", "MAGENTA", skipped_message)
SeniorOfficial.objects.bulk_create(added_senior_officials)

View file

@ -1,82 +0,0 @@
import argparse
import csv
import logging
import os
from django.core.management import BaseCommand
from registrar.management.commands.utility.terminal_helper import PopulateScriptTemplate, TerminalColors
from registrar.models import SeniorOfficial, FederalAgency
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = """Populates the SeniorOfficial table based off of a given csv"""
def add_arguments(self, parser):
"""Add command line arguments."""
parser.add_argument(
"senior_official_csv_path", help="A csv containing information about the SeniorOfficials"
)
def handle(self, senior_official_csv_path, **kwargs):
"""Loops through each valid DomainRequest object and updates its senior official field"""
# Check if the provided file path is valid.
if not os.path.isfile(senior_official_csv_path):
raise argparse.ArgumentTypeError(f"Invalid file path '{senior_official_csv_path}'")
# Get all ao data.
added_senior_officials = []
skipped_rows = []
with open(senior_official_csv_path, "r") as requested_file:
reader = csv.DictReader(requested_file)
existing_senior_officials = SeniorOfficial.objects.all()
print(f"first: {FederalAgency.objects.first()}")
for row in reader:
first_name = row.get("First Name")
last_name = row.get("Last Name")
title = row.get("Role/Position")
required_fields = [
first_name,
last_name,
title
]
if row and not None in required_fields and not "" in required_fields:
# Missing phone?
# phone
agency = FederalAgency.objects.filter(agency=row.get("agency")).first()
r = row.get("agency")
print(f"agency: {agency} vs r: {r}")
so_kwargs = {
"first_name": first_name,
"last_name": last_name,
"title": title,
"email": row.get("Email"),
"federal_agency": agency,
}
new_so = SeniorOfficial(
**so_kwargs
)
is_duplicate = existing_senior_officials.filter(
first_name=new_so.first_name, last_name=new_so.last_name, title=new_so.title,
email=new_so.email, federal_agency=new_so.federal_agency
).exists()
if not is_duplicate:
added_senior_officials.append(new_so)
logger.info(f"Added record: {new_so}")
else:
logger.info(f"Skipping add on duplicate record {new_so}")
else:
skipped_rows.append(row)
logger.info(f"Skipping row: {row}")
logger.info(f"Added list: {added_senior_officials}")
logger.info(f"Skipped list: {skipped_rows}")
logger.info(f"Added {len(added_senior_officials)} records")
logger.info(f"Skipped {len(skipped_rows)} records")
SeniorOfficial.objects.bulk_create(added_senior_officials)

View file

@ -373,3 +373,22 @@ class TerminalHelper:
logger.info(f"{TerminalColors.MAGENTA}Writing to file " f" {filepath}..." f"{TerminalColors.ENDC}")
with open(f"{filepath}", "w+") as f:
f.write(file_contents)
@staticmethod
def colorful_logger(log_level, color, message):
"""Adds some color to your log output.
Args:
log_level: str -> Desired log level. ex: "INFO", "WARNING", "ERROR"
color: str | TerminalColors -> Output color. ex: TerminalColors.YELLOW or "YELLOW"
message: str -> Message to display.
"""
log_method = getattr(logger, log_level.lower(), logger.info)
if isinstance(color, str):
terminal_color = getattr(TerminalColors, color.upper(), TerminalColors.OKBLUE)
else:
terminal_color = color
colored_message = f"{terminal_color}{message}{TerminalColors.ENDC}"
log_method(colored_message)

View file

@ -1,4 +1,4 @@
# Generated by Django 4.2.10 on 2024-07-26 18:58
# Generated by Django 4.2.10 on 2024-07-26 19:11
from django.db import migrations, models
import django.db.models.deletion
@ -11,6 +11,11 @@ class Migration(migrations.Migration):
]
operations = [
migrations.AddField(
model_name="federalagency",
name="initials",
field=models.CharField(blank=True, help_text="Agency initials", max_length=10, null=True),
),
migrations.AddField(
model_name="federalagency",
name="is_fceb",
@ -28,9 +33,4 @@ class Migration(migrations.Migration):
to="registrar.federalagency",
),
),
migrations.AlterField(
model_name="federalagency",
name="agency",
field=models.CharField(blank=True, help_text="Agency initials", max_length=10, null=True),
),
]

View file

@ -25,7 +25,7 @@ class FederalAgency(TimeStampedModel):
help_text="Federal agency type (executive, judicial, legislative, etc.)",
)
agency = models.CharField(
initials = models.CharField(
max_length=10,
null=True,
blank=True,