mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-07-04 10:13:30 +02:00
Merge pull request #2309 from cisagov/dk/2205-import-update-epp
Issue #2205 : Import update - handle PublicContacts, Domains and Hosts
This commit is contained in:
commit
fa502151a2
6 changed files with 229 additions and 134 deletions
|
@ -32,9 +32,11 @@ For reference, the zip file will contain the following tables in csv form:
|
|||
* DomainInformation
|
||||
* DomainUserRole
|
||||
* DraftDomain
|
||||
* FederalAgency
|
||||
* Websites
|
||||
* Host
|
||||
* HostIP
|
||||
* PublicContact
|
||||
|
||||
After exporting the file from the target environment, scp the exported_tables.zip
|
||||
file from the target environment to local. Run the below commands from local.
|
||||
|
@ -75,17 +77,25 @@ For reference, this deletes all rows from the following tables:
|
|||
* DomainInformation
|
||||
* DomainRequest
|
||||
* Domain
|
||||
* User (all but the current user)
|
||||
* User
|
||||
* Contact
|
||||
* Websites
|
||||
* DraftDomain
|
||||
* HostIP
|
||||
* Host
|
||||
* PublicContact
|
||||
* FederalAgency
|
||||
|
||||
#### Importing into Target Environment
|
||||
|
||||
Once target environment is prepared, files can be imported.
|
||||
|
||||
If importing tables from stable environment into an OT&E sandbox, there will be a difference
|
||||
between the stable's registry and the sandbox's registry. Therefore, you need to run import_tables
|
||||
with --skipEppSave option set to False. If you set to False, it will attempt to save PublicContact
|
||||
records to the registry on load. If this is unset, or set to True, it will load the database and not
|
||||
attempt to update the registry on load.
|
||||
|
||||
To scp the exported_tables.zip file from local to the sandbox, run the following:
|
||||
|
||||
Get passcode by running:
|
||||
|
@ -107,7 +117,7 @@ cf ssh {target-app}
|
|||
example cleaning getgov-backup:
|
||||
cf ssh getgov-backup
|
||||
/tmp/lifecycle/backup
|
||||
./manage.py import_tables
|
||||
./manage.py import_tables --no-skipEppSave
|
||||
|
||||
For reference, this imports tables in the following order:
|
||||
|
||||
|
@ -118,9 +128,11 @@ For reference, this imports tables in the following order:
|
|||
* HostIP
|
||||
* DraftDomain
|
||||
* Websites
|
||||
* FederalAgency
|
||||
* DomainRequest
|
||||
* DomainInformation
|
||||
* UserDomainRole
|
||||
* PublicContact
|
||||
|
||||
Optional step:
|
||||
* Run fixtures to load fixture users back in
|
|
@ -2478,16 +2478,35 @@ class PublicContactResource(resources.ModelResource):
|
|||
|
||||
class Meta:
|
||||
model = models.PublicContact
|
||||
# may want to consider these bulk options in future, so left in as comments
|
||||
# use_bulk = True
|
||||
# batch_size = 1000
|
||||
# force_init_instance = True
|
||||
|
||||
def import_row(self, row, instance_loader, using_transactions=True, dry_run=False, raise_errors=None, **kwargs):
|
||||
"""Override kwargs skip_epp_save and set to True"""
|
||||
kwargs["skip_epp_save"] = True
|
||||
return super().import_row(
|
||||
row,
|
||||
instance_loader,
|
||||
using_transactions=using_transactions,
|
||||
dry_run=dry_run,
|
||||
raise_errors=raise_errors,
|
||||
def __init__(self):
|
||||
"""Sets global variables for code tidyness"""
|
||||
super().__init__()
|
||||
self.skip_epp_save = False
|
||||
|
||||
def import_data(
|
||||
self,
|
||||
dataset,
|
||||
dry_run=False,
|
||||
raise_errors=False,
|
||||
use_transactions=None,
|
||||
collect_failed_rows=False,
|
||||
rollback_on_validation_errors=False,
|
||||
**kwargs,
|
||||
):
|
||||
"""Override import_data to set self.skip_epp_save if in kwargs"""
|
||||
self.skip_epp_save = kwargs.get("skip_epp_save", False)
|
||||
return super().import_data(
|
||||
dataset,
|
||||
dry_run,
|
||||
raise_errors,
|
||||
use_transactions,
|
||||
collect_failed_rows,
|
||||
rollback_on_validation_errors,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
@ -2503,7 +2522,7 @@ class PublicContactResource(resources.ModelResource):
|
|||
# we don't have transactions and we want to do a dry_run
|
||||
pass
|
||||
else:
|
||||
instance.save(skip_epp_save=True)
|
||||
instance.save(skip_epp_save=self.skip_epp_save)
|
||||
self.after_save_instance(instance, using_transactions, dry_run)
|
||||
|
||||
|
||||
|
@ -2580,11 +2599,20 @@ class PortfolioAdmin(ListHeaderAdmin):
|
|||
super().save_model(request, obj, form, change)
|
||||
|
||||
|
||||
class FederalAgencyAdmin(ListHeaderAdmin):
|
||||
class FederalAgencyResource(resources.ModelResource):
|
||||
"""defines how each field in the referenced model should be mapped to the corresponding fields in the
|
||||
import/export file"""
|
||||
|
||||
class Meta:
|
||||
model = models.FederalAgency
|
||||
|
||||
|
||||
class FederalAgencyAdmin(ListHeaderAdmin, ImportExportModelAdmin):
|
||||
list_display = ["agency"]
|
||||
search_fields = ["agency"]
|
||||
search_help_text = "Search by agency name."
|
||||
ordering = ["agency"]
|
||||
resource_classes = [FederalAgencyResource]
|
||||
|
||||
|
||||
class UserGroupAdmin(AuditedAdmin):
|
||||
|
|
|
@ -28,6 +28,7 @@ class Command(BaseCommand):
|
|||
* DomainInformation
|
||||
* DomainRequest
|
||||
* DraftDomain
|
||||
* FederalAgency
|
||||
* Host
|
||||
* HostIp
|
||||
* PublicContact
|
||||
|
@ -40,14 +41,15 @@ class Command(BaseCommand):
|
|||
table_names = [
|
||||
"DomainInformation",
|
||||
"DomainRequest",
|
||||
"FederalAgency",
|
||||
"PublicContact",
|
||||
"HostIp",
|
||||
"Host",
|
||||
"Domain",
|
||||
"User",
|
||||
"Contact",
|
||||
"Website",
|
||||
"DraftDomain",
|
||||
"HostIp",
|
||||
"Host",
|
||||
]
|
||||
|
||||
for table_name in table_names:
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
from django.core.paginator import Paginator
|
||||
import logging
|
||||
import os
|
||||
import pyzipper
|
||||
import tablib
|
||||
from django.core.management import BaseCommand
|
||||
import registrar.admin
|
||||
|
||||
|
@ -18,6 +20,7 @@ class Command(BaseCommand):
|
|||
"Domain",
|
||||
"DomainRequest",
|
||||
"DomainInformation",
|
||||
"FederalAgency",
|
||||
"UserDomainRole",
|
||||
"DraftDomain",
|
||||
"Website",
|
||||
|
@ -36,28 +39,58 @@ class Command(BaseCommand):
|
|||
zip_filename = "tmp/exported_tables.zip"
|
||||
with pyzipper.AESZipFile(zip_filename, "w", compression=pyzipper.ZIP_DEFLATED) as zipf:
|
||||
for table_name in table_names:
|
||||
csv_filename = f"tmp/{table_name}.csv"
|
||||
if os.path.exists(csv_filename):
|
||||
zipf.write(csv_filename, os.path.basename(csv_filename))
|
||||
logger.info(f"Added {csv_filename} to zip archive {zip_filename}")
|
||||
|
||||
# Remove the CSV files after adding them to the zip file
|
||||
for table_name in table_names:
|
||||
csv_filename = f"tmp/{table_name}.csv"
|
||||
if os.path.exists(csv_filename):
|
||||
os.remove(csv_filename)
|
||||
logger.info(f"Removed temporary file {csv_filename}")
|
||||
# Define the tmp directory and the file pattern
|
||||
tmp_dir = "tmp"
|
||||
pattern = f"{table_name}_"
|
||||
zip_file_path = os.path.join(tmp_dir, "exported_files.zip")
|
||||
|
||||
# Find all files that match the pattern
|
||||
matching_files = [file for file in os.listdir(tmp_dir) if file.startswith(pattern)]
|
||||
for file_path in matching_files:
|
||||
# Add each file to the zip archive
|
||||
zipf.write(f"tmp/{file_path}", os.path.basename(file_path))
|
||||
logger.info(f"Added {file_path} to {zip_file_path}")
|
||||
|
||||
# Remove the file after adding to zip
|
||||
os.remove(f"tmp/{file_path}")
|
||||
logger.info(f"Removed {file_path}")
|
||||
|
||||
def export_table(self, table_name):
|
||||
"""Export a given table to a csv file in the tmp directory"""
|
||||
"""Export a given table to csv files in the tmp directory"""
|
||||
resourcename = f"{table_name}Resource"
|
||||
try:
|
||||
resourceclass = getattr(registrar.admin, resourcename)
|
||||
dataset = resourceclass().export()
|
||||
filename = f"tmp/{table_name}.csv"
|
||||
with open(filename, "w") as outputfile:
|
||||
outputfile.write(dataset.csv)
|
||||
logger.info(f"Successfully exported {table_name} to {filename}")
|
||||
if not isinstance(dataset, tablib.Dataset):
|
||||
raise ValueError(f"Exported data from {resourcename} is not a tablib.Dataset")
|
||||
|
||||
# Determine the number of rows per file
|
||||
rows_per_file = 10000
|
||||
|
||||
# Use Paginator to handle splitting the dataset
|
||||
paginator = Paginator(dataset.dict, rows_per_file)
|
||||
num_files = paginator.num_pages
|
||||
|
||||
logger.info(f"splitting {table_name} into {num_files} files")
|
||||
|
||||
# Export each page to a separate file
|
||||
for page_num in paginator.page_range:
|
||||
page = paginator.page(page_num)
|
||||
|
||||
# Create a new dataset for the chunk
|
||||
chunk = tablib.Dataset(headers=dataset.headers)
|
||||
for row_dict in page.object_list:
|
||||
row = [row_dict[header] for header in dataset.headers]
|
||||
chunk.append(row)
|
||||
|
||||
# Export the chunk to a new file
|
||||
filename = f"tmp/{table_name}_{page_num}.csv"
|
||||
with open(filename, "w") as f:
|
||||
f.write(chunk.export("csv"))
|
||||
|
||||
logger.info(f"Successfully exported {table_name} into {num_files} files.")
|
||||
|
||||
except AttributeError:
|
||||
logger.error(f"Resource class {resourcename} not found in registrar.admin")
|
||||
except Exception as e:
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import pyzipper
|
||||
|
@ -14,6 +15,10 @@ logger = logging.getLogger(__name__)
|
|||
class Command(BaseCommand):
|
||||
help = "Imports tables from a zip file, exported_tables.zip, containing CSV files in the tmp directory."
|
||||
|
||||
def add_arguments(self, parser):
|
||||
"""Add command line arguments."""
|
||||
parser.add_argument("--skipEppSave", default=True, action=argparse.BooleanOptionalAction)
|
||||
|
||||
def handle(self, **options):
|
||||
"""Extracts CSV files from a zip archive and imports them into the respective tables"""
|
||||
|
||||
|
@ -21,6 +26,8 @@ class Command(BaseCommand):
|
|||
logger.error("import_tables cannot be run in production")
|
||||
return
|
||||
|
||||
self.skip_epp_save = options.get("skipEppSave")
|
||||
|
||||
table_names = [
|
||||
"User",
|
||||
"Contact",
|
||||
|
@ -29,6 +36,7 @@ class Command(BaseCommand):
|
|||
"HostIp",
|
||||
"DraftDomain",
|
||||
"Website",
|
||||
"FederalAgency",
|
||||
"DomainRequest",
|
||||
"DomainInformation",
|
||||
"UserDomainRole",
|
||||
|
@ -56,38 +64,46 @@ class Command(BaseCommand):
|
|||
"""Import data from a CSV file into the given table"""
|
||||
|
||||
resourcename = f"{table_name}Resource"
|
||||
csv_filename = f"tmp/{table_name}.csv"
|
||||
try:
|
||||
if not os.path.exists(csv_filename):
|
||||
logger.error(f"CSV file {csv_filename} not found.")
|
||||
return
|
||||
|
||||
# if table_name is Contact, clean the table first
|
||||
# User table is loaded before Contact, and signals create
|
||||
# rows in Contact table which break the import, so need
|
||||
# to be cleaned again before running import on Contact table
|
||||
if table_name == "Contact":
|
||||
self.clean_table(table_name)
|
||||
# if table_name is Contact, clean the table first
|
||||
# User table is loaded before Contact, and signals create
|
||||
# rows in Contact table which break the import, so need
|
||||
# to be cleaned again before running import on Contact table
|
||||
if table_name == "Contact":
|
||||
self.clean_table(table_name)
|
||||
|
||||
resourceclass = getattr(registrar.admin, resourcename)
|
||||
resource_instance = resourceclass()
|
||||
with open(csv_filename, "r") as csvfile:
|
||||
dataset = tablib.Dataset().load(csvfile.read(), format="csv")
|
||||
result = resource_instance.import_data(dataset, dry_run=False, skip_epp_save=True)
|
||||
# Define the directory and the pattern for csv filenames
|
||||
tmp_dir = "tmp"
|
||||
pattern = f"{table_name}_"
|
||||
|
||||
if result.has_errors():
|
||||
logger.error(f"Errors occurred while importing {csv_filename}: {result.row_errors()}")
|
||||
else:
|
||||
logger.info(f"Successfully imported {csv_filename} into {table_name}")
|
||||
resourceclass = getattr(registrar.admin, resourcename)
|
||||
resource_instance = resourceclass()
|
||||
|
||||
except AttributeError:
|
||||
logger.error(f"Resource class {resourcename} not found in registrar.admin")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to import {csv_filename}: {e}")
|
||||
finally:
|
||||
if os.path.exists(csv_filename):
|
||||
os.remove(csv_filename)
|
||||
logger.info(f"Removed temporary file {csv_filename}")
|
||||
# Find all files that match the pattern
|
||||
matching_files = [file for file in os.listdir(tmp_dir) if file.startswith(pattern)]
|
||||
for csv_filename in matching_files:
|
||||
try:
|
||||
with open(f"tmp/{csv_filename}", "r") as csvfile:
|
||||
dataset = tablib.Dataset().load(csvfile.read(), format="csv")
|
||||
result = resource_instance.import_data(dataset, dry_run=False, skip_epp_save=self.skip_epp_save)
|
||||
if result.has_errors():
|
||||
logger.error(f"Errors occurred while importing {csv_filename}:")
|
||||
for row_error in result.row_errors():
|
||||
row_index = row_error[0]
|
||||
errors = row_error[1]
|
||||
for error in errors:
|
||||
logger.error(f"Row {row_index} - {error.error} - {error.row}")
|
||||
else:
|
||||
logger.info(f"Successfully imported {csv_filename} into {table_name}")
|
||||
|
||||
except AttributeError:
|
||||
logger.error(f"Resource class {resourcename} not found in registrar.admin")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to import {csv_filename}: {e}")
|
||||
finally:
|
||||
if os.path.exists(csv_filename):
|
||||
os.remove(csv_filename)
|
||||
logger.info(f"Removed temporary file {csv_filename}")
|
||||
|
||||
def clean_table(self, table_name):
|
||||
"""Delete all rows in the given table"""
|
||||
|
|
|
@ -7,6 +7,7 @@ from django.utils.module_loading import import_string
|
|||
import logging
|
||||
import pyzipper
|
||||
from registrar.management.commands.clean_tables import Command as CleanTablesCommand
|
||||
from registrar.management.commands.export_tables import Command as ExportTablesCommand
|
||||
from registrar.models import (
|
||||
User,
|
||||
Domain,
|
||||
|
@ -873,84 +874,81 @@ class TestExportTables(MockEppLib):
|
|||
"""Test the export_tables script"""
|
||||
|
||||
def setUp(self):
|
||||
self.command = ExportTablesCommand()
|
||||
self.logger_patcher = patch("registrar.management.commands.export_tables.logger")
|
||||
self.logger_mock = self.logger_patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
self.logger_patcher.stop()
|
||||
|
||||
@patch("registrar.management.commands.export_tables.os.makedirs")
|
||||
@patch("registrar.management.commands.export_tables.os.path.exists")
|
||||
@patch("registrar.management.commands.export_tables.os.remove")
|
||||
@patch("registrar.management.commands.export_tables.pyzipper.AESZipFile")
|
||||
@patch("os.makedirs")
|
||||
@patch("os.path.exists")
|
||||
@patch("os.remove")
|
||||
@patch("pyzipper.AESZipFile")
|
||||
@patch("registrar.management.commands.export_tables.getattr")
|
||||
@patch("builtins.open", new_callable=mock_open, read_data=b"mock_csv_data")
|
||||
@patch("django.utils.translation.trans_real._translations", {})
|
||||
@patch("django.utils.translation.trans_real.translation")
|
||||
@patch("builtins.open", new_callable=mock_open)
|
||||
@patch("os.listdir")
|
||||
def test_handle(
|
||||
self, mock_translation, mock_file, mock_getattr, mock_zipfile, mock_remove, mock_path_exists, mock_makedirs
|
||||
self, mock_listdir, mock_open, mock_getattr, mock_zipfile, mock_remove, mock_path_exists, mock_makedirs
|
||||
):
|
||||
"""test that the handle method properly exports tables"""
|
||||
with less_console_noise():
|
||||
# Mock os.makedirs to do nothing
|
||||
mock_makedirs.return_value = None
|
||||
# Mock os.makedirs to do nothing
|
||||
mock_makedirs.return_value = None
|
||||
|
||||
# Mock os.path.exists to always return True
|
||||
mock_path_exists.return_value = True
|
||||
# Mock os.path.exists to always return True
|
||||
mock_path_exists.return_value = True
|
||||
|
||||
# Mock the resource class and its export method
|
||||
mock_resource_class = MagicMock()
|
||||
mock_dataset = MagicMock()
|
||||
mock_dataset.csv = b"mock_csv_data"
|
||||
mock_resource_class().export.return_value = mock_dataset
|
||||
mock_getattr.return_value = mock_resource_class
|
||||
# Check that the export_table function was called for each table
|
||||
table_names = [
|
||||
"User",
|
||||
"Contact",
|
||||
"Domain",
|
||||
"DomainRequest",
|
||||
"DomainInformation",
|
||||
"FederalAgency",
|
||||
"UserDomainRole",
|
||||
"DraftDomain",
|
||||
"Website",
|
||||
"HostIp",
|
||||
"Host",
|
||||
"PublicContact",
|
||||
]
|
||||
|
||||
# Mock translation function to return a dummy translation object
|
||||
mock_translation.return_value = MagicMock()
|
||||
# Mock directory listing
|
||||
mock_listdir.side_effect = lambda path: [f"{table}_1.csv" for table in table_names]
|
||||
|
||||
call_command("export_tables")
|
||||
# Mock the resource class and its export method
|
||||
mock_dataset = tablib.Dataset()
|
||||
mock_dataset.headers = ["header1", "header2"]
|
||||
mock_dataset.append(["row1_col1", "row1_col2"])
|
||||
mock_resource_class = MagicMock()
|
||||
mock_resource_class().export.return_value = mock_dataset
|
||||
mock_getattr.return_value = mock_resource_class
|
||||
|
||||
# Check that os.makedirs was called once to create the tmp directory
|
||||
mock_makedirs.assert_called_once_with("tmp", exist_ok=True)
|
||||
command_instance = ExportTablesCommand()
|
||||
command_instance.handle()
|
||||
|
||||
# Check that the export_table function was called for each table
|
||||
table_names = [
|
||||
"User",
|
||||
"Contact",
|
||||
"Domain",
|
||||
"DomainRequest",
|
||||
"DomainInformation",
|
||||
"UserDomainRole",
|
||||
"DraftDomain",
|
||||
"Website",
|
||||
"HostIp",
|
||||
"Host",
|
||||
"PublicContact",
|
||||
]
|
||||
# Check that os.makedirs was called once to create the tmp directory
|
||||
mock_makedirs.assert_called_once_with("tmp", exist_ok=True)
|
||||
|
||||
# Check that the CSV file was written
|
||||
for table_name in table_names:
|
||||
mock_file().write.assert_any_call(b"mock_csv_data")
|
||||
# Check that os.path.exists was called
|
||||
mock_path_exists.assert_any_call(f"tmp/{table_name}.csv")
|
||||
# Check that os.remove was called
|
||||
mock_remove.assert_any_call(f"tmp/{table_name}.csv")
|
||||
# Check that the CSV file was written
|
||||
for table_name in table_names:
|
||||
# Check that os.remove was called
|
||||
mock_remove.assert_any_call(f"tmp/{table_name}_1.csv")
|
||||
|
||||
# Check that the zipfile was created and files were added
|
||||
mock_zipfile.assert_called_once_with("tmp/exported_tables.zip", "w", compression=pyzipper.ZIP_DEFLATED)
|
||||
zipfile_instance = mock_zipfile.return_value.__enter__.return_value
|
||||
for table_name in table_names:
|
||||
zipfile_instance.write.assert_any_call(f"tmp/{table_name}.csv", f"{table_name}.csv")
|
||||
# Check that the zipfile was created and files were added
|
||||
mock_zipfile.assert_called_once_with("tmp/exported_tables.zip", "w", compression=pyzipper.ZIP_DEFLATED)
|
||||
zipfile_instance = mock_zipfile.return_value.__enter__.return_value
|
||||
for table_name in table_names:
|
||||
zipfile_instance.write.assert_any_call(f"tmp/{table_name}_1.csv", f"{table_name}_1.csv")
|
||||
|
||||
# Verify logging for added files
|
||||
for table_name in table_names:
|
||||
self.logger_mock.info.assert_any_call(
|
||||
f"Added tmp/{table_name}.csv to zip archive tmp/exported_tables.zip"
|
||||
)
|
||||
# Verify logging for added files
|
||||
for table_name in table_names:
|
||||
self.logger_mock.info.assert_any_call(f"Added {table_name}_1.csv to tmp/exported_files.zip")
|
||||
|
||||
# Verify logging for removed files
|
||||
for table_name in table_names:
|
||||
self.logger_mock.info.assert_any_call(f"Removed temporary file tmp/{table_name}.csv")
|
||||
# Verify logging for removed files
|
||||
for table_name in table_names:
|
||||
self.logger_mock.info.assert_any_call(f"Removed {table_name}_1.csv")
|
||||
|
||||
@patch("registrar.management.commands.export_tables.getattr")
|
||||
def test_export_table_handles_missing_resource_class(self, mock_getattr):
|
||||
|
@ -995,8 +993,10 @@ class TestImportTables(TestCase):
|
|||
@patch("registrar.management.commands.import_tables.logger")
|
||||
@patch("registrar.management.commands.import_tables.getattr")
|
||||
@patch("django.apps.apps.get_model")
|
||||
@patch("os.listdir")
|
||||
def test_handle(
|
||||
self,
|
||||
mock_listdir,
|
||||
mock_get_model,
|
||||
mock_getattr,
|
||||
mock_logger,
|
||||
|
@ -1019,6 +1019,24 @@ class TestImportTables(TestCase):
|
|||
mock_zipfile_instance = mock_zipfile.return_value.__enter__.return_value
|
||||
mock_zipfile_instance.extractall.return_value = None
|
||||
|
||||
# Check that the import_table function was called for each table
|
||||
table_names = [
|
||||
"User",
|
||||
"Contact",
|
||||
"Domain",
|
||||
"DomainRequest",
|
||||
"DomainInformation",
|
||||
"UserDomainRole",
|
||||
"DraftDomain",
|
||||
"Website",
|
||||
"HostIp",
|
||||
"Host",
|
||||
"PublicContact",
|
||||
]
|
||||
|
||||
# Mock directory listing
|
||||
mock_listdir.side_effect = lambda path: [f"{table}_1.csv" for table in table_names]
|
||||
|
||||
# Mock the CSV file content
|
||||
csv_content = b"mock_csv_data"
|
||||
|
||||
|
@ -1054,23 +1072,9 @@ class TestImportTables(TestCase):
|
|||
# Check that extractall was called once to extract the zip file contents
|
||||
mock_zipfile_instance.extractall.assert_called_once_with("tmp")
|
||||
|
||||
# Check that the import_table function was called for each table
|
||||
table_names = [
|
||||
"User",
|
||||
"Contact",
|
||||
"Domain",
|
||||
"DomainRequest",
|
||||
"DomainInformation",
|
||||
"UserDomainRole",
|
||||
"DraftDomain",
|
||||
"Website",
|
||||
"HostIp",
|
||||
"Host",
|
||||
"PublicContact",
|
||||
]
|
||||
# Check that os.path.exists was called for each table
|
||||
for table_name in table_names:
|
||||
mock_path_exists.assert_any_call(f"tmp/{table_name}.csv")
|
||||
mock_path_exists.assert_any_call(f"{table_name}_1.csv")
|
||||
|
||||
# Check that clean_tables is called for Contact
|
||||
mock_get_model.assert_any_call("registrar", "Contact")
|
||||
|
@ -1079,18 +1083,18 @@ class TestImportTables(TestCase):
|
|||
|
||||
# Check that logger.info was called for each successful import
|
||||
for table_name in table_names:
|
||||
mock_logger.info.assert_any_call(f"Successfully imported tmp/{table_name}.csv into {table_name}")
|
||||
mock_logger.info.assert_any_call(f"Successfully imported {table_name}_1.csv into {table_name}")
|
||||
|
||||
# Check that logger.error was not called for resource class not found
|
||||
mock_logger.error.assert_not_called()
|
||||
|
||||
# Check that os.remove was called for each CSV file
|
||||
for table_name in table_names:
|
||||
mock_remove.assert_any_call(f"tmp/{table_name}.csv")
|
||||
mock_remove.assert_any_call(f"{table_name}_1.csv")
|
||||
|
||||
# Check that logger.info was called for each CSV file removal
|
||||
for table_name in table_names:
|
||||
mock_logger.info.assert_any_call(f"Removed temporary file tmp/{table_name}.csv")
|
||||
mock_logger.info.assert_any_call(f"Removed temporary file {table_name}_1.csv")
|
||||
|
||||
@patch("registrar.management.commands.import_tables.logger")
|
||||
@patch("registrar.management.commands.import_tables.os.makedirs")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue