diff --git a/src/registrar/management/commands/import_tables.py b/src/registrar/management/commands/import_tables.py index c0539c517..fd241a48b 100644 --- a/src/registrar/management/commands/import_tables.py +++ b/src/registrar/management/commands/import_tables.py @@ -54,6 +54,7 @@ class Command(BaseCommand): def import_table(self, table_name): """Import data from a CSV file into the given table""" + resourcename = f"{table_name}Resource" csv_filename = f"tmp/{table_name}.csv" try: @@ -68,7 +69,6 @@ class Command(BaseCommand): resourceclass = getattr(registrar.admin, resourcename) resource_instance = resourceclass() with open(csv_filename, "r") as csvfile: - # dataset = resource_instance.import_data(csvfile.read()) dataset = tablib.Dataset().load(csvfile.read(), format="csv") result = resource_instance.import_data(dataset, dry_run=False, skip_epp_save=True) diff --git a/src/registrar/tests/test_management_scripts.py b/src/registrar/tests/test_management_scripts.py index 791c1eb1f..08cf71d1c 100644 --- a/src/registrar/tests/test_management_scripts.py +++ b/src/registrar/tests/test_management_scripts.py @@ -1,9 +1,12 @@ import copy from datetime import date, datetime, time +from django.core.management import call_command +from django.test import TestCase, override_settings from django.utils import timezone - -from django.test import TestCase - +from django.utils.module_loading import import_string +import logging +import pyzipper +from registrar.management.commands.clean_tables import Command as CleanTablesCommand from registrar.models import ( User, Domain, @@ -18,14 +21,15 @@ from registrar.models import ( PublicContact, FederalAgency, ) - -from django.core.management import call_command -from unittest.mock import patch, call +import tablib +from unittest.mock import patch, call, MagicMock, mock_open from epplibwrapper import commands, common from .common import MockEppLib, less_console_noise, completed_domain_request from api.tests.common import less_console_noise_decorator +logger = logging.getLogger(__name__) + class TestPopulateVerificationType(MockEppLib): """Tests for the populate_organization_type script""" @@ -767,3 +771,324 @@ class TestDiscloseEmails(MockEppLib): ) ] ) + + +class TestCleanTables(TestCase): + """Test the clean_tables script""" + + def setUp(self): + self.command = CleanTablesCommand() + self.logger_patcher = patch("registrar.management.commands.clean_tables.logger") + self.logger_mock = self.logger_patcher.start() + + def tearDown(self): + self.logger_patcher.stop() + + @override_settings(IS_PRODUCTION=True) + def test_command_logs_error_in_production(self): + """Test that the handle method does not process in production""" + with less_console_noise(): + call_command("clean_tables") + self.logger_mock.error.assert_called_with("clean_tables cannot be run in production") + + @override_settings(IS_PRODUCTION=False) + def test_command_cleans_tables(self): + """test that the handle method functions properly to clean tables""" + with less_console_noise(): + with patch("django.apps.apps.get_model") as get_model_mock: + model_mock = MagicMock() + get_model_mock.return_value = model_mock + + call_command("clean_tables") + + table_names = [ + "DomainInformation", + "DomainRequest", + "PublicContact", + "Domain", + "User", + "Contact", + "Website", + "DraftDomain", + "HostIp", + "Host", + ] + + # Check that each model's delete method was called + for table_name in table_names: + get_model_mock.assert_any_call("registrar", table_name) + model_mock.objects.all().delete.assert_called() + + self.logger_mock.info.assert_any_call("Successfully cleaned table DomainInformation") + + @override_settings(IS_PRODUCTION=False) + def test_command_handles_nonexistent_model(self): + """Test that exceptions for non existent models are handled properly within the handle method""" + with less_console_noise(): + with patch("django.apps.apps.get_model", side_effect=LookupError): + call_command("clean_tables") + # Assert that the error message was logged for any of the table names + self.logger_mock.error.assert_any_call("Model for table DomainInformation not found.") + self.logger_mock.error.assert_any_call("Model for table DomainRequest not found.") + self.logger_mock.error.assert_any_call("Model for table PublicContact not found.") + self.logger_mock.error.assert_any_call("Model for table Domain not found.") + self.logger_mock.error.assert_any_call("Model for table User not found.") + self.logger_mock.error.assert_any_call("Model for table Contact not found.") + self.logger_mock.error.assert_any_call("Model for table Website not found.") + self.logger_mock.error.assert_any_call("Model for table DraftDomain not found.") + self.logger_mock.error.assert_any_call("Model for table HostIp not found.") + self.logger_mock.error.assert_any_call("Model for table Host not found.") + + @override_settings(IS_PRODUCTION=False) + def test_command_logs_other_exceptions(self): + """Test that generic exceptions are handled properly in the handle method""" + with less_console_noise(): + with patch("django.apps.apps.get_model") as get_model_mock: + model_mock = MagicMock() + get_model_mock.return_value = model_mock + model_mock.objects.all().delete.side_effect = Exception("Some error") + + call_command("clean_tables") + + self.logger_mock.error.assert_any_call("Error cleaning table DomainInformation: Some error") + + +class TestExportTables(MockEppLib): + """Test the export_tables script""" + + def setUp(self): + self.logger_patcher = patch("registrar.management.commands.export_tables.logger") + self.logger_mock = self.logger_patcher.start() + + def tearDown(self): + self.logger_patcher.stop() + + @patch("registrar.management.commands.export_tables.os.makedirs") + @patch("registrar.management.commands.export_tables.os.path.exists") + @patch("registrar.management.commands.export_tables.os.remove") + @patch("registrar.management.commands.export_tables.pyzipper.AESZipFile") + @patch("registrar.management.commands.export_tables.getattr") + @patch("builtins.open", new_callable=mock_open, read_data=b"mock_csv_data") + @patch("django.utils.translation.trans_real._translations", {}) + @patch("django.utils.translation.trans_real.translation") + def test_handle( + self, mock_translation, mock_file, mock_getattr, mock_zipfile, mock_remove, mock_path_exists, mock_makedirs + ): + """test that the handle method properly exports tables""" + with less_console_noise(): + # Mock os.makedirs to do nothing + mock_makedirs.return_value = None + + # Mock os.path.exists to always return True + mock_path_exists.return_value = True + + # Mock the resource class and its export method + mock_resource_class = MagicMock() + mock_dataset = MagicMock() + mock_dataset.csv = b"mock_csv_data" + mock_resource_class().export.return_value = mock_dataset + mock_getattr.return_value = mock_resource_class + + # Mock translation function to return a dummy translation object + mock_translation.return_value = MagicMock() + + call_command("export_tables") + + # Check that os.makedirs was called once to create the tmp directory + mock_makedirs.assert_called_once_with("tmp", exist_ok=True) + + # Check that the export_table function was called for each table + table_names = [ + "User", + "Contact", + "Domain", + "DomainRequest", + "DomainInformation", + "UserDomainRole", + "DraftDomain", + "Website", + "HostIp", + "Host", + "PublicContact", + ] + + # Check that the CSV file was written + for table_name in table_names: + mock_file().write.assert_any_call(b"mock_csv_data") + # Check that os.path.exists was called + mock_path_exists.assert_any_call(f"tmp/{table_name}.csv") + # Check that os.remove was called + mock_remove.assert_any_call(f"tmp/{table_name}.csv") + + # Check that the zipfile was created and files were added + mock_zipfile.assert_called_once_with("tmp/exported_tables.zip", "w", compression=pyzipper.ZIP_DEFLATED) + zipfile_instance = mock_zipfile.return_value.__enter__.return_value + for table_name in table_names: + zipfile_instance.write.assert_any_call(f"tmp/{table_name}.csv", f"{table_name}.csv") + + # Verify logging for added files + for table_name in table_names: + self.logger_mock.info.assert_any_call( + f"Added tmp/{table_name}.csv to zip archive tmp/exported_tables.zip" + ) + + # Verify logging for removed files + for table_name in table_names: + self.logger_mock.info.assert_any_call(f"Removed temporary file tmp/{table_name}.csv") + + @patch("registrar.management.commands.export_tables.getattr") + def test_export_table_handles_missing_resource_class(self, mock_getattr): + """Test that missing resource classes are handled properly in the handle method""" + with less_console_noise(): + mock_getattr.side_effect = AttributeError + + # Import the command to avoid any locale or gettext issues + command_class = import_string("registrar.management.commands.export_tables.Command") + command_instance = command_class() + command_instance.export_table("NonExistentTable") + + self.logger_mock.error.assert_called_with( + "Resource class NonExistentTableResource not found in registrar.admin" + ) + + @patch("registrar.management.commands.export_tables.getattr") + def test_export_table_handles_generic_exception(self, mock_getattr): + """Test that general exceptions in the handle method are handled correctly""" + with less_console_noise(): + mock_resource_class = MagicMock() + mock_resource_class().export.side_effect = Exception("Test Exception") + mock_getattr.return_value = mock_resource_class + + # Import the command to avoid any locale or gettext issues + command_class = import_string("registrar.management.commands.export_tables.Command") + command_instance = command_class() + command_instance.export_table("TestTable") + + self.logger_mock.error.assert_called_with("Failed to export TestTable: Test Exception") + + +class TestImportTables(TestCase): + """Test the import_tables script""" + + @patch("registrar.management.commands.import_tables.os.makedirs") + @patch("registrar.management.commands.import_tables.os.path.exists") + @patch("registrar.management.commands.import_tables.os.remove") + @patch("registrar.management.commands.import_tables.pyzipper.AESZipFile") + @patch("registrar.management.commands.import_tables.tablib.Dataset") + @patch("registrar.management.commands.import_tables.open", create=True) + @patch("registrar.management.commands.import_tables.logger") + @patch("registrar.management.commands.import_tables.getattr") + @patch("django.apps.apps.get_model") + def test_handle( + self, + mock_get_model, + mock_getattr, + mock_logger, + mock_open, + mock_dataset, + mock_zipfile, + mock_remove, + mock_path_exists, + mock_makedirs, + ): + """Test that the handle method properly imports tables""" + with less_console_noise(): + # Mock os.makedirs to do nothing + mock_makedirs.return_value = None + + # Mock os.path.exists to always return True + mock_path_exists.return_value = True + + # Mock the zipfile to have extractall return None + mock_zipfile_instance = mock_zipfile.return_value.__enter__.return_value + mock_zipfile_instance.extractall.return_value = None + + # Mock the CSV file content + csv_content = b"mock_csv_data" + + # Mock the open function to return a mock file + mock_open.return_value.__enter__.return_value.read.return_value = csv_content + + # Mock the Dataset class and its load method to return a dataset + mock_dataset_instance = MagicMock(spec=tablib.Dataset) + with patch( + "registrar.management.commands.import_tables.tablib.Dataset.load", return_value=mock_dataset_instance + ): + # Mock the resource class and its import method + mock_resource_class = MagicMock() + mock_resource_instance = MagicMock() + mock_result = MagicMock() + mock_result.has_errors.return_value = False + mock_resource_instance.import_data.return_value = mock_result + mock_resource_class.return_value = mock_resource_instance + mock_getattr.return_value = mock_resource_class + + # Call the command + call_command("import_tables") + + # Check that os.makedirs was called once to create the tmp directory + mock_makedirs.assert_called_once_with("tmp", exist_ok=True) + + # Check that os.path.exists was called once for the zip file + mock_path_exists.assert_any_call("tmp/exported_tables.zip") + + # Check that pyzipper.AESZipFile was called once to open the zip file + mock_zipfile.assert_called_once_with("tmp/exported_tables.zip", "r") + + # Check that extractall was called once to extract the zip file contents + mock_zipfile_instance.extractall.assert_called_once_with("tmp") + + # Check that the import_table function was called for each table + table_names = [ + "User", + "Contact", + "Domain", + "DomainRequest", + "DomainInformation", + "UserDomainRole", + "DraftDomain", + "Website", + "HostIp", + "Host", + "PublicContact", + ] + # Check that os.path.exists was called for each table + for table_name in table_names: + mock_path_exists.assert_any_call(f"tmp/{table_name}.csv") + + # Check that clean_tables is called for Contact + mock_get_model.assert_any_call("registrar", "Contact") + model_mock = mock_get_model.return_value + model_mock.objects.all().delete.assert_called() + + # Check that logger.info was called for each successful import + for table_name in table_names: + mock_logger.info.assert_any_call(f"Successfully imported tmp/{table_name}.csv into {table_name}") + + # Check that logger.error was not called for resource class not found + mock_logger.error.assert_not_called() + + # Check that os.remove was called for each CSV file + for table_name in table_names: + mock_remove.assert_any_call(f"tmp/{table_name}.csv") + + # Check that logger.info was called for each CSV file removal + for table_name in table_names: + mock_logger.info.assert_any_call(f"Removed temporary file tmp/{table_name}.csv") + + @patch("registrar.management.commands.import_tables.logger") + @patch("registrar.management.commands.import_tables.os.makedirs") + @patch("registrar.management.commands.import_tables.os.path.exists") + def test_handle_zip_file_not_found(self, mock_path_exists, mock_makedirs, mock_logger): + """Test the handle method when the zip file doesn't exist""" + with less_console_noise(): + # Mock os.makedirs to do nothing + mock_makedirs.return_value = None + + # Mock os.path.exists to return False + mock_path_exists.return_value = False + + call_command("import_tables") + + # Check that logger.error was called with the correct message + mock_logger.error.assert_called_once_with("Zip file tmp/exported_tables.zip does not exist.")