Merge pull request #534 from cisagov/nmb/migration

This commit is contained in:
Neil MartinsenBurrell 2023-04-18 15:44:58 -05:00 committed by GitHub
commit 8a22c6f9a5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 224 additions and 0 deletions

View file

@ -0,0 +1,76 @@
"""Load domain invitations for existing domains and their contacts."""
import csv
import logging
from collections import defaultdict
from django.core.management import BaseCommand
from registrar.models import Domain, DomainInvitation
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Load invitations for existing domains and their users."
def add_arguments(self, parser):
"""Add our two filename arguments."""
parser.add_argument(
"domain_contacts_filename",
help="Data file with domain contact information",
)
parser.add_argument(
"contacts_filename", help="Data file with contact information"
)
parser.add_argument("--sep", default="|", help="Delimiter character")
def handle(self, domain_contacts_filename, contacts_filename, **options):
"""Load the data files and create the DomainInvitations."""
sep = options.get("sep")
# We open the domain file first and hold it in memory.
# There are three contacts per domain, so there should be at
# most 3*N different contacts here.
contact_domains = defaultdict(list) # each contact has a list of domains
logger.info("Reading domain-contacts data file %s", domain_contacts_filename)
with open(domain_contacts_filename, "r") as domain_file:
for row in csv.reader(domain_file, delimiter=sep):
# fields are just domain, userid, role
# lowercase the domain names now
contact_domains[row[1]].append(row[0].lower())
logger.info("Loaded domains for %d contacts", len(contact_domains))
# now we have a mapping of user IDs to lists of domains for that user
# iterate over the contacts list and for contacts in our mapping,
# create the domain invitations for their email address
logger.info("Reading contacts data file %s", contacts_filename)
to_create = []
skipped = 0
with open(contacts_filename, "r") as contacts_file:
for row in csv.reader(contacts_file, delimiter=sep):
# userid is in the first field, email is the seventh
userid = row[0]
if userid not in contact_domains:
# this user has no domains, skip them
skipped += 1
continue
for domain_name in contact_domains[userid]:
email_address = row[6]
domain = Domain.objects.get(name=domain_name)
to_create.append(
DomainInvitation(
email=email_address.lower(),
domain=domain,
status=DomainInvitation.INVITED,
)
)
logger.info("Creating %d invitations", len(to_create))
DomainInvitation.objects.bulk_create(to_create)
logger.info(
"Created %d domain invitations, ignored %d contacts",
len(to_create),
skipped,
)

View file

@ -0,0 +1,69 @@
"""Load domains from registry export."""
import csv
import logging
import sys
from django.core.management.base import BaseCommand
from registrar.models import Domain
logger = logging.getLogger(__name__)
def _domain_dict_reader(file_object, **kwargs):
"""A csv DictReader with the correct field names for escrow_domains data.
All keyword arguments are sent on to the DictReader function call.
"""
# field names are from escrow_manifests without "f"
return csv.DictReader(
file_object,
fieldnames=[
"Name",
"Roid",
"IdnTableId",
"Registrant",
"ClID",
"CrRr",
"CrID",
"CrDate",
"UpRr",
"UpID",
"UpDate",
"ExDate",
"TrDate",
],
**kwargs,
)
class Command(BaseCommand):
help = "Load domain data from a delimited text file on stdin."
def add_arguments(self, parser):
parser.add_argument(
"--sep", default="|", help="Separator character for data file"
)
def handle(self, *args, **options):
separator_character = options.get("sep")
reader = _domain_dict_reader(sys.stdin, delimiter=separator_character)
# accumulate model objects so we can `bulk_create` them all at once.
domains = []
for row in reader:
name = row["Name"].lower() # we typically use lowercase domains
# Ensure that there is a `Domain` object for each domain name in
# this file and that it is active. There is a uniqueness
# constraint for active Domain objects, so we are going to account
# for that here with this check so that our later bulk_create
# should succeed
if Domain.objects.filter(name=name, is_active=True).exists():
# don't do anything, this domain is here and active
continue
else:
domains.append(Domain(name=name, is_active=True))
logger.info("Creating %d new domains", len(domains))
Domain.objects.bulk_create(domains)