mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-08-12 12:39:43 +02:00
added tests
This commit is contained in:
parent
b3454fd3c6
commit
42a2e8d3ae
2 changed files with 332 additions and 7 deletions
|
@ -54,6 +54,7 @@ class Command(BaseCommand):
|
||||||
|
|
||||||
def import_table(self, table_name):
|
def import_table(self, table_name):
|
||||||
"""Import data from a CSV file into the given table"""
|
"""Import data from a CSV file into the given table"""
|
||||||
|
|
||||||
resourcename = f"{table_name}Resource"
|
resourcename = f"{table_name}Resource"
|
||||||
csv_filename = f"tmp/{table_name}.csv"
|
csv_filename = f"tmp/{table_name}.csv"
|
||||||
try:
|
try:
|
||||||
|
@ -68,7 +69,6 @@ class Command(BaseCommand):
|
||||||
resourceclass = getattr(registrar.admin, resourcename)
|
resourceclass = getattr(registrar.admin, resourcename)
|
||||||
resource_instance = resourceclass()
|
resource_instance = resourceclass()
|
||||||
with open(csv_filename, "r") as csvfile:
|
with open(csv_filename, "r") as csvfile:
|
||||||
# dataset = resource_instance.import_data(csvfile.read())
|
|
||||||
dataset = tablib.Dataset().load(csvfile.read(), format="csv")
|
dataset = tablib.Dataset().load(csvfile.read(), format="csv")
|
||||||
result = resource_instance.import_data(dataset, dry_run=False, skip_epp_save=True)
|
result = resource_instance.import_data(dataset, dry_run=False, skip_epp_save=True)
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,12 @@
|
||||||
import copy
|
import copy
|
||||||
from datetime import date, datetime, time
|
from datetime import date, datetime, time
|
||||||
|
from django.core.management import call_command
|
||||||
|
from django.test import TestCase, override_settings
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
|
from django.utils.module_loading import import_string
|
||||||
from django.test import TestCase
|
import logging
|
||||||
|
import pyzipper
|
||||||
|
from registrar.management.commands.clean_tables import Command as CleanTablesCommand
|
||||||
from registrar.models import (
|
from registrar.models import (
|
||||||
User,
|
User,
|
||||||
Domain,
|
Domain,
|
||||||
|
@ -18,14 +21,15 @@ from registrar.models import (
|
||||||
PublicContact,
|
PublicContact,
|
||||||
FederalAgency,
|
FederalAgency,
|
||||||
)
|
)
|
||||||
|
import tablib
|
||||||
from django.core.management import call_command
|
from unittest.mock import patch, call, MagicMock, mock_open
|
||||||
from unittest.mock import patch, call
|
|
||||||
from epplibwrapper import commands, common
|
from epplibwrapper import commands, common
|
||||||
|
|
||||||
from .common import MockEppLib, less_console_noise, completed_domain_request
|
from .common import MockEppLib, less_console_noise, completed_domain_request
|
||||||
from api.tests.common import less_console_noise_decorator
|
from api.tests.common import less_console_noise_decorator
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class TestPopulateVerificationType(MockEppLib):
|
class TestPopulateVerificationType(MockEppLib):
|
||||||
"""Tests for the populate_organization_type script"""
|
"""Tests for the populate_organization_type script"""
|
||||||
|
@ -767,3 +771,324 @@ class TestDiscloseEmails(MockEppLib):
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCleanTables(TestCase):
|
||||||
|
"""Test the clean_tables script"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.command = CleanTablesCommand()
|
||||||
|
self.logger_patcher = patch("registrar.management.commands.clean_tables.logger")
|
||||||
|
self.logger_mock = self.logger_patcher.start()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.logger_patcher.stop()
|
||||||
|
|
||||||
|
@override_settings(IS_PRODUCTION=True)
|
||||||
|
def test_command_logs_error_in_production(self):
|
||||||
|
"""Test that the handle method does not process in production"""
|
||||||
|
with less_console_noise():
|
||||||
|
call_command("clean_tables")
|
||||||
|
self.logger_mock.error.assert_called_with("clean_tables cannot be run in production")
|
||||||
|
|
||||||
|
@override_settings(IS_PRODUCTION=False)
|
||||||
|
def test_command_cleans_tables(self):
|
||||||
|
"""test that the handle method functions properly to clean tables"""
|
||||||
|
with less_console_noise():
|
||||||
|
with patch("django.apps.apps.get_model") as get_model_mock:
|
||||||
|
model_mock = MagicMock()
|
||||||
|
get_model_mock.return_value = model_mock
|
||||||
|
|
||||||
|
call_command("clean_tables")
|
||||||
|
|
||||||
|
table_names = [
|
||||||
|
"DomainInformation",
|
||||||
|
"DomainRequest",
|
||||||
|
"PublicContact",
|
||||||
|
"Domain",
|
||||||
|
"User",
|
||||||
|
"Contact",
|
||||||
|
"Website",
|
||||||
|
"DraftDomain",
|
||||||
|
"HostIp",
|
||||||
|
"Host",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Check that each model's delete method was called
|
||||||
|
for table_name in table_names:
|
||||||
|
get_model_mock.assert_any_call("registrar", table_name)
|
||||||
|
model_mock.objects.all().delete.assert_called()
|
||||||
|
|
||||||
|
self.logger_mock.info.assert_any_call("Successfully cleaned table DomainInformation")
|
||||||
|
|
||||||
|
@override_settings(IS_PRODUCTION=False)
|
||||||
|
def test_command_handles_nonexistent_model(self):
|
||||||
|
"""Test that exceptions for non existent models are handled properly within the handle method"""
|
||||||
|
with less_console_noise():
|
||||||
|
with patch("django.apps.apps.get_model", side_effect=LookupError):
|
||||||
|
call_command("clean_tables")
|
||||||
|
# Assert that the error message was logged for any of the table names
|
||||||
|
self.logger_mock.error.assert_any_call("Model for table DomainInformation not found.")
|
||||||
|
self.logger_mock.error.assert_any_call("Model for table DomainRequest not found.")
|
||||||
|
self.logger_mock.error.assert_any_call("Model for table PublicContact not found.")
|
||||||
|
self.logger_mock.error.assert_any_call("Model for table Domain not found.")
|
||||||
|
self.logger_mock.error.assert_any_call("Model for table User not found.")
|
||||||
|
self.logger_mock.error.assert_any_call("Model for table Contact not found.")
|
||||||
|
self.logger_mock.error.assert_any_call("Model for table Website not found.")
|
||||||
|
self.logger_mock.error.assert_any_call("Model for table DraftDomain not found.")
|
||||||
|
self.logger_mock.error.assert_any_call("Model for table HostIp not found.")
|
||||||
|
self.logger_mock.error.assert_any_call("Model for table Host not found.")
|
||||||
|
|
||||||
|
@override_settings(IS_PRODUCTION=False)
|
||||||
|
def test_command_logs_other_exceptions(self):
|
||||||
|
"""Test that generic exceptions are handled properly in the handle method"""
|
||||||
|
with less_console_noise():
|
||||||
|
with patch("django.apps.apps.get_model") as get_model_mock:
|
||||||
|
model_mock = MagicMock()
|
||||||
|
get_model_mock.return_value = model_mock
|
||||||
|
model_mock.objects.all().delete.side_effect = Exception("Some error")
|
||||||
|
|
||||||
|
call_command("clean_tables")
|
||||||
|
|
||||||
|
self.logger_mock.error.assert_any_call("Error cleaning table DomainInformation: Some error")
|
||||||
|
|
||||||
|
|
||||||
|
class TestExportTables(MockEppLib):
|
||||||
|
"""Test the export_tables script"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.logger_patcher = patch("registrar.management.commands.export_tables.logger")
|
||||||
|
self.logger_mock = self.logger_patcher.start()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.logger_patcher.stop()
|
||||||
|
|
||||||
|
@patch("registrar.management.commands.export_tables.os.makedirs")
|
||||||
|
@patch("registrar.management.commands.export_tables.os.path.exists")
|
||||||
|
@patch("registrar.management.commands.export_tables.os.remove")
|
||||||
|
@patch("registrar.management.commands.export_tables.pyzipper.AESZipFile")
|
||||||
|
@patch("registrar.management.commands.export_tables.getattr")
|
||||||
|
@patch("builtins.open", new_callable=mock_open, read_data=b"mock_csv_data")
|
||||||
|
@patch("django.utils.translation.trans_real._translations", {})
|
||||||
|
@patch("django.utils.translation.trans_real.translation")
|
||||||
|
def test_handle(
|
||||||
|
self, mock_translation, mock_file, mock_getattr, mock_zipfile, mock_remove, mock_path_exists, mock_makedirs
|
||||||
|
):
|
||||||
|
"""test that the handle method properly exports tables"""
|
||||||
|
with less_console_noise():
|
||||||
|
# Mock os.makedirs to do nothing
|
||||||
|
mock_makedirs.return_value = None
|
||||||
|
|
||||||
|
# Mock os.path.exists to always return True
|
||||||
|
mock_path_exists.return_value = True
|
||||||
|
|
||||||
|
# Mock the resource class and its export method
|
||||||
|
mock_resource_class = MagicMock()
|
||||||
|
mock_dataset = MagicMock()
|
||||||
|
mock_dataset.csv = b"mock_csv_data"
|
||||||
|
mock_resource_class().export.return_value = mock_dataset
|
||||||
|
mock_getattr.return_value = mock_resource_class
|
||||||
|
|
||||||
|
# Mock translation function to return a dummy translation object
|
||||||
|
mock_translation.return_value = MagicMock()
|
||||||
|
|
||||||
|
call_command("export_tables")
|
||||||
|
|
||||||
|
# Check that os.makedirs was called once to create the tmp directory
|
||||||
|
mock_makedirs.assert_called_once_with("tmp", exist_ok=True)
|
||||||
|
|
||||||
|
# Check that the export_table function was called for each table
|
||||||
|
table_names = [
|
||||||
|
"User",
|
||||||
|
"Contact",
|
||||||
|
"Domain",
|
||||||
|
"DomainRequest",
|
||||||
|
"DomainInformation",
|
||||||
|
"UserDomainRole",
|
||||||
|
"DraftDomain",
|
||||||
|
"Website",
|
||||||
|
"HostIp",
|
||||||
|
"Host",
|
||||||
|
"PublicContact",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Check that the CSV file was written
|
||||||
|
for table_name in table_names:
|
||||||
|
mock_file().write.assert_any_call(b"mock_csv_data")
|
||||||
|
# Check that os.path.exists was called
|
||||||
|
mock_path_exists.assert_any_call(f"tmp/{table_name}.csv")
|
||||||
|
# Check that os.remove was called
|
||||||
|
mock_remove.assert_any_call(f"tmp/{table_name}.csv")
|
||||||
|
|
||||||
|
# Check that the zipfile was created and files were added
|
||||||
|
mock_zipfile.assert_called_once_with("tmp/exported_tables.zip", "w", compression=pyzipper.ZIP_DEFLATED)
|
||||||
|
zipfile_instance = mock_zipfile.return_value.__enter__.return_value
|
||||||
|
for table_name in table_names:
|
||||||
|
zipfile_instance.write.assert_any_call(f"tmp/{table_name}.csv", f"{table_name}.csv")
|
||||||
|
|
||||||
|
# Verify logging for added files
|
||||||
|
for table_name in table_names:
|
||||||
|
self.logger_mock.info.assert_any_call(
|
||||||
|
f"Added tmp/{table_name}.csv to zip archive tmp/exported_tables.zip"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verify logging for removed files
|
||||||
|
for table_name in table_names:
|
||||||
|
self.logger_mock.info.assert_any_call(f"Removed temporary file tmp/{table_name}.csv")
|
||||||
|
|
||||||
|
@patch("registrar.management.commands.export_tables.getattr")
|
||||||
|
def test_export_table_handles_missing_resource_class(self, mock_getattr):
|
||||||
|
"""Test that missing resource classes are handled properly in the handle method"""
|
||||||
|
with less_console_noise():
|
||||||
|
mock_getattr.side_effect = AttributeError
|
||||||
|
|
||||||
|
# Import the command to avoid any locale or gettext issues
|
||||||
|
command_class = import_string("registrar.management.commands.export_tables.Command")
|
||||||
|
command_instance = command_class()
|
||||||
|
command_instance.export_table("NonExistentTable")
|
||||||
|
|
||||||
|
self.logger_mock.error.assert_called_with(
|
||||||
|
"Resource class NonExistentTableResource not found in registrar.admin"
|
||||||
|
)
|
||||||
|
|
||||||
|
@patch("registrar.management.commands.export_tables.getattr")
|
||||||
|
def test_export_table_handles_generic_exception(self, mock_getattr):
|
||||||
|
"""Test that general exceptions in the handle method are handled correctly"""
|
||||||
|
with less_console_noise():
|
||||||
|
mock_resource_class = MagicMock()
|
||||||
|
mock_resource_class().export.side_effect = Exception("Test Exception")
|
||||||
|
mock_getattr.return_value = mock_resource_class
|
||||||
|
|
||||||
|
# Import the command to avoid any locale or gettext issues
|
||||||
|
command_class = import_string("registrar.management.commands.export_tables.Command")
|
||||||
|
command_instance = command_class()
|
||||||
|
command_instance.export_table("TestTable")
|
||||||
|
|
||||||
|
self.logger_mock.error.assert_called_with("Failed to export TestTable: Test Exception")
|
||||||
|
|
||||||
|
|
||||||
|
class TestImportTables(TestCase):
|
||||||
|
"""Test the import_tables script"""
|
||||||
|
|
||||||
|
@patch("registrar.management.commands.import_tables.os.makedirs")
|
||||||
|
@patch("registrar.management.commands.import_tables.os.path.exists")
|
||||||
|
@patch("registrar.management.commands.import_tables.os.remove")
|
||||||
|
@patch("registrar.management.commands.import_tables.pyzipper.AESZipFile")
|
||||||
|
@patch("registrar.management.commands.import_tables.tablib.Dataset")
|
||||||
|
@patch("registrar.management.commands.import_tables.open", create=True)
|
||||||
|
@patch("registrar.management.commands.import_tables.logger")
|
||||||
|
@patch("registrar.management.commands.import_tables.getattr")
|
||||||
|
@patch("django.apps.apps.get_model")
|
||||||
|
def test_handle(
|
||||||
|
self,
|
||||||
|
mock_get_model,
|
||||||
|
mock_getattr,
|
||||||
|
mock_logger,
|
||||||
|
mock_open,
|
||||||
|
mock_dataset,
|
||||||
|
mock_zipfile,
|
||||||
|
mock_remove,
|
||||||
|
mock_path_exists,
|
||||||
|
mock_makedirs,
|
||||||
|
):
|
||||||
|
"""Test that the handle method properly imports tables"""
|
||||||
|
with less_console_noise():
|
||||||
|
# Mock os.makedirs to do nothing
|
||||||
|
mock_makedirs.return_value = None
|
||||||
|
|
||||||
|
# Mock os.path.exists to always return True
|
||||||
|
mock_path_exists.return_value = True
|
||||||
|
|
||||||
|
# Mock the zipfile to have extractall return None
|
||||||
|
mock_zipfile_instance = mock_zipfile.return_value.__enter__.return_value
|
||||||
|
mock_zipfile_instance.extractall.return_value = None
|
||||||
|
|
||||||
|
# Mock the CSV file content
|
||||||
|
csv_content = b"mock_csv_data"
|
||||||
|
|
||||||
|
# Mock the open function to return a mock file
|
||||||
|
mock_open.return_value.__enter__.return_value.read.return_value = csv_content
|
||||||
|
|
||||||
|
# Mock the Dataset class and its load method to return a dataset
|
||||||
|
mock_dataset_instance = MagicMock(spec=tablib.Dataset)
|
||||||
|
with patch(
|
||||||
|
"registrar.management.commands.import_tables.tablib.Dataset.load", return_value=mock_dataset_instance
|
||||||
|
):
|
||||||
|
# Mock the resource class and its import method
|
||||||
|
mock_resource_class = MagicMock()
|
||||||
|
mock_resource_instance = MagicMock()
|
||||||
|
mock_result = MagicMock()
|
||||||
|
mock_result.has_errors.return_value = False
|
||||||
|
mock_resource_instance.import_data.return_value = mock_result
|
||||||
|
mock_resource_class.return_value = mock_resource_instance
|
||||||
|
mock_getattr.return_value = mock_resource_class
|
||||||
|
|
||||||
|
# Call the command
|
||||||
|
call_command("import_tables")
|
||||||
|
|
||||||
|
# Check that os.makedirs was called once to create the tmp directory
|
||||||
|
mock_makedirs.assert_called_once_with("tmp", exist_ok=True)
|
||||||
|
|
||||||
|
# Check that os.path.exists was called once for the zip file
|
||||||
|
mock_path_exists.assert_any_call("tmp/exported_tables.zip")
|
||||||
|
|
||||||
|
# Check that pyzipper.AESZipFile was called once to open the zip file
|
||||||
|
mock_zipfile.assert_called_once_with("tmp/exported_tables.zip", "r")
|
||||||
|
|
||||||
|
# Check that extractall was called once to extract the zip file contents
|
||||||
|
mock_zipfile_instance.extractall.assert_called_once_with("tmp")
|
||||||
|
|
||||||
|
# Check that the import_table function was called for each table
|
||||||
|
table_names = [
|
||||||
|
"User",
|
||||||
|
"Contact",
|
||||||
|
"Domain",
|
||||||
|
"DomainRequest",
|
||||||
|
"DomainInformation",
|
||||||
|
"UserDomainRole",
|
||||||
|
"DraftDomain",
|
||||||
|
"Website",
|
||||||
|
"HostIp",
|
||||||
|
"Host",
|
||||||
|
"PublicContact",
|
||||||
|
]
|
||||||
|
# Check that os.path.exists was called for each table
|
||||||
|
for table_name in table_names:
|
||||||
|
mock_path_exists.assert_any_call(f"tmp/{table_name}.csv")
|
||||||
|
|
||||||
|
# Check that clean_tables is called for Contact
|
||||||
|
mock_get_model.assert_any_call("registrar", "Contact")
|
||||||
|
model_mock = mock_get_model.return_value
|
||||||
|
model_mock.objects.all().delete.assert_called()
|
||||||
|
|
||||||
|
# Check that logger.info was called for each successful import
|
||||||
|
for table_name in table_names:
|
||||||
|
mock_logger.info.assert_any_call(f"Successfully imported tmp/{table_name}.csv into {table_name}")
|
||||||
|
|
||||||
|
# Check that logger.error was not called for resource class not found
|
||||||
|
mock_logger.error.assert_not_called()
|
||||||
|
|
||||||
|
# Check that os.remove was called for each CSV file
|
||||||
|
for table_name in table_names:
|
||||||
|
mock_remove.assert_any_call(f"tmp/{table_name}.csv")
|
||||||
|
|
||||||
|
# Check that logger.info was called for each CSV file removal
|
||||||
|
for table_name in table_names:
|
||||||
|
mock_logger.info.assert_any_call(f"Removed temporary file tmp/{table_name}.csv")
|
||||||
|
|
||||||
|
@patch("registrar.management.commands.import_tables.logger")
|
||||||
|
@patch("registrar.management.commands.import_tables.os.makedirs")
|
||||||
|
@patch("registrar.management.commands.import_tables.os.path.exists")
|
||||||
|
def test_handle_zip_file_not_found(self, mock_path_exists, mock_makedirs, mock_logger):
|
||||||
|
"""Test the handle method when the zip file doesn't exist"""
|
||||||
|
with less_console_noise():
|
||||||
|
# Mock os.makedirs to do nothing
|
||||||
|
mock_makedirs.return_value = None
|
||||||
|
|
||||||
|
# Mock os.path.exists to return False
|
||||||
|
mock_path_exists.return_value = False
|
||||||
|
|
||||||
|
call_command("import_tables")
|
||||||
|
|
||||||
|
# Check that logger.error was called with the correct message
|
||||||
|
mock_logger.error.assert_called_once_with("Zip file tmp/exported_tables.zip does not exist.")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue