mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-05-17 18:09:25 +02:00
Merge branch 'main' into za/2332-display-action-needed-text
This commit is contained in:
commit
1d8993627c
14 changed files with 597 additions and 134 deletions
|
@ -32,9 +32,11 @@ For reference, the zip file will contain the following tables in csv form:
|
||||||
* DomainInformation
|
* DomainInformation
|
||||||
* DomainUserRole
|
* DomainUserRole
|
||||||
* DraftDomain
|
* DraftDomain
|
||||||
|
* FederalAgency
|
||||||
* Websites
|
* Websites
|
||||||
* Host
|
* Host
|
||||||
* HostIP
|
* HostIP
|
||||||
|
* PublicContact
|
||||||
|
|
||||||
After exporting the file from the target environment, scp the exported_tables.zip
|
After exporting the file from the target environment, scp the exported_tables.zip
|
||||||
file from the target environment to local. Run the below commands from local.
|
file from the target environment to local. Run the below commands from local.
|
||||||
|
@ -75,17 +77,25 @@ For reference, this deletes all rows from the following tables:
|
||||||
* DomainInformation
|
* DomainInformation
|
||||||
* DomainRequest
|
* DomainRequest
|
||||||
* Domain
|
* Domain
|
||||||
* User (all but the current user)
|
* User
|
||||||
* Contact
|
* Contact
|
||||||
* Websites
|
* Websites
|
||||||
* DraftDomain
|
* DraftDomain
|
||||||
* HostIP
|
* HostIP
|
||||||
* Host
|
* Host
|
||||||
|
* PublicContact
|
||||||
|
* FederalAgency
|
||||||
|
|
||||||
#### Importing into Target Environment
|
#### Importing into Target Environment
|
||||||
|
|
||||||
Once target environment is prepared, files can be imported.
|
Once target environment is prepared, files can be imported.
|
||||||
|
|
||||||
|
If importing tables from stable environment into an OT&E sandbox, there will be a difference
|
||||||
|
between the stable's registry and the sandbox's registry. Therefore, you need to run import_tables
|
||||||
|
with --skipEppSave option set to False. If you set to False, it will attempt to save PublicContact
|
||||||
|
records to the registry on load. If this is unset, or set to True, it will load the database and not
|
||||||
|
attempt to update the registry on load.
|
||||||
|
|
||||||
To scp the exported_tables.zip file from local to the sandbox, run the following:
|
To scp the exported_tables.zip file from local to the sandbox, run the following:
|
||||||
|
|
||||||
Get passcode by running:
|
Get passcode by running:
|
||||||
|
@ -107,7 +117,7 @@ cf ssh {target-app}
|
||||||
example cleaning getgov-backup:
|
example cleaning getgov-backup:
|
||||||
cf ssh getgov-backup
|
cf ssh getgov-backup
|
||||||
/tmp/lifecycle/backup
|
/tmp/lifecycle/backup
|
||||||
./manage.py import_tables
|
./manage.py import_tables --no-skipEppSave
|
||||||
|
|
||||||
For reference, this imports tables in the following order:
|
For reference, this imports tables in the following order:
|
||||||
|
|
||||||
|
@ -118,9 +128,11 @@ For reference, this imports tables in the following order:
|
||||||
* HostIP
|
* HostIP
|
||||||
* DraftDomain
|
* DraftDomain
|
||||||
* Websites
|
* Websites
|
||||||
|
* FederalAgency
|
||||||
* DomainRequest
|
* DomainRequest
|
||||||
* DomainInformation
|
* DomainInformation
|
||||||
* UserDomainRole
|
* UserDomainRole
|
||||||
|
* PublicContact
|
||||||
|
|
||||||
Optional step:
|
Optional step:
|
||||||
* Run fixtures to load fixture users back in
|
* Run fixtures to load fixture users back in
|
|
@ -2478,16 +2478,35 @@ class PublicContactResource(resources.ModelResource):
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
model = models.PublicContact
|
model = models.PublicContact
|
||||||
|
# may want to consider these bulk options in future, so left in as comments
|
||||||
|
# use_bulk = True
|
||||||
|
# batch_size = 1000
|
||||||
|
# force_init_instance = True
|
||||||
|
|
||||||
def import_row(self, row, instance_loader, using_transactions=True, dry_run=False, raise_errors=None, **kwargs):
|
def __init__(self):
|
||||||
"""Override kwargs skip_epp_save and set to True"""
|
"""Sets global variables for code tidyness"""
|
||||||
kwargs["skip_epp_save"] = True
|
super().__init__()
|
||||||
return super().import_row(
|
self.skip_epp_save = False
|
||||||
row,
|
|
||||||
instance_loader,
|
def import_data(
|
||||||
using_transactions=using_transactions,
|
self,
|
||||||
dry_run=dry_run,
|
dataset,
|
||||||
raise_errors=raise_errors,
|
dry_run=False,
|
||||||
|
raise_errors=False,
|
||||||
|
use_transactions=None,
|
||||||
|
collect_failed_rows=False,
|
||||||
|
rollback_on_validation_errors=False,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
"""Override import_data to set self.skip_epp_save if in kwargs"""
|
||||||
|
self.skip_epp_save = kwargs.get("skip_epp_save", False)
|
||||||
|
return super().import_data(
|
||||||
|
dataset,
|
||||||
|
dry_run,
|
||||||
|
raise_errors,
|
||||||
|
use_transactions,
|
||||||
|
collect_failed_rows,
|
||||||
|
rollback_on_validation_errors,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -2503,7 +2522,7 @@ class PublicContactResource(resources.ModelResource):
|
||||||
# we don't have transactions and we want to do a dry_run
|
# we don't have transactions and we want to do a dry_run
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
instance.save(skip_epp_save=True)
|
instance.save(skip_epp_save=self.skip_epp_save)
|
||||||
self.after_save_instance(instance, using_transactions, dry_run)
|
self.after_save_instance(instance, using_transactions, dry_run)
|
||||||
|
|
||||||
|
|
||||||
|
@ -2552,11 +2571,48 @@ class VerifiedByStaffAdmin(ListHeaderAdmin):
|
||||||
super().save_model(request, obj, form, change)
|
super().save_model(request, obj, form, change)
|
||||||
|
|
||||||
|
|
||||||
class FederalAgencyAdmin(ListHeaderAdmin):
|
class PortfolioAdmin(ListHeaderAdmin):
|
||||||
|
# NOTE: these are just placeholders. Not part of ACs (haven't been defined yet). Update in future tickets.
|
||||||
|
list_display = ("organization_name", "federal_agency", "creator")
|
||||||
|
search_fields = ["organization_name"]
|
||||||
|
search_help_text = "Search by organization name."
|
||||||
|
# readonly_fields = [
|
||||||
|
# "requestor",
|
||||||
|
# ]
|
||||||
|
|
||||||
|
def save_model(self, request, obj, form, change):
|
||||||
|
|
||||||
|
if obj.creator is not None:
|
||||||
|
# ---- update creator ----
|
||||||
|
# Set the creator field to the current admin user
|
||||||
|
obj.creator = request.user if request.user.is_authenticated else None
|
||||||
|
|
||||||
|
# ---- update organization name ----
|
||||||
|
# org name will be the same as federal agency, if it is federal,
|
||||||
|
# otherwise it will be the actual org name. If nothing is entered for
|
||||||
|
# org name and it is a federal organization, have this field fill with
|
||||||
|
# the federal agency text name.
|
||||||
|
is_federal = obj.organization_type == DomainRequest.OrganizationChoices.FEDERAL
|
||||||
|
if is_federal and obj.organization_name is None:
|
||||||
|
obj.organization_name = obj.federal_agency.agency
|
||||||
|
|
||||||
|
super().save_model(request, obj, form, change)
|
||||||
|
|
||||||
|
|
||||||
|
class FederalAgencyResource(resources.ModelResource):
|
||||||
|
"""defines how each field in the referenced model should be mapped to the corresponding fields in the
|
||||||
|
import/export file"""
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
model = models.FederalAgency
|
||||||
|
|
||||||
|
|
||||||
|
class FederalAgencyAdmin(ListHeaderAdmin, ImportExportModelAdmin):
|
||||||
list_display = ["agency"]
|
list_display = ["agency"]
|
||||||
search_fields = ["agency"]
|
search_fields = ["agency"]
|
||||||
search_help_text = "Search by agency name."
|
search_help_text = "Search by agency name."
|
||||||
ordering = ["agency"]
|
ordering = ["agency"]
|
||||||
|
resource_classes = [FederalAgencyResource]
|
||||||
|
|
||||||
|
|
||||||
class UserGroupAdmin(AuditedAdmin):
|
class UserGroupAdmin(AuditedAdmin):
|
||||||
|
@ -2622,6 +2678,7 @@ admin.site.register(models.PublicContact, PublicContactAdmin)
|
||||||
admin.site.register(models.DomainRequest, DomainRequestAdmin)
|
admin.site.register(models.DomainRequest, DomainRequestAdmin)
|
||||||
admin.site.register(models.TransitionDomain, TransitionDomainAdmin)
|
admin.site.register(models.TransitionDomain, TransitionDomainAdmin)
|
||||||
admin.site.register(models.VerifiedByStaff, VerifiedByStaffAdmin)
|
admin.site.register(models.VerifiedByStaff, VerifiedByStaffAdmin)
|
||||||
|
admin.site.register(models.Portfolio, PortfolioAdmin)
|
||||||
|
|
||||||
# Register our custom waffle implementations
|
# Register our custom waffle implementations
|
||||||
admin.site.register(models.WaffleFlag, WaffleFlagAdmin)
|
admin.site.register(models.WaffleFlag, WaffleFlagAdmin)
|
||||||
|
|
|
@ -28,6 +28,7 @@ class Command(BaseCommand):
|
||||||
* DomainInformation
|
* DomainInformation
|
||||||
* DomainRequest
|
* DomainRequest
|
||||||
* DraftDomain
|
* DraftDomain
|
||||||
|
* FederalAgency
|
||||||
* Host
|
* Host
|
||||||
* HostIp
|
* HostIp
|
||||||
* PublicContact
|
* PublicContact
|
||||||
|
@ -40,14 +41,15 @@ class Command(BaseCommand):
|
||||||
table_names = [
|
table_names = [
|
||||||
"DomainInformation",
|
"DomainInformation",
|
||||||
"DomainRequest",
|
"DomainRequest",
|
||||||
|
"FederalAgency",
|
||||||
"PublicContact",
|
"PublicContact",
|
||||||
|
"HostIp",
|
||||||
|
"Host",
|
||||||
"Domain",
|
"Domain",
|
||||||
"User",
|
"User",
|
||||||
"Contact",
|
"Contact",
|
||||||
"Website",
|
"Website",
|
||||||
"DraftDomain",
|
"DraftDomain",
|
||||||
"HostIp",
|
|
||||||
"Host",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
for table_name in table_names:
|
for table_name in table_names:
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
|
from django.core.paginator import Paginator
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import pyzipper
|
import pyzipper
|
||||||
|
import tablib
|
||||||
from django.core.management import BaseCommand
|
from django.core.management import BaseCommand
|
||||||
import registrar.admin
|
import registrar.admin
|
||||||
|
|
||||||
|
@ -18,6 +20,7 @@ class Command(BaseCommand):
|
||||||
"Domain",
|
"Domain",
|
||||||
"DomainRequest",
|
"DomainRequest",
|
||||||
"DomainInformation",
|
"DomainInformation",
|
||||||
|
"FederalAgency",
|
||||||
"UserDomainRole",
|
"UserDomainRole",
|
||||||
"DraftDomain",
|
"DraftDomain",
|
||||||
"Website",
|
"Website",
|
||||||
|
@ -36,28 +39,58 @@ class Command(BaseCommand):
|
||||||
zip_filename = "tmp/exported_tables.zip"
|
zip_filename = "tmp/exported_tables.zip"
|
||||||
with pyzipper.AESZipFile(zip_filename, "w", compression=pyzipper.ZIP_DEFLATED) as zipf:
|
with pyzipper.AESZipFile(zip_filename, "w", compression=pyzipper.ZIP_DEFLATED) as zipf:
|
||||||
for table_name in table_names:
|
for table_name in table_names:
|
||||||
csv_filename = f"tmp/{table_name}.csv"
|
|
||||||
if os.path.exists(csv_filename):
|
|
||||||
zipf.write(csv_filename, os.path.basename(csv_filename))
|
|
||||||
logger.info(f"Added {csv_filename} to zip archive {zip_filename}")
|
|
||||||
|
|
||||||
# Remove the CSV files after adding them to the zip file
|
# Define the tmp directory and the file pattern
|
||||||
for table_name in table_names:
|
tmp_dir = "tmp"
|
||||||
csv_filename = f"tmp/{table_name}.csv"
|
pattern = f"{table_name}_"
|
||||||
if os.path.exists(csv_filename):
|
zip_file_path = os.path.join(tmp_dir, "exported_files.zip")
|
||||||
os.remove(csv_filename)
|
|
||||||
logger.info(f"Removed temporary file {csv_filename}")
|
# Find all files that match the pattern
|
||||||
|
matching_files = [file for file in os.listdir(tmp_dir) if file.startswith(pattern)]
|
||||||
|
for file_path in matching_files:
|
||||||
|
# Add each file to the zip archive
|
||||||
|
zipf.write(f"tmp/{file_path}", os.path.basename(file_path))
|
||||||
|
logger.info(f"Added {file_path} to {zip_file_path}")
|
||||||
|
|
||||||
|
# Remove the file after adding to zip
|
||||||
|
os.remove(f"tmp/{file_path}")
|
||||||
|
logger.info(f"Removed {file_path}")
|
||||||
|
|
||||||
def export_table(self, table_name):
|
def export_table(self, table_name):
|
||||||
"""Export a given table to a csv file in the tmp directory"""
|
"""Export a given table to csv files in the tmp directory"""
|
||||||
resourcename = f"{table_name}Resource"
|
resourcename = f"{table_name}Resource"
|
||||||
try:
|
try:
|
||||||
resourceclass = getattr(registrar.admin, resourcename)
|
resourceclass = getattr(registrar.admin, resourcename)
|
||||||
dataset = resourceclass().export()
|
dataset = resourceclass().export()
|
||||||
filename = f"tmp/{table_name}.csv"
|
if not isinstance(dataset, tablib.Dataset):
|
||||||
with open(filename, "w") as outputfile:
|
raise ValueError(f"Exported data from {resourcename} is not a tablib.Dataset")
|
||||||
outputfile.write(dataset.csv)
|
|
||||||
logger.info(f"Successfully exported {table_name} to {filename}")
|
# Determine the number of rows per file
|
||||||
|
rows_per_file = 10000
|
||||||
|
|
||||||
|
# Use Paginator to handle splitting the dataset
|
||||||
|
paginator = Paginator(dataset.dict, rows_per_file)
|
||||||
|
num_files = paginator.num_pages
|
||||||
|
|
||||||
|
logger.info(f"splitting {table_name} into {num_files} files")
|
||||||
|
|
||||||
|
# Export each page to a separate file
|
||||||
|
for page_num in paginator.page_range:
|
||||||
|
page = paginator.page(page_num)
|
||||||
|
|
||||||
|
# Create a new dataset for the chunk
|
||||||
|
chunk = tablib.Dataset(headers=dataset.headers)
|
||||||
|
for row_dict in page.object_list:
|
||||||
|
row = [row_dict[header] for header in dataset.headers]
|
||||||
|
chunk.append(row)
|
||||||
|
|
||||||
|
# Export the chunk to a new file
|
||||||
|
filename = f"tmp/{table_name}_{page_num}.csv"
|
||||||
|
with open(filename, "w") as f:
|
||||||
|
f.write(chunk.export("csv"))
|
||||||
|
|
||||||
|
logger.info(f"Successfully exported {table_name} into {num_files} files.")
|
||||||
|
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
logger.error(f"Resource class {resourcename} not found in registrar.admin")
|
logger.error(f"Resource class {resourcename} not found in registrar.admin")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import argparse
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import pyzipper
|
import pyzipper
|
||||||
|
@ -14,6 +15,10 @@ logger = logging.getLogger(__name__)
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
help = "Imports tables from a zip file, exported_tables.zip, containing CSV files in the tmp directory."
|
help = "Imports tables from a zip file, exported_tables.zip, containing CSV files in the tmp directory."
|
||||||
|
|
||||||
|
def add_arguments(self, parser):
|
||||||
|
"""Add command line arguments."""
|
||||||
|
parser.add_argument("--skipEppSave", default=True, action=argparse.BooleanOptionalAction)
|
||||||
|
|
||||||
def handle(self, **options):
|
def handle(self, **options):
|
||||||
"""Extracts CSV files from a zip archive and imports them into the respective tables"""
|
"""Extracts CSV files from a zip archive and imports them into the respective tables"""
|
||||||
|
|
||||||
|
@ -21,6 +26,8 @@ class Command(BaseCommand):
|
||||||
logger.error("import_tables cannot be run in production")
|
logger.error("import_tables cannot be run in production")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
self.skip_epp_save = options.get("skipEppSave")
|
||||||
|
|
||||||
table_names = [
|
table_names = [
|
||||||
"User",
|
"User",
|
||||||
"Contact",
|
"Contact",
|
||||||
|
@ -29,6 +36,7 @@ class Command(BaseCommand):
|
||||||
"HostIp",
|
"HostIp",
|
||||||
"DraftDomain",
|
"DraftDomain",
|
||||||
"Website",
|
"Website",
|
||||||
|
"FederalAgency",
|
||||||
"DomainRequest",
|
"DomainRequest",
|
||||||
"DomainInformation",
|
"DomainInformation",
|
||||||
"UserDomainRole",
|
"UserDomainRole",
|
||||||
|
@ -56,38 +64,46 @@ class Command(BaseCommand):
|
||||||
"""Import data from a CSV file into the given table"""
|
"""Import data from a CSV file into the given table"""
|
||||||
|
|
||||||
resourcename = f"{table_name}Resource"
|
resourcename = f"{table_name}Resource"
|
||||||
csv_filename = f"tmp/{table_name}.csv"
|
|
||||||
try:
|
|
||||||
if not os.path.exists(csv_filename):
|
|
||||||
logger.error(f"CSV file {csv_filename} not found.")
|
|
||||||
return
|
|
||||||
|
|
||||||
# if table_name is Contact, clean the table first
|
# if table_name is Contact, clean the table first
|
||||||
# User table is loaded before Contact, and signals create
|
# User table is loaded before Contact, and signals create
|
||||||
# rows in Contact table which break the import, so need
|
# rows in Contact table which break the import, so need
|
||||||
# to be cleaned again before running import on Contact table
|
# to be cleaned again before running import on Contact table
|
||||||
if table_name == "Contact":
|
if table_name == "Contact":
|
||||||
self.clean_table(table_name)
|
self.clean_table(table_name)
|
||||||
|
|
||||||
resourceclass = getattr(registrar.admin, resourcename)
|
# Define the directory and the pattern for csv filenames
|
||||||
resource_instance = resourceclass()
|
tmp_dir = "tmp"
|
||||||
with open(csv_filename, "r") as csvfile:
|
pattern = f"{table_name}_"
|
||||||
dataset = tablib.Dataset().load(csvfile.read(), format="csv")
|
|
||||||
result = resource_instance.import_data(dataset, dry_run=False, skip_epp_save=True)
|
|
||||||
|
|
||||||
if result.has_errors():
|
resourceclass = getattr(registrar.admin, resourcename)
|
||||||
logger.error(f"Errors occurred while importing {csv_filename}: {result.row_errors()}")
|
resource_instance = resourceclass()
|
||||||
else:
|
|
||||||
logger.info(f"Successfully imported {csv_filename} into {table_name}")
|
|
||||||
|
|
||||||
except AttributeError:
|
# Find all files that match the pattern
|
||||||
logger.error(f"Resource class {resourcename} not found in registrar.admin")
|
matching_files = [file for file in os.listdir(tmp_dir) if file.startswith(pattern)]
|
||||||
except Exception as e:
|
for csv_filename in matching_files:
|
||||||
logger.error(f"Failed to import {csv_filename}: {e}")
|
try:
|
||||||
finally:
|
with open(f"tmp/{csv_filename}", "r") as csvfile:
|
||||||
if os.path.exists(csv_filename):
|
dataset = tablib.Dataset().load(csvfile.read(), format="csv")
|
||||||
os.remove(csv_filename)
|
result = resource_instance.import_data(dataset, dry_run=False, skip_epp_save=self.skip_epp_save)
|
||||||
logger.info(f"Removed temporary file {csv_filename}")
|
if result.has_errors():
|
||||||
|
logger.error(f"Errors occurred while importing {csv_filename}:")
|
||||||
|
for row_error in result.row_errors():
|
||||||
|
row_index = row_error[0]
|
||||||
|
errors = row_error[1]
|
||||||
|
for error in errors:
|
||||||
|
logger.error(f"Row {row_index} - {error.error} - {error.row}")
|
||||||
|
else:
|
||||||
|
logger.info(f"Successfully imported {csv_filename} into {table_name}")
|
||||||
|
|
||||||
|
except AttributeError:
|
||||||
|
logger.error(f"Resource class {resourcename} not found in registrar.admin")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to import {csv_filename}: {e}")
|
||||||
|
finally:
|
||||||
|
if os.path.exists(csv_filename):
|
||||||
|
os.remove(csv_filename)
|
||||||
|
logger.info(f"Removed temporary file {csv_filename}")
|
||||||
|
|
||||||
def clean_table(self, table_name):
|
def clean_table(self, table_name):
|
||||||
"""Delete all rows in the given table"""
|
"""Delete all rows in the given table"""
|
||||||
|
|
|
@ -0,0 +1,174 @@
|
||||||
|
# Generated by Django 4.2.10 on 2024-06-18 17:55
|
||||||
|
|
||||||
|
from django.conf import settings
|
||||||
|
from django.db import migrations, models
|
||||||
|
import django.db.models.deletion
|
||||||
|
import registrar.models.federal_agency
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
("registrar", "0102_domain_dsdata_last_change"),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.CreateModel(
|
||||||
|
name="Portfolio",
|
||||||
|
fields=[
|
||||||
|
("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
|
||||||
|
("created_at", models.DateTimeField(auto_now_add=True)),
|
||||||
|
("updated_at", models.DateTimeField(auto_now=True)),
|
||||||
|
("notes", models.TextField(blank=True, null=True)),
|
||||||
|
(
|
||||||
|
"organization_type",
|
||||||
|
models.CharField(
|
||||||
|
blank=True,
|
||||||
|
choices=[
|
||||||
|
("federal", "Federal"),
|
||||||
|
("interstate", "Interstate"),
|
||||||
|
("state_or_territory", "State or territory"),
|
||||||
|
("tribal", "Tribal"),
|
||||||
|
("county", "County"),
|
||||||
|
("city", "City"),
|
||||||
|
("special_district", "Special district"),
|
||||||
|
("school_district", "School district"),
|
||||||
|
],
|
||||||
|
help_text="Type of organization",
|
||||||
|
max_length=255,
|
||||||
|
null=True,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
("organization_name", models.CharField(blank=True, null=True)),
|
||||||
|
("address_line1", models.CharField(blank=True, null=True, verbose_name="address line 1")),
|
||||||
|
("address_line2", models.CharField(blank=True, null=True, verbose_name="address line 2")),
|
||||||
|
("city", models.CharField(blank=True, null=True)),
|
||||||
|
(
|
||||||
|
"state_territory",
|
||||||
|
models.CharField(
|
||||||
|
blank=True,
|
||||||
|
choices=[
|
||||||
|
("AL", "Alabama (AL)"),
|
||||||
|
("AK", "Alaska (AK)"),
|
||||||
|
("AS", "American Samoa (AS)"),
|
||||||
|
("AZ", "Arizona (AZ)"),
|
||||||
|
("AR", "Arkansas (AR)"),
|
||||||
|
("CA", "California (CA)"),
|
||||||
|
("CO", "Colorado (CO)"),
|
||||||
|
("CT", "Connecticut (CT)"),
|
||||||
|
("DE", "Delaware (DE)"),
|
||||||
|
("DC", "District of Columbia (DC)"),
|
||||||
|
("FL", "Florida (FL)"),
|
||||||
|
("GA", "Georgia (GA)"),
|
||||||
|
("GU", "Guam (GU)"),
|
||||||
|
("HI", "Hawaii (HI)"),
|
||||||
|
("ID", "Idaho (ID)"),
|
||||||
|
("IL", "Illinois (IL)"),
|
||||||
|
("IN", "Indiana (IN)"),
|
||||||
|
("IA", "Iowa (IA)"),
|
||||||
|
("KS", "Kansas (KS)"),
|
||||||
|
("KY", "Kentucky (KY)"),
|
||||||
|
("LA", "Louisiana (LA)"),
|
||||||
|
("ME", "Maine (ME)"),
|
||||||
|
("MD", "Maryland (MD)"),
|
||||||
|
("MA", "Massachusetts (MA)"),
|
||||||
|
("MI", "Michigan (MI)"),
|
||||||
|
("MN", "Minnesota (MN)"),
|
||||||
|
("MS", "Mississippi (MS)"),
|
||||||
|
("MO", "Missouri (MO)"),
|
||||||
|
("MT", "Montana (MT)"),
|
||||||
|
("NE", "Nebraska (NE)"),
|
||||||
|
("NV", "Nevada (NV)"),
|
||||||
|
("NH", "New Hampshire (NH)"),
|
||||||
|
("NJ", "New Jersey (NJ)"),
|
||||||
|
("NM", "New Mexico (NM)"),
|
||||||
|
("NY", "New York (NY)"),
|
||||||
|
("NC", "North Carolina (NC)"),
|
||||||
|
("ND", "North Dakota (ND)"),
|
||||||
|
("MP", "Northern Mariana Islands (MP)"),
|
||||||
|
("OH", "Ohio (OH)"),
|
||||||
|
("OK", "Oklahoma (OK)"),
|
||||||
|
("OR", "Oregon (OR)"),
|
||||||
|
("PA", "Pennsylvania (PA)"),
|
||||||
|
("PR", "Puerto Rico (PR)"),
|
||||||
|
("RI", "Rhode Island (RI)"),
|
||||||
|
("SC", "South Carolina (SC)"),
|
||||||
|
("SD", "South Dakota (SD)"),
|
||||||
|
("TN", "Tennessee (TN)"),
|
||||||
|
("TX", "Texas (TX)"),
|
||||||
|
("UM", "United States Minor Outlying Islands (UM)"),
|
||||||
|
("UT", "Utah (UT)"),
|
||||||
|
("VT", "Vermont (VT)"),
|
||||||
|
("VI", "Virgin Islands (VI)"),
|
||||||
|
("VA", "Virginia (VA)"),
|
||||||
|
("WA", "Washington (WA)"),
|
||||||
|
("WV", "West Virginia (WV)"),
|
||||||
|
("WI", "Wisconsin (WI)"),
|
||||||
|
("WY", "Wyoming (WY)"),
|
||||||
|
("AA", "Armed Forces Americas (AA)"),
|
||||||
|
("AE", "Armed Forces Africa, Canada, Europe, Middle East (AE)"),
|
||||||
|
("AP", "Armed Forces Pacific (AP)"),
|
||||||
|
],
|
||||||
|
max_length=2,
|
||||||
|
null=True,
|
||||||
|
verbose_name="state / territory",
|
||||||
|
),
|
||||||
|
),
|
||||||
|
("zipcode", models.CharField(blank=True, max_length=10, null=True, verbose_name="zip code")),
|
||||||
|
(
|
||||||
|
"urbanization",
|
||||||
|
models.CharField(
|
||||||
|
blank=True, help_text="Required for Puerto Rico only", null=True, verbose_name="urbanization"
|
||||||
|
),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"security_contact_email",
|
||||||
|
models.EmailField(blank=True, max_length=320, null=True, verbose_name="security contact e-mail"),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"creator",
|
||||||
|
models.ForeignKey(
|
||||||
|
help_text="Associated user",
|
||||||
|
on_delete=django.db.models.deletion.PROTECT,
|
||||||
|
to=settings.AUTH_USER_MODEL,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"federal_agency",
|
||||||
|
models.ForeignKey(
|
||||||
|
default=registrar.models.federal_agency.FederalAgency.get_non_federal_agency,
|
||||||
|
help_text="Associated federal agency",
|
||||||
|
on_delete=django.db.models.deletion.PROTECT,
|
||||||
|
to="registrar.federalagency",
|
||||||
|
),
|
||||||
|
),
|
||||||
|
],
|
||||||
|
options={
|
||||||
|
"abstract": False,
|
||||||
|
},
|
||||||
|
),
|
||||||
|
migrations.AddField(
|
||||||
|
model_name="domaininformation",
|
||||||
|
name="portfolio",
|
||||||
|
field=models.ForeignKey(
|
||||||
|
blank=True,
|
||||||
|
help_text="Portfolio associated with this domain",
|
||||||
|
null=True,
|
||||||
|
on_delete=django.db.models.deletion.PROTECT,
|
||||||
|
related_name="DomainRequest_portfolio",
|
||||||
|
to="registrar.portfolio",
|
||||||
|
),
|
||||||
|
),
|
||||||
|
migrations.AddField(
|
||||||
|
model_name="domainrequest",
|
||||||
|
name="portfolio",
|
||||||
|
field=models.ForeignKey(
|
||||||
|
blank=True,
|
||||||
|
help_text="Portfolio associated with this domain request",
|
||||||
|
null=True,
|
||||||
|
on_delete=django.db.models.deletion.PROTECT,
|
||||||
|
related_name="DomainInformation_portfolio",
|
||||||
|
to="registrar.portfolio",
|
||||||
|
),
|
||||||
|
),
|
||||||
|
]
|
37
src/registrar/migrations/0104_create_groups_v13.py
Normal file
37
src/registrar/migrations/0104_create_groups_v13.py
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
# This migration creates the create_full_access_group and create_cisa_analyst_group groups
|
||||||
|
# It is dependent on 0079 (which populates federal agencies)
|
||||||
|
# If permissions on the groups need changing, edit CISA_ANALYST_GROUP_PERMISSIONS
|
||||||
|
# in the user_group model then:
|
||||||
|
# [NOT RECOMMENDED]
|
||||||
|
# step 1: docker-compose exec app ./manage.py migrate --fake registrar 0035_contenttypes_permissions
|
||||||
|
# step 2: docker-compose exec app ./manage.py migrate registrar 0036_create_groups
|
||||||
|
# step 3: fake run the latest migration in the migrations list
|
||||||
|
# [RECOMMENDED]
|
||||||
|
# Alternatively:
|
||||||
|
# step 1: duplicate the migration that loads data
|
||||||
|
# step 2: docker-compose exec app ./manage.py migrate
|
||||||
|
|
||||||
|
from django.db import migrations
|
||||||
|
from registrar.models import UserGroup
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
# For linting: RunPython expects a function reference,
|
||||||
|
# so let's give it one
|
||||||
|
def create_groups(apps, schema_editor) -> Any:
|
||||||
|
UserGroup.create_cisa_analyst_group(apps, schema_editor)
|
||||||
|
UserGroup.create_full_access_group(apps, schema_editor)
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
dependencies = [
|
||||||
|
("registrar", "0103_portfolio_domaininformation_portfolio_and_more"),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.RunPython(
|
||||||
|
create_groups,
|
||||||
|
reverse_code=migrations.RunPython.noop,
|
||||||
|
atomic=True,
|
||||||
|
),
|
||||||
|
]
|
|
@ -16,6 +16,7 @@ from .website import Website
|
||||||
from .transition_domain import TransitionDomain
|
from .transition_domain import TransitionDomain
|
||||||
from .verified_by_staff import VerifiedByStaff
|
from .verified_by_staff import VerifiedByStaff
|
||||||
from .waffle_flag import WaffleFlag
|
from .waffle_flag import WaffleFlag
|
||||||
|
from .portfolio import Portfolio
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
@ -36,6 +37,7 @@ __all__ = [
|
||||||
"TransitionDomain",
|
"TransitionDomain",
|
||||||
"VerifiedByStaff",
|
"VerifiedByStaff",
|
||||||
"WaffleFlag",
|
"WaffleFlag",
|
||||||
|
"Portfolio",
|
||||||
]
|
]
|
||||||
|
|
||||||
auditlog.register(Contact)
|
auditlog.register(Contact)
|
||||||
|
@ -55,3 +57,4 @@ auditlog.register(Website)
|
||||||
auditlog.register(TransitionDomain)
|
auditlog.register(TransitionDomain)
|
||||||
auditlog.register(VerifiedByStaff)
|
auditlog.register(VerifiedByStaff)
|
||||||
auditlog.register(WaffleFlag)
|
auditlog.register(WaffleFlag)
|
||||||
|
auditlog.register(Portfolio)
|
||||||
|
|
|
@ -57,6 +57,16 @@ class DomainInformation(TimeStampedModel):
|
||||||
help_text="Person who submitted the domain request",
|
help_text="Person who submitted the domain request",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# portfolio
|
||||||
|
portfolio = models.ForeignKey(
|
||||||
|
"registrar.Portfolio",
|
||||||
|
on_delete=models.PROTECT,
|
||||||
|
null=True,
|
||||||
|
blank=True,
|
||||||
|
related_name="DomainRequest_portfolio",
|
||||||
|
help_text="Portfolio associated with this domain",
|
||||||
|
)
|
||||||
|
|
||||||
domain_request = models.OneToOneField(
|
domain_request = models.OneToOneField(
|
||||||
"registrar.DomainRequest",
|
"registrar.DomainRequest",
|
||||||
on_delete=models.PROTECT,
|
on_delete=models.PROTECT,
|
||||||
|
|
|
@ -309,6 +309,16 @@ class DomainRequest(TimeStampedModel):
|
||||||
null=True,
|
null=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# portfolio
|
||||||
|
portfolio = models.ForeignKey(
|
||||||
|
"registrar.Portfolio",
|
||||||
|
on_delete=models.PROTECT,
|
||||||
|
null=True,
|
||||||
|
blank=True,
|
||||||
|
related_name="DomainInformation_portfolio",
|
||||||
|
help_text="Portfolio associated with this domain request",
|
||||||
|
)
|
||||||
|
|
||||||
# This is the domain request user who created this domain request. The contact
|
# This is the domain request user who created this domain request. The contact
|
||||||
# information that they gave is in the `submitter` field
|
# information that they gave is in the `submitter` field
|
||||||
creator = models.ForeignKey(
|
creator = models.ForeignKey(
|
||||||
|
|
|
@ -230,3 +230,8 @@ class FederalAgency(TimeStampedModel):
|
||||||
FederalAgency.objects.bulk_create(agencies)
|
FederalAgency.objects.bulk_create(agencies)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error creating federal agencies: {e}")
|
logger.error(f"Error creating federal agencies: {e}")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_non_federal_agency(cls):
|
||||||
|
"""Returns the non-federal agency."""
|
||||||
|
return FederalAgency.objects.filter(agency="Non-Federal Agency").first()
|
||||||
|
|
99
src/registrar/models/portfolio.py
Normal file
99
src/registrar/models/portfolio.py
Normal file
|
@ -0,0 +1,99 @@
|
||||||
|
from django.db import models
|
||||||
|
|
||||||
|
from registrar.models.domain_request import DomainRequest
|
||||||
|
from registrar.models.federal_agency import FederalAgency
|
||||||
|
|
||||||
|
from .utility.time_stamped_model import TimeStampedModel
|
||||||
|
|
||||||
|
|
||||||
|
# def get_default_federal_agency():
|
||||||
|
# """returns non-federal agency"""
|
||||||
|
# return FederalAgency.objects.filter(agency="Non-Federal Agency").first()
|
||||||
|
|
||||||
|
|
||||||
|
class Portfolio(TimeStampedModel):
|
||||||
|
"""
|
||||||
|
Portfolio is used for organizing domains/domain-requests into
|
||||||
|
manageable groups.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# use the short names in Django admin
|
||||||
|
OrganizationChoices = DomainRequest.OrganizationChoices
|
||||||
|
StateTerritoryChoices = DomainRequest.StateTerritoryChoices
|
||||||
|
|
||||||
|
# Stores who created this model. If no creator is specified in DJA,
|
||||||
|
# then the creator will default to the current request user"""
|
||||||
|
creator = models.ForeignKey("registrar.User", on_delete=models.PROTECT, help_text="Associated user", unique=False)
|
||||||
|
|
||||||
|
notes = models.TextField(
|
||||||
|
null=True,
|
||||||
|
blank=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
federal_agency = models.ForeignKey(
|
||||||
|
"registrar.FederalAgency",
|
||||||
|
on_delete=models.PROTECT,
|
||||||
|
help_text="Associated federal agency",
|
||||||
|
unique=False,
|
||||||
|
default=FederalAgency.get_non_federal_agency,
|
||||||
|
)
|
||||||
|
|
||||||
|
organization_type = models.CharField(
|
||||||
|
max_length=255,
|
||||||
|
choices=OrganizationChoices.choices,
|
||||||
|
null=True,
|
||||||
|
blank=True,
|
||||||
|
help_text="Type of organization",
|
||||||
|
)
|
||||||
|
|
||||||
|
organization_name = models.CharField(
|
||||||
|
null=True,
|
||||||
|
blank=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
address_line1 = models.CharField(
|
||||||
|
null=True,
|
||||||
|
blank=True,
|
||||||
|
verbose_name="address line 1",
|
||||||
|
)
|
||||||
|
|
||||||
|
address_line2 = models.CharField(
|
||||||
|
null=True,
|
||||||
|
blank=True,
|
||||||
|
verbose_name="address line 2",
|
||||||
|
)
|
||||||
|
|
||||||
|
city = models.CharField(
|
||||||
|
null=True,
|
||||||
|
blank=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# (imports enums from domain_request.py)
|
||||||
|
state_territory = models.CharField(
|
||||||
|
max_length=2,
|
||||||
|
choices=StateTerritoryChoices.choices,
|
||||||
|
null=True,
|
||||||
|
blank=True,
|
||||||
|
verbose_name="state / territory",
|
||||||
|
)
|
||||||
|
|
||||||
|
zipcode = models.CharField(
|
||||||
|
max_length=10,
|
||||||
|
null=True,
|
||||||
|
blank=True,
|
||||||
|
verbose_name="zip code",
|
||||||
|
)
|
||||||
|
|
||||||
|
urbanization = models.CharField(
|
||||||
|
null=True,
|
||||||
|
blank=True,
|
||||||
|
help_text="Required for Puerto Rico only",
|
||||||
|
verbose_name="urbanization",
|
||||||
|
)
|
||||||
|
|
||||||
|
security_contact_email = models.EmailField(
|
||||||
|
null=True,
|
||||||
|
blank=True,
|
||||||
|
verbose_name="security contact e-mail",
|
||||||
|
max_length=320,
|
||||||
|
)
|
|
@ -2291,6 +2291,7 @@ class TestDomainRequestAdmin(MockEppLib):
|
||||||
"rejection_reason",
|
"rejection_reason",
|
||||||
"action_needed_reason",
|
"action_needed_reason",
|
||||||
"federal_agency",
|
"federal_agency",
|
||||||
|
"portfolio",
|
||||||
"creator",
|
"creator",
|
||||||
"investigator",
|
"investigator",
|
||||||
"generic_org_type",
|
"generic_org_type",
|
||||||
|
|
|
@ -7,6 +7,7 @@ from django.utils.module_loading import import_string
|
||||||
import logging
|
import logging
|
||||||
import pyzipper
|
import pyzipper
|
||||||
from registrar.management.commands.clean_tables import Command as CleanTablesCommand
|
from registrar.management.commands.clean_tables import Command as CleanTablesCommand
|
||||||
|
from registrar.management.commands.export_tables import Command as ExportTablesCommand
|
||||||
from registrar.models import (
|
from registrar.models import (
|
||||||
User,
|
User,
|
||||||
Domain,
|
Domain,
|
||||||
|
@ -873,84 +874,81 @@ class TestExportTables(MockEppLib):
|
||||||
"""Test the export_tables script"""
|
"""Test the export_tables script"""
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
self.command = ExportTablesCommand()
|
||||||
self.logger_patcher = patch("registrar.management.commands.export_tables.logger")
|
self.logger_patcher = patch("registrar.management.commands.export_tables.logger")
|
||||||
self.logger_mock = self.logger_patcher.start()
|
self.logger_mock = self.logger_patcher.start()
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.logger_patcher.stop()
|
self.logger_patcher.stop()
|
||||||
|
|
||||||
@patch("registrar.management.commands.export_tables.os.makedirs")
|
@patch("os.makedirs")
|
||||||
@patch("registrar.management.commands.export_tables.os.path.exists")
|
@patch("os.path.exists")
|
||||||
@patch("registrar.management.commands.export_tables.os.remove")
|
@patch("os.remove")
|
||||||
@patch("registrar.management.commands.export_tables.pyzipper.AESZipFile")
|
@patch("pyzipper.AESZipFile")
|
||||||
@patch("registrar.management.commands.export_tables.getattr")
|
@patch("registrar.management.commands.export_tables.getattr")
|
||||||
@patch("builtins.open", new_callable=mock_open, read_data=b"mock_csv_data")
|
@patch("builtins.open", new_callable=mock_open)
|
||||||
@patch("django.utils.translation.trans_real._translations", {})
|
@patch("os.listdir")
|
||||||
@patch("django.utils.translation.trans_real.translation")
|
|
||||||
def test_handle(
|
def test_handle(
|
||||||
self, mock_translation, mock_file, mock_getattr, mock_zipfile, mock_remove, mock_path_exists, mock_makedirs
|
self, mock_listdir, mock_open, mock_getattr, mock_zipfile, mock_remove, mock_path_exists, mock_makedirs
|
||||||
):
|
):
|
||||||
"""test that the handle method properly exports tables"""
|
"""test that the handle method properly exports tables"""
|
||||||
with less_console_noise():
|
# Mock os.makedirs to do nothing
|
||||||
# Mock os.makedirs to do nothing
|
mock_makedirs.return_value = None
|
||||||
mock_makedirs.return_value = None
|
|
||||||
|
|
||||||
# Mock os.path.exists to always return True
|
# Mock os.path.exists to always return True
|
||||||
mock_path_exists.return_value = True
|
mock_path_exists.return_value = True
|
||||||
|
|
||||||
# Mock the resource class and its export method
|
# Check that the export_table function was called for each table
|
||||||
mock_resource_class = MagicMock()
|
table_names = [
|
||||||
mock_dataset = MagicMock()
|
"User",
|
||||||
mock_dataset.csv = b"mock_csv_data"
|
"Contact",
|
||||||
mock_resource_class().export.return_value = mock_dataset
|
"Domain",
|
||||||
mock_getattr.return_value = mock_resource_class
|
"DomainRequest",
|
||||||
|
"DomainInformation",
|
||||||
|
"FederalAgency",
|
||||||
|
"UserDomainRole",
|
||||||
|
"DraftDomain",
|
||||||
|
"Website",
|
||||||
|
"HostIp",
|
||||||
|
"Host",
|
||||||
|
"PublicContact",
|
||||||
|
]
|
||||||
|
|
||||||
# Mock translation function to return a dummy translation object
|
# Mock directory listing
|
||||||
mock_translation.return_value = MagicMock()
|
mock_listdir.side_effect = lambda path: [f"{table}_1.csv" for table in table_names]
|
||||||
|
|
||||||
call_command("export_tables")
|
# Mock the resource class and its export method
|
||||||
|
mock_dataset = tablib.Dataset()
|
||||||
|
mock_dataset.headers = ["header1", "header2"]
|
||||||
|
mock_dataset.append(["row1_col1", "row1_col2"])
|
||||||
|
mock_resource_class = MagicMock()
|
||||||
|
mock_resource_class().export.return_value = mock_dataset
|
||||||
|
mock_getattr.return_value = mock_resource_class
|
||||||
|
|
||||||
# Check that os.makedirs was called once to create the tmp directory
|
command_instance = ExportTablesCommand()
|
||||||
mock_makedirs.assert_called_once_with("tmp", exist_ok=True)
|
command_instance.handle()
|
||||||
|
|
||||||
# Check that the export_table function was called for each table
|
# Check that os.makedirs was called once to create the tmp directory
|
||||||
table_names = [
|
mock_makedirs.assert_called_once_with("tmp", exist_ok=True)
|
||||||
"User",
|
|
||||||
"Contact",
|
|
||||||
"Domain",
|
|
||||||
"DomainRequest",
|
|
||||||
"DomainInformation",
|
|
||||||
"UserDomainRole",
|
|
||||||
"DraftDomain",
|
|
||||||
"Website",
|
|
||||||
"HostIp",
|
|
||||||
"Host",
|
|
||||||
"PublicContact",
|
|
||||||
]
|
|
||||||
|
|
||||||
# Check that the CSV file was written
|
# Check that the CSV file was written
|
||||||
for table_name in table_names:
|
for table_name in table_names:
|
||||||
mock_file().write.assert_any_call(b"mock_csv_data")
|
# Check that os.remove was called
|
||||||
# Check that os.path.exists was called
|
mock_remove.assert_any_call(f"tmp/{table_name}_1.csv")
|
||||||
mock_path_exists.assert_any_call(f"tmp/{table_name}.csv")
|
|
||||||
# Check that os.remove was called
|
|
||||||
mock_remove.assert_any_call(f"tmp/{table_name}.csv")
|
|
||||||
|
|
||||||
# Check that the zipfile was created and files were added
|
# Check that the zipfile was created and files were added
|
||||||
mock_zipfile.assert_called_once_with("tmp/exported_tables.zip", "w", compression=pyzipper.ZIP_DEFLATED)
|
mock_zipfile.assert_called_once_with("tmp/exported_tables.zip", "w", compression=pyzipper.ZIP_DEFLATED)
|
||||||
zipfile_instance = mock_zipfile.return_value.__enter__.return_value
|
zipfile_instance = mock_zipfile.return_value.__enter__.return_value
|
||||||
for table_name in table_names:
|
for table_name in table_names:
|
||||||
zipfile_instance.write.assert_any_call(f"tmp/{table_name}.csv", f"{table_name}.csv")
|
zipfile_instance.write.assert_any_call(f"tmp/{table_name}_1.csv", f"{table_name}_1.csv")
|
||||||
|
|
||||||
# Verify logging for added files
|
# Verify logging for added files
|
||||||
for table_name in table_names:
|
for table_name in table_names:
|
||||||
self.logger_mock.info.assert_any_call(
|
self.logger_mock.info.assert_any_call(f"Added {table_name}_1.csv to tmp/exported_files.zip")
|
||||||
f"Added tmp/{table_name}.csv to zip archive tmp/exported_tables.zip"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Verify logging for removed files
|
# Verify logging for removed files
|
||||||
for table_name in table_names:
|
for table_name in table_names:
|
||||||
self.logger_mock.info.assert_any_call(f"Removed temporary file tmp/{table_name}.csv")
|
self.logger_mock.info.assert_any_call(f"Removed {table_name}_1.csv")
|
||||||
|
|
||||||
@patch("registrar.management.commands.export_tables.getattr")
|
@patch("registrar.management.commands.export_tables.getattr")
|
||||||
def test_export_table_handles_missing_resource_class(self, mock_getattr):
|
def test_export_table_handles_missing_resource_class(self, mock_getattr):
|
||||||
|
@ -995,8 +993,10 @@ class TestImportTables(TestCase):
|
||||||
@patch("registrar.management.commands.import_tables.logger")
|
@patch("registrar.management.commands.import_tables.logger")
|
||||||
@patch("registrar.management.commands.import_tables.getattr")
|
@patch("registrar.management.commands.import_tables.getattr")
|
||||||
@patch("django.apps.apps.get_model")
|
@patch("django.apps.apps.get_model")
|
||||||
|
@patch("os.listdir")
|
||||||
def test_handle(
|
def test_handle(
|
||||||
self,
|
self,
|
||||||
|
mock_listdir,
|
||||||
mock_get_model,
|
mock_get_model,
|
||||||
mock_getattr,
|
mock_getattr,
|
||||||
mock_logger,
|
mock_logger,
|
||||||
|
@ -1019,6 +1019,24 @@ class TestImportTables(TestCase):
|
||||||
mock_zipfile_instance = mock_zipfile.return_value.__enter__.return_value
|
mock_zipfile_instance = mock_zipfile.return_value.__enter__.return_value
|
||||||
mock_zipfile_instance.extractall.return_value = None
|
mock_zipfile_instance.extractall.return_value = None
|
||||||
|
|
||||||
|
# Check that the import_table function was called for each table
|
||||||
|
table_names = [
|
||||||
|
"User",
|
||||||
|
"Contact",
|
||||||
|
"Domain",
|
||||||
|
"DomainRequest",
|
||||||
|
"DomainInformation",
|
||||||
|
"UserDomainRole",
|
||||||
|
"DraftDomain",
|
||||||
|
"Website",
|
||||||
|
"HostIp",
|
||||||
|
"Host",
|
||||||
|
"PublicContact",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Mock directory listing
|
||||||
|
mock_listdir.side_effect = lambda path: [f"{table}_1.csv" for table in table_names]
|
||||||
|
|
||||||
# Mock the CSV file content
|
# Mock the CSV file content
|
||||||
csv_content = b"mock_csv_data"
|
csv_content = b"mock_csv_data"
|
||||||
|
|
||||||
|
@ -1054,23 +1072,9 @@ class TestImportTables(TestCase):
|
||||||
# Check that extractall was called once to extract the zip file contents
|
# Check that extractall was called once to extract the zip file contents
|
||||||
mock_zipfile_instance.extractall.assert_called_once_with("tmp")
|
mock_zipfile_instance.extractall.assert_called_once_with("tmp")
|
||||||
|
|
||||||
# Check that the import_table function was called for each table
|
|
||||||
table_names = [
|
|
||||||
"User",
|
|
||||||
"Contact",
|
|
||||||
"Domain",
|
|
||||||
"DomainRequest",
|
|
||||||
"DomainInformation",
|
|
||||||
"UserDomainRole",
|
|
||||||
"DraftDomain",
|
|
||||||
"Website",
|
|
||||||
"HostIp",
|
|
||||||
"Host",
|
|
||||||
"PublicContact",
|
|
||||||
]
|
|
||||||
# Check that os.path.exists was called for each table
|
# Check that os.path.exists was called for each table
|
||||||
for table_name in table_names:
|
for table_name in table_names:
|
||||||
mock_path_exists.assert_any_call(f"tmp/{table_name}.csv")
|
mock_path_exists.assert_any_call(f"{table_name}_1.csv")
|
||||||
|
|
||||||
# Check that clean_tables is called for Contact
|
# Check that clean_tables is called for Contact
|
||||||
mock_get_model.assert_any_call("registrar", "Contact")
|
mock_get_model.assert_any_call("registrar", "Contact")
|
||||||
|
@ -1079,18 +1083,18 @@ class TestImportTables(TestCase):
|
||||||
|
|
||||||
# Check that logger.info was called for each successful import
|
# Check that logger.info was called for each successful import
|
||||||
for table_name in table_names:
|
for table_name in table_names:
|
||||||
mock_logger.info.assert_any_call(f"Successfully imported tmp/{table_name}.csv into {table_name}")
|
mock_logger.info.assert_any_call(f"Successfully imported {table_name}_1.csv into {table_name}")
|
||||||
|
|
||||||
# Check that logger.error was not called for resource class not found
|
# Check that logger.error was not called for resource class not found
|
||||||
mock_logger.error.assert_not_called()
|
mock_logger.error.assert_not_called()
|
||||||
|
|
||||||
# Check that os.remove was called for each CSV file
|
# Check that os.remove was called for each CSV file
|
||||||
for table_name in table_names:
|
for table_name in table_names:
|
||||||
mock_remove.assert_any_call(f"tmp/{table_name}.csv")
|
mock_remove.assert_any_call(f"{table_name}_1.csv")
|
||||||
|
|
||||||
# Check that logger.info was called for each CSV file removal
|
# Check that logger.info was called for each CSV file removal
|
||||||
for table_name in table_names:
|
for table_name in table_names:
|
||||||
mock_logger.info.assert_any_call(f"Removed temporary file tmp/{table_name}.csv")
|
mock_logger.info.assert_any_call(f"Removed temporary file {table_name}_1.csv")
|
||||||
|
|
||||||
@patch("registrar.management.commands.import_tables.logger")
|
@patch("registrar.management.commands.import_tables.logger")
|
||||||
@patch("registrar.management.commands.import_tables.os.makedirs")
|
@patch("registrar.management.commands.import_tables.os.makedirs")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue