mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-05-19 10:59:21 +02:00
Merge pull request #2246 from cisagov/dk/2194-import-export
Issues #2194, #2204: added command scripts clean_tables, export_tables and import_tables
This commit is contained in:
commit
7f55b6dc6d
6 changed files with 693 additions and 19 deletions
|
@ -1,18 +1,29 @@
|
|||
# Export / Import Tables
|
||||
|
||||
A means is provided to export and import individual tables from
|
||||
A means is provided to export and import tables from
|
||||
one environment to another. This allows for replication of
|
||||
production data in a development environment. Import and export
|
||||
are provided through the django admin interface, through a modified
|
||||
library, django-import-export. Each supported model has an Import
|
||||
and an Export button on the list view.
|
||||
are provided through a modified library, django-import-export.
|
||||
Simple scripts are provided as detailed below.
|
||||
|
||||
### Export
|
||||
|
||||
When exporting models from the source environment, make sure that
|
||||
no filters are selected. This will ensure that all rows of the model
|
||||
are exported. Due to database dependencies, the following models
|
||||
need to be exported:
|
||||
To export from the source environment, run the following command from src directory:
|
||||
manage.py export_tables
|
||||
|
||||
Connect to the source sandbox and run the command:
|
||||
cf ssh {source-app}
|
||||
/tmp/lifecycle/shell
|
||||
./manage.py export_tables
|
||||
|
||||
example exporting from getgov-stable:
|
||||
cf ssh getgov-stable
|
||||
/tmp/lifecycle/shell
|
||||
./manage.py export_tables
|
||||
|
||||
This exports a file, exported_tables.zip, to the tmp directory
|
||||
|
||||
For reference, the zip file will contain the following tables in csv form:
|
||||
|
||||
* User
|
||||
* Contact
|
||||
|
@ -25,6 +36,20 @@ need to be exported:
|
|||
* Host
|
||||
* HostIP
|
||||
|
||||
After exporting the file from the target environment, scp the exported_tables.zip
|
||||
file from the target environment to local. Run the below commands from local.
|
||||
|
||||
Get passcode by running:
|
||||
cf ssh-code
|
||||
|
||||
scp file from source app to local file:
|
||||
scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {source-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip {local_file_path}
|
||||
when prompted, supply the passcode retrieved in the 'cf ssh-code' command
|
||||
|
||||
example copying from stable to local cwd:
|
||||
scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-stable --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 ssh.fr.cloud.gov:app/tmp/exported_tables.zip .
|
||||
|
||||
|
||||
### Import
|
||||
|
||||
When importing into the target environment, if the target environment
|
||||
|
@ -34,7 +59,18 @@ that there are no database conflicts on import.
|
|||
|
||||
#### Preparing Target Environment
|
||||
|
||||
Delete all rows from tables in the following order through django admin:
|
||||
In order to delete all rows from the appropriate tables, run the following
|
||||
command:
|
||||
cf ssh {target-app}
|
||||
/tmp/lifecycle/shell
|
||||
./manage.py clean_tables
|
||||
|
||||
example cleaning getgov-backup:
|
||||
cf ssh getgov-backup
|
||||
/tmp/lifecycle/backup
|
||||
./manage.py clean_tables
|
||||
|
||||
For reference, this deletes all rows from the following tables:
|
||||
|
||||
* DomainInformation
|
||||
* DomainRequest
|
||||
|
@ -48,10 +84,34 @@ Delete all rows from tables in the following order through django admin:
|
|||
|
||||
#### Importing into Target Environment
|
||||
|
||||
Once target environment is prepared, files can be imported in the following
|
||||
order:
|
||||
Once target environment is prepared, files can be imported.
|
||||
|
||||
* User (After importing User table, you need to delete all rows from Contact table before importing Contacts)
|
||||
To scp the exported_tables.zip file from local to the sandbox, run the following:
|
||||
|
||||
Get passcode by running:
|
||||
cf ssh-code
|
||||
|
||||
scp file from local to target app:
|
||||
scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app {target-app} --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 {local_file_path} ssh.fr.cloud.gov:app/tmp/exported_tables.zip
|
||||
when prompted, supply the passcode retrieved in the 'cf ssh-code' command
|
||||
|
||||
example copy of local file in tmp to getgov-backup:
|
||||
scp -P 2222 -o User=cf:$(cf curl /v3/apps/$(cf app getgov-backup --guid)/processes | jq -r '.resources[] | select(.type=="web") | .guid')/0 tmp/exported_tables.zip ssh.fr.cloud.gov:app/tmp/exported_tables.zip
|
||||
|
||||
|
||||
Then connect to a shell in the target environment, and run the following import command:
|
||||
cf ssh {target-app}
|
||||
/tmp/lifecycle/shell
|
||||
./manage.py import_tables
|
||||
|
||||
example cleaning getgov-backup:
|
||||
cf ssh getgov-backup
|
||||
/tmp/lifecycle/backup
|
||||
./manage.py import_tables
|
||||
|
||||
For reference, this imports tables in the following order:
|
||||
|
||||
* User
|
||||
* Contact
|
||||
* Domain
|
||||
* Host
|
||||
|
|
|
@ -2245,9 +2245,46 @@ class DraftDomainAdmin(ListHeaderAdmin, ImportExportModelAdmin):
|
|||
return response
|
||||
|
||||
|
||||
class PublicContactAdmin(ListHeaderAdmin):
|
||||
class PublicContactResource(resources.ModelResource):
|
||||
"""defines how each field in the referenced model should be mapped to the corresponding fields in the
|
||||
import/export file"""
|
||||
|
||||
class Meta:
|
||||
model = models.PublicContact
|
||||
|
||||
def import_row(self, row, instance_loader, using_transactions=True, dry_run=False, raise_errors=None, **kwargs):
|
||||
"""Override kwargs skip_epp_save and set to True"""
|
||||
kwargs["skip_epp_save"] = True
|
||||
return super().import_row(
|
||||
row,
|
||||
instance_loader,
|
||||
using_transactions=using_transactions,
|
||||
dry_run=dry_run,
|
||||
raise_errors=raise_errors,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def save_instance(self, instance, is_create, using_transactions=True, dry_run=False):
|
||||
"""Override save_instance setting skip_epp_save to True"""
|
||||
self.before_save_instance(instance, using_transactions, dry_run)
|
||||
if self._meta.use_bulk:
|
||||
if is_create:
|
||||
self.create_instances.append(instance)
|
||||
else:
|
||||
self.update_instances.append(instance)
|
||||
elif not using_transactions and dry_run:
|
||||
# we don't have transactions and we want to do a dry_run
|
||||
pass
|
||||
else:
|
||||
instance.save(skip_epp_save=True)
|
||||
self.after_save_instance(instance, using_transactions, dry_run)
|
||||
|
||||
|
||||
class PublicContactAdmin(ListHeaderAdmin, ImportExportModelAdmin):
|
||||
"""Custom PublicContact admin class."""
|
||||
|
||||
resource_classes = [PublicContactResource]
|
||||
|
||||
change_form_template = "django/admin/email_clipboard_change_form.html"
|
||||
autocomplete_fields = ["domain"]
|
||||
|
||||
|
|
68
src/registrar/management/commands/clean_tables.py
Normal file
68
src/registrar/management/commands/clean_tables.py
Normal file
|
@ -0,0 +1,68 @@
|
|||
import logging
|
||||
from django.conf import settings
|
||||
from django.core.management import BaseCommand
|
||||
from django.apps import apps
|
||||
from django.db import transaction
|
||||
|
||||
from registrar.management.commands.utility.terminal_helper import TerminalHelper
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Clean tables in database to prepare for import."
|
||||
|
||||
def handle(self, **options):
|
||||
"""Delete all rows from a list of tables"""
|
||||
|
||||
if settings.IS_PRODUCTION:
|
||||
logger.error("clean_tables cannot be run in production")
|
||||
return
|
||||
|
||||
TerminalHelper.prompt_for_execution(
|
||||
system_exit_on_terminate=True,
|
||||
info_to_inspect="""
|
||||
This script will delete all rows from the following tables:
|
||||
* Contact
|
||||
* Domain
|
||||
* DomainInformation
|
||||
* DomainRequest
|
||||
* DraftDomain
|
||||
* Host
|
||||
* HostIp
|
||||
* PublicContact
|
||||
* User
|
||||
* Website
|
||||
""",
|
||||
prompt_title="Do you wish to proceed with these changes?",
|
||||
)
|
||||
|
||||
table_names = [
|
||||
"DomainInformation",
|
||||
"DomainRequest",
|
||||
"PublicContact",
|
||||
"Domain",
|
||||
"User",
|
||||
"Contact",
|
||||
"Website",
|
||||
"DraftDomain",
|
||||
"HostIp",
|
||||
"Host",
|
||||
]
|
||||
|
||||
for table_name in table_names:
|
||||
self.clean_table(table_name)
|
||||
|
||||
def clean_table(self, table_name):
|
||||
"""Delete all rows in the given table"""
|
||||
try:
|
||||
# Get the model class dynamically
|
||||
model = apps.get_model("registrar", table_name)
|
||||
# Use a transaction to ensure database integrity
|
||||
with transaction.atomic():
|
||||
model.objects.all().delete()
|
||||
logger.info(f"Successfully cleaned table {table_name}")
|
||||
except LookupError:
|
||||
logger.error(f"Model for table {table_name} not found.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error cleaning table {table_name}: {e}")
|
64
src/registrar/management/commands/export_tables.py
Normal file
64
src/registrar/management/commands/export_tables.py
Normal file
|
@ -0,0 +1,64 @@
|
|||
import logging
|
||||
import os
|
||||
import pyzipper
|
||||
from django.core.management import BaseCommand
|
||||
import registrar.admin
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Exports tables in csv format to zip file in tmp directory."
|
||||
|
||||
def handle(self, **options):
|
||||
"""Generates CSV files for specified tables and creates a zip archive"""
|
||||
table_names = [
|
||||
"User",
|
||||
"Contact",
|
||||
"Domain",
|
||||
"DomainRequest",
|
||||
"DomainInformation",
|
||||
"UserDomainRole",
|
||||
"DraftDomain",
|
||||
"Website",
|
||||
"HostIp",
|
||||
"Host",
|
||||
"PublicContact",
|
||||
]
|
||||
|
||||
# Ensure the tmp directory exists
|
||||
os.makedirs("tmp", exist_ok=True)
|
||||
|
||||
for table_name in table_names:
|
||||
self.export_table(table_name)
|
||||
|
||||
# Create a zip file containing all the CSV files
|
||||
zip_filename = "tmp/exported_tables.zip"
|
||||
with pyzipper.AESZipFile(zip_filename, "w", compression=pyzipper.ZIP_DEFLATED) as zipf:
|
||||
for table_name in table_names:
|
||||
csv_filename = f"tmp/{table_name}.csv"
|
||||
if os.path.exists(csv_filename):
|
||||
zipf.write(csv_filename, os.path.basename(csv_filename))
|
||||
logger.info(f"Added {csv_filename} to zip archive {zip_filename}")
|
||||
|
||||
# Remove the CSV files after adding them to the zip file
|
||||
for table_name in table_names:
|
||||
csv_filename = f"tmp/{table_name}.csv"
|
||||
if os.path.exists(csv_filename):
|
||||
os.remove(csv_filename)
|
||||
logger.info(f"Removed temporary file {csv_filename}")
|
||||
|
||||
def export_table(self, table_name):
|
||||
"""Export a given table to a csv file in the tmp directory"""
|
||||
resourcename = f"{table_name}Resource"
|
||||
try:
|
||||
resourceclass = getattr(registrar.admin, resourcename)
|
||||
dataset = resourceclass().export()
|
||||
filename = f"tmp/{table_name}.csv"
|
||||
with open(filename, "w") as outputfile:
|
||||
outputfile.write(dataset.csv)
|
||||
logger.info(f"Successfully exported {table_name} to {filename}")
|
||||
except AttributeError:
|
||||
logger.error(f"Resource class {resourcename} not found in registrar.admin")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to export {table_name}: {e}")
|
104
src/registrar/management/commands/import_tables.py
Normal file
104
src/registrar/management/commands/import_tables.py
Normal file
|
@ -0,0 +1,104 @@
|
|||
import logging
|
||||
import os
|
||||
import pyzipper
|
||||
import tablib
|
||||
from django.apps import apps
|
||||
from django.conf import settings
|
||||
from django.db import transaction
|
||||
from django.core.management import BaseCommand
|
||||
import registrar.admin
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Imports tables from a zip file, exported_tables.zip, containing CSV files in the tmp directory."
|
||||
|
||||
def handle(self, **options):
|
||||
"""Extracts CSV files from a zip archive and imports them into the respective tables"""
|
||||
|
||||
if settings.IS_PRODUCTION:
|
||||
logger.error("import_tables cannot be run in production")
|
||||
return
|
||||
|
||||
table_names = [
|
||||
"User",
|
||||
"Contact",
|
||||
"Domain",
|
||||
"Host",
|
||||
"HostIp",
|
||||
"DraftDomain",
|
||||
"Website",
|
||||
"DomainRequest",
|
||||
"DomainInformation",
|
||||
"UserDomainRole",
|
||||
"PublicContact",
|
||||
]
|
||||
|
||||
# Ensure the tmp directory exists
|
||||
os.makedirs("tmp", exist_ok=True)
|
||||
|
||||
# Unzip the file
|
||||
zip_filename = "tmp/exported_tables.zip"
|
||||
if not os.path.exists(zip_filename):
|
||||
logger.error(f"Zip file {zip_filename} does not exist.")
|
||||
return
|
||||
|
||||
with pyzipper.AESZipFile(zip_filename, "r") as zipf:
|
||||
zipf.extractall("tmp")
|
||||
logger.info(f"Extracted zip file {zip_filename} into tmp directory")
|
||||
|
||||
# Import each CSV file
|
||||
for table_name in table_names:
|
||||
self.import_table(table_name)
|
||||
|
||||
def import_table(self, table_name):
|
||||
"""Import data from a CSV file into the given table"""
|
||||
|
||||
resourcename = f"{table_name}Resource"
|
||||
csv_filename = f"tmp/{table_name}.csv"
|
||||
try:
|
||||
if not os.path.exists(csv_filename):
|
||||
logger.error(f"CSV file {csv_filename} not found.")
|
||||
return
|
||||
|
||||
# if table_name is Contact, clean the table first
|
||||
# User table is loaded before Contact, and signals create
|
||||
# rows in Contact table which break the import, so need
|
||||
# to be cleaned again before running import on Contact table
|
||||
if table_name == "Contact":
|
||||
self.clean_table(table_name)
|
||||
|
||||
resourceclass = getattr(registrar.admin, resourcename)
|
||||
resource_instance = resourceclass()
|
||||
with open(csv_filename, "r") as csvfile:
|
||||
dataset = tablib.Dataset().load(csvfile.read(), format="csv")
|
||||
result = resource_instance.import_data(dataset, dry_run=False, skip_epp_save=True)
|
||||
|
||||
if result.has_errors():
|
||||
logger.error(f"Errors occurred while importing {csv_filename}: {result.row_errors()}")
|
||||
else:
|
||||
logger.info(f"Successfully imported {csv_filename} into {table_name}")
|
||||
|
||||
except AttributeError:
|
||||
logger.error(f"Resource class {resourcename} not found in registrar.admin")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to import {csv_filename}: {e}")
|
||||
finally:
|
||||
if os.path.exists(csv_filename):
|
||||
os.remove(csv_filename)
|
||||
logger.info(f"Removed temporary file {csv_filename}")
|
||||
|
||||
def clean_table(self, table_name):
|
||||
"""Delete all rows in the given table"""
|
||||
try:
|
||||
# Get the model class dynamically
|
||||
model = apps.get_model("registrar", table_name)
|
||||
# Use a transaction to ensure database integrity
|
||||
with transaction.atomic():
|
||||
model.objects.all().delete()
|
||||
logger.info(f"Successfully cleaned table {table_name}")
|
||||
except LookupError:
|
||||
logger.error(f"Model for table {table_name} not found.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error cleaning table {table_name}: {e}")
|
|
@ -1,9 +1,12 @@
|
|||
import copy
|
||||
from datetime import date, datetime, time
|
||||
from django.core.management import call_command
|
||||
from django.test import TestCase, override_settings
|
||||
from django.utils import timezone
|
||||
|
||||
from django.test import TestCase
|
||||
|
||||
from django.utils.module_loading import import_string
|
||||
import logging
|
||||
import pyzipper
|
||||
from registrar.management.commands.clean_tables import Command as CleanTablesCommand
|
||||
from registrar.models import (
|
||||
User,
|
||||
Domain,
|
||||
|
@ -18,14 +21,15 @@ from registrar.models import (
|
|||
PublicContact,
|
||||
FederalAgency,
|
||||
)
|
||||
|
||||
from django.core.management import call_command
|
||||
from unittest.mock import patch, call
|
||||
import tablib
|
||||
from unittest.mock import patch, call, MagicMock, mock_open
|
||||
from epplibwrapper import commands, common
|
||||
|
||||
from .common import MockEppLib, less_console_noise, completed_domain_request
|
||||
from api.tests.common import less_console_noise_decorator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestPopulateVerificationType(MockEppLib):
|
||||
"""Tests for the populate_organization_type script"""
|
||||
|
@ -767,3 +771,340 @@ class TestDiscloseEmails(MockEppLib):
|
|||
)
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
class TestCleanTables(TestCase):
|
||||
"""Test the clean_tables script"""
|
||||
|
||||
def setUp(self):
|
||||
self.command = CleanTablesCommand()
|
||||
self.logger_patcher = patch("registrar.management.commands.clean_tables.logger")
|
||||
self.logger_mock = self.logger_patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
self.logger_patcher.stop()
|
||||
|
||||
@override_settings(IS_PRODUCTION=True)
|
||||
def test_command_logs_error_in_production(self):
|
||||
"""Test that the handle method does not process in production"""
|
||||
with less_console_noise():
|
||||
with patch(
|
||||
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
|
||||
return_value=True,
|
||||
):
|
||||
call_command("clean_tables")
|
||||
self.logger_mock.error.assert_called_with("clean_tables cannot be run in production")
|
||||
|
||||
@override_settings(IS_PRODUCTION=False)
|
||||
def test_command_cleans_tables(self):
|
||||
"""test that the handle method functions properly to clean tables"""
|
||||
with less_console_noise():
|
||||
with patch("django.apps.apps.get_model") as get_model_mock:
|
||||
model_mock = MagicMock()
|
||||
get_model_mock.return_value = model_mock
|
||||
|
||||
with patch(
|
||||
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
|
||||
return_value=True,
|
||||
):
|
||||
call_command("clean_tables")
|
||||
|
||||
table_names = [
|
||||
"DomainInformation",
|
||||
"DomainRequest",
|
||||
"PublicContact",
|
||||
"Domain",
|
||||
"User",
|
||||
"Contact",
|
||||
"Website",
|
||||
"DraftDomain",
|
||||
"HostIp",
|
||||
"Host",
|
||||
]
|
||||
|
||||
# Check that each model's delete method was called
|
||||
for table_name in table_names:
|
||||
get_model_mock.assert_any_call("registrar", table_name)
|
||||
model_mock.objects.all().delete.assert_called()
|
||||
|
||||
self.logger_mock.info.assert_any_call("Successfully cleaned table DomainInformation")
|
||||
|
||||
@override_settings(IS_PRODUCTION=False)
|
||||
def test_command_handles_nonexistent_model(self):
|
||||
"""Test that exceptions for non existent models are handled properly within the handle method"""
|
||||
with less_console_noise():
|
||||
with patch("django.apps.apps.get_model", side_effect=LookupError):
|
||||
with patch(
|
||||
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
|
||||
return_value=True,
|
||||
):
|
||||
call_command("clean_tables")
|
||||
# Assert that the error message was logged for any of the table names
|
||||
self.logger_mock.error.assert_any_call("Model for table DomainInformation not found.")
|
||||
self.logger_mock.error.assert_any_call("Model for table DomainRequest not found.")
|
||||
self.logger_mock.error.assert_any_call("Model for table PublicContact not found.")
|
||||
self.logger_mock.error.assert_any_call("Model for table Domain not found.")
|
||||
self.logger_mock.error.assert_any_call("Model for table User not found.")
|
||||
self.logger_mock.error.assert_any_call("Model for table Contact not found.")
|
||||
self.logger_mock.error.assert_any_call("Model for table Website not found.")
|
||||
self.logger_mock.error.assert_any_call("Model for table DraftDomain not found.")
|
||||
self.logger_mock.error.assert_any_call("Model for table HostIp not found.")
|
||||
self.logger_mock.error.assert_any_call("Model for table Host not found.")
|
||||
|
||||
@override_settings(IS_PRODUCTION=False)
|
||||
def test_command_logs_other_exceptions(self):
|
||||
"""Test that generic exceptions are handled properly in the handle method"""
|
||||
with less_console_noise():
|
||||
with patch("django.apps.apps.get_model") as get_model_mock:
|
||||
model_mock = MagicMock()
|
||||
get_model_mock.return_value = model_mock
|
||||
model_mock.objects.all().delete.side_effect = Exception("Some error")
|
||||
|
||||
with patch(
|
||||
"registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", # noqa
|
||||
return_value=True,
|
||||
):
|
||||
call_command("clean_tables")
|
||||
|
||||
self.logger_mock.error.assert_any_call("Error cleaning table DomainInformation: Some error")
|
||||
|
||||
|
||||
class TestExportTables(MockEppLib):
|
||||
"""Test the export_tables script"""
|
||||
|
||||
def setUp(self):
|
||||
self.logger_patcher = patch("registrar.management.commands.export_tables.logger")
|
||||
self.logger_mock = self.logger_patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
self.logger_patcher.stop()
|
||||
|
||||
@patch("registrar.management.commands.export_tables.os.makedirs")
|
||||
@patch("registrar.management.commands.export_tables.os.path.exists")
|
||||
@patch("registrar.management.commands.export_tables.os.remove")
|
||||
@patch("registrar.management.commands.export_tables.pyzipper.AESZipFile")
|
||||
@patch("registrar.management.commands.export_tables.getattr")
|
||||
@patch("builtins.open", new_callable=mock_open, read_data=b"mock_csv_data")
|
||||
@patch("django.utils.translation.trans_real._translations", {})
|
||||
@patch("django.utils.translation.trans_real.translation")
|
||||
def test_handle(
|
||||
self, mock_translation, mock_file, mock_getattr, mock_zipfile, mock_remove, mock_path_exists, mock_makedirs
|
||||
):
|
||||
"""test that the handle method properly exports tables"""
|
||||
with less_console_noise():
|
||||
# Mock os.makedirs to do nothing
|
||||
mock_makedirs.return_value = None
|
||||
|
||||
# Mock os.path.exists to always return True
|
||||
mock_path_exists.return_value = True
|
||||
|
||||
# Mock the resource class and its export method
|
||||
mock_resource_class = MagicMock()
|
||||
mock_dataset = MagicMock()
|
||||
mock_dataset.csv = b"mock_csv_data"
|
||||
mock_resource_class().export.return_value = mock_dataset
|
||||
mock_getattr.return_value = mock_resource_class
|
||||
|
||||
# Mock translation function to return a dummy translation object
|
||||
mock_translation.return_value = MagicMock()
|
||||
|
||||
call_command("export_tables")
|
||||
|
||||
# Check that os.makedirs was called once to create the tmp directory
|
||||
mock_makedirs.assert_called_once_with("tmp", exist_ok=True)
|
||||
|
||||
# Check that the export_table function was called for each table
|
||||
table_names = [
|
||||
"User",
|
||||
"Contact",
|
||||
"Domain",
|
||||
"DomainRequest",
|
||||
"DomainInformation",
|
||||
"UserDomainRole",
|
||||
"DraftDomain",
|
||||
"Website",
|
||||
"HostIp",
|
||||
"Host",
|
||||
"PublicContact",
|
||||
]
|
||||
|
||||
# Check that the CSV file was written
|
||||
for table_name in table_names:
|
||||
mock_file().write.assert_any_call(b"mock_csv_data")
|
||||
# Check that os.path.exists was called
|
||||
mock_path_exists.assert_any_call(f"tmp/{table_name}.csv")
|
||||
# Check that os.remove was called
|
||||
mock_remove.assert_any_call(f"tmp/{table_name}.csv")
|
||||
|
||||
# Check that the zipfile was created and files were added
|
||||
mock_zipfile.assert_called_once_with("tmp/exported_tables.zip", "w", compression=pyzipper.ZIP_DEFLATED)
|
||||
zipfile_instance = mock_zipfile.return_value.__enter__.return_value
|
||||
for table_name in table_names:
|
||||
zipfile_instance.write.assert_any_call(f"tmp/{table_name}.csv", f"{table_name}.csv")
|
||||
|
||||
# Verify logging for added files
|
||||
for table_name in table_names:
|
||||
self.logger_mock.info.assert_any_call(
|
||||
f"Added tmp/{table_name}.csv to zip archive tmp/exported_tables.zip"
|
||||
)
|
||||
|
||||
# Verify logging for removed files
|
||||
for table_name in table_names:
|
||||
self.logger_mock.info.assert_any_call(f"Removed temporary file tmp/{table_name}.csv")
|
||||
|
||||
@patch("registrar.management.commands.export_tables.getattr")
|
||||
def test_export_table_handles_missing_resource_class(self, mock_getattr):
|
||||
"""Test that missing resource classes are handled properly in the handle method"""
|
||||
with less_console_noise():
|
||||
mock_getattr.side_effect = AttributeError
|
||||
|
||||
# Import the command to avoid any locale or gettext issues
|
||||
command_class = import_string("registrar.management.commands.export_tables.Command")
|
||||
command_instance = command_class()
|
||||
command_instance.export_table("NonExistentTable")
|
||||
|
||||
self.logger_mock.error.assert_called_with(
|
||||
"Resource class NonExistentTableResource not found in registrar.admin"
|
||||
)
|
||||
|
||||
@patch("registrar.management.commands.export_tables.getattr")
|
||||
def test_export_table_handles_generic_exception(self, mock_getattr):
|
||||
"""Test that general exceptions in the handle method are handled correctly"""
|
||||
with less_console_noise():
|
||||
mock_resource_class = MagicMock()
|
||||
mock_resource_class().export.side_effect = Exception("Test Exception")
|
||||
mock_getattr.return_value = mock_resource_class
|
||||
|
||||
# Import the command to avoid any locale or gettext issues
|
||||
command_class = import_string("registrar.management.commands.export_tables.Command")
|
||||
command_instance = command_class()
|
||||
command_instance.export_table("TestTable")
|
||||
|
||||
self.logger_mock.error.assert_called_with("Failed to export TestTable: Test Exception")
|
||||
|
||||
|
||||
class TestImportTables(TestCase):
|
||||
"""Test the import_tables script"""
|
||||
|
||||
@patch("registrar.management.commands.import_tables.os.makedirs")
|
||||
@patch("registrar.management.commands.import_tables.os.path.exists")
|
||||
@patch("registrar.management.commands.import_tables.os.remove")
|
||||
@patch("registrar.management.commands.import_tables.pyzipper.AESZipFile")
|
||||
@patch("registrar.management.commands.import_tables.tablib.Dataset")
|
||||
@patch("registrar.management.commands.import_tables.open", create=True)
|
||||
@patch("registrar.management.commands.import_tables.logger")
|
||||
@patch("registrar.management.commands.import_tables.getattr")
|
||||
@patch("django.apps.apps.get_model")
|
||||
def test_handle(
|
||||
self,
|
||||
mock_get_model,
|
||||
mock_getattr,
|
||||
mock_logger,
|
||||
mock_open,
|
||||
mock_dataset,
|
||||
mock_zipfile,
|
||||
mock_remove,
|
||||
mock_path_exists,
|
||||
mock_makedirs,
|
||||
):
|
||||
"""Test that the handle method properly imports tables"""
|
||||
with less_console_noise():
|
||||
# Mock os.makedirs to do nothing
|
||||
mock_makedirs.return_value = None
|
||||
|
||||
# Mock os.path.exists to always return True
|
||||
mock_path_exists.return_value = True
|
||||
|
||||
# Mock the zipfile to have extractall return None
|
||||
mock_zipfile_instance = mock_zipfile.return_value.__enter__.return_value
|
||||
mock_zipfile_instance.extractall.return_value = None
|
||||
|
||||
# Mock the CSV file content
|
||||
csv_content = b"mock_csv_data"
|
||||
|
||||
# Mock the open function to return a mock file
|
||||
mock_open.return_value.__enter__.return_value.read.return_value = csv_content
|
||||
|
||||
# Mock the Dataset class and its load method to return a dataset
|
||||
mock_dataset_instance = MagicMock(spec=tablib.Dataset)
|
||||
with patch(
|
||||
"registrar.management.commands.import_tables.tablib.Dataset.load", return_value=mock_dataset_instance
|
||||
):
|
||||
# Mock the resource class and its import method
|
||||
mock_resource_class = MagicMock()
|
||||
mock_resource_instance = MagicMock()
|
||||
mock_result = MagicMock()
|
||||
mock_result.has_errors.return_value = False
|
||||
mock_resource_instance.import_data.return_value = mock_result
|
||||
mock_resource_class.return_value = mock_resource_instance
|
||||
mock_getattr.return_value = mock_resource_class
|
||||
|
||||
# Call the command
|
||||
call_command("import_tables")
|
||||
|
||||
# Check that os.makedirs was called once to create the tmp directory
|
||||
mock_makedirs.assert_called_once_with("tmp", exist_ok=True)
|
||||
|
||||
# Check that os.path.exists was called once for the zip file
|
||||
mock_path_exists.assert_any_call("tmp/exported_tables.zip")
|
||||
|
||||
# Check that pyzipper.AESZipFile was called once to open the zip file
|
||||
mock_zipfile.assert_called_once_with("tmp/exported_tables.zip", "r")
|
||||
|
||||
# Check that extractall was called once to extract the zip file contents
|
||||
mock_zipfile_instance.extractall.assert_called_once_with("tmp")
|
||||
|
||||
# Check that the import_table function was called for each table
|
||||
table_names = [
|
||||
"User",
|
||||
"Contact",
|
||||
"Domain",
|
||||
"DomainRequest",
|
||||
"DomainInformation",
|
||||
"UserDomainRole",
|
||||
"DraftDomain",
|
||||
"Website",
|
||||
"HostIp",
|
||||
"Host",
|
||||
"PublicContact",
|
||||
]
|
||||
# Check that os.path.exists was called for each table
|
||||
for table_name in table_names:
|
||||
mock_path_exists.assert_any_call(f"tmp/{table_name}.csv")
|
||||
|
||||
# Check that clean_tables is called for Contact
|
||||
mock_get_model.assert_any_call("registrar", "Contact")
|
||||
model_mock = mock_get_model.return_value
|
||||
model_mock.objects.all().delete.assert_called()
|
||||
|
||||
# Check that logger.info was called for each successful import
|
||||
for table_name in table_names:
|
||||
mock_logger.info.assert_any_call(f"Successfully imported tmp/{table_name}.csv into {table_name}")
|
||||
|
||||
# Check that logger.error was not called for resource class not found
|
||||
mock_logger.error.assert_not_called()
|
||||
|
||||
# Check that os.remove was called for each CSV file
|
||||
for table_name in table_names:
|
||||
mock_remove.assert_any_call(f"tmp/{table_name}.csv")
|
||||
|
||||
# Check that logger.info was called for each CSV file removal
|
||||
for table_name in table_names:
|
||||
mock_logger.info.assert_any_call(f"Removed temporary file tmp/{table_name}.csv")
|
||||
|
||||
@patch("registrar.management.commands.import_tables.logger")
|
||||
@patch("registrar.management.commands.import_tables.os.makedirs")
|
||||
@patch("registrar.management.commands.import_tables.os.path.exists")
|
||||
def test_handle_zip_file_not_found(self, mock_path_exists, mock_makedirs, mock_logger):
|
||||
"""Test the handle method when the zip file doesn't exist"""
|
||||
with less_console_noise():
|
||||
# Mock os.makedirs to do nothing
|
||||
mock_makedirs.return_value = None
|
||||
|
||||
# Mock os.path.exists to return False
|
||||
mock_path_exists.return_value = False
|
||||
|
||||
call_command("import_tables")
|
||||
|
||||
# Check that logger.error was called with the correct message
|
||||
mock_logger.error.assert_called_once_with("Zip file tmp/exported_tables.zip does not exist.")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue