abstracted into FsmModelResource

This commit is contained in:
David Kennedy 2024-05-02 13:17:04 -04:00
parent 18658d027b
commit 011a974a1e
No known key found for this signature in database
GPG key ID: 6528A5386E66B96B

View file

@ -41,6 +41,50 @@ from django.utils.translation import gettext_lazy as _
logger = logging.getLogger(__name__)
class FsmModelResource(resources.ModelResource):
def init_instance(self, row=None):
logger.info("Initializing instance")
logger.info(f"Row: {row}")
# Get fields which are fsm fields
fsm_fields = []
for f in sorted(self._meta.model._meta.fields):
if isinstance(f, FSMField):
if row and f.name in row:
fsm_fields.append((f.name, row[f.name]))
logger.info(f"Fsm fields: {fsm_fields}")
# Convert fields_and_values to kwargs
kwargs = dict(fsm_fields)
# Initialize model instance with kwargs
return self._meta.model(**kwargs)
def import_field(self, field, obj, data, is_m2m=False, **kwargs):
# logger.info(f"import_field: ${field}")
# logger.info(field.attribute)
# logger.info(field.widget)
# logger.info(type(obj))
# logger.info(obj._meta.fields)
is_fsm = False
for f in obj._meta.fields:
# logger.info(f.name)
# logger.info(type(f))
if field.attribute == f.name and isinstance(f, FSMField):
is_fsm = True
# if field.attribute in sorted(obj._meta.fields):
# logger.info("in fields")
# if not isinstance(obj._meta.fields[field.attribute], FSMField):
if not is_fsm:
# logger.info("not fsm field")
#if (field.attribute != 'state'):
super().import_field(field, obj, data, is_m2m, **kwargs)
# logger.info("finished importing")
class UserResource(resources.ModelResource):
class Meta:
@ -1129,7 +1173,7 @@ class DomainInformationAdmin(ListHeaderAdmin, ImportExportModelAdmin):
return readonly_fields # Read-only fields for analysts
class DomainRequestResource(resources.ModelResource):
class DomainRequestResource(FsmModelResource):
class Meta:
model = models.DomainRequest
@ -1669,278 +1713,10 @@ class DomainInformationInline(admin.StackedInline):
return DomainInformationAdmin.get_readonly_fields(self, request, obj=None)
class DomainResource(resources.ModelResource):
WIDGETS_MAP = {
"ManyToManyField": "get_m2m_widget",
"OneToOneField": "get_fk_widget",
"ForeignKey": "get_fk_widget",
"CharField": widgets.CharWidget,
"DecimalField": widgets.DecimalWidget,
"DateTimeField": widgets.DateTimeWidget,
"DateField": widgets.DateWidget,
"TimeField": widgets.TimeWidget,
"DurationField": widgets.DurationWidget,
"FloatField": widgets.FloatWidget,
"IntegerField": widgets.IntegerWidget,
"PositiveIntegerField": widgets.IntegerWidget,
"BigIntegerField": widgets.IntegerWidget,
"PositiveSmallIntegerField": widgets.IntegerWidget,
"SmallIntegerField": widgets.IntegerWidget,
"SmallAutoField": widgets.IntegerWidget,
"AutoField": widgets.IntegerWidget,
"BigAutoField": widgets.IntegerWidget,
"NullBooleanField": widgets.BooleanWidget,
"BooleanField": widgets.BooleanWidget,
"JSONField": widgets.JSONWidget,
}
class DomainResource(FsmModelResource):
class Meta:
model = models.Domain
# exclude = ('state')
def import_row(
self,
row,
instance_loader,
using_transactions=True,
dry_run=False,
raise_errors=None,
**kwargs
):
"""
Imports data from ``tablib.Dataset``. Refer to :doc:`import_workflow`
for a more complete description of the whole import process.
:param row: A ``dict`` of the row to import
:param instance_loader: The instance loader to be used to load the row
:param using_transactions: If ``using_transactions`` is set, a transaction
is being used to wrap the import
:param dry_run: If ``dry_run`` is set, or error occurs, transaction
will be rolled back.
"""
if raise_errors is not None:
warnings.warn(
"raise_errors argument is deprecated and "
"will be removed in a future release.",
category=DeprecationWarning,
)
logger.info("in import_row")
skip_diff = self._meta.skip_diff
row_result = self.get_row_result_class()()
if self._meta.store_row_values:
row_result.row_values = row
original = None
try:
self.before_import_row(row, **kwargs)
logger.info("after before_import_row")
instance, new = self.get_or_init_instance(instance_loader, row)
logger.info("after get_or_init_instance")
logger.info(type(instance))
for f in sorted(instance._meta.fields):
if callable(getattr(f, "get_internal_type", None)):
internal_type = f.get_internal_type()
logger.info(f"{f.name} {internal_type} {type(f)}")
logger.info(f"type == fsmfield: {isinstance(f, FSMField)}")
for attr_name, attr_value in vars(instance).items():
logger.info(f"{attr_name}: {attr_value}")
if isinstance(attr_value, FSMField):
logger.info(f"FSMField: {attr_name}: {attr_value}")
self.after_import_instance(instance, new, **kwargs)
if new:
row_result.import_type = RowResult.IMPORT_TYPE_NEW
else:
row_result.import_type = RowResult.IMPORT_TYPE_UPDATE
row_result.new_record = new
if not skip_diff:
original = copy.deepcopy(instance)
diff = self.get_diff_class()(self, original, new)
if self.for_delete(row, instance):
if new:
row_result.import_type = RowResult.IMPORT_TYPE_SKIP
if not skip_diff:
diff.compare_with(self, None, dry_run)
else:
row_result.import_type = RowResult.IMPORT_TYPE_DELETE
row_result.add_instance_info(instance)
if self._meta.store_instance:
row_result.instance = instance
self.delete_instance(instance, using_transactions, dry_run)
if not skip_diff:
diff.compare_with(self, None, dry_run)
else:
import_validation_errors = {}
try:
logger.info("about to import_obj")
self.import_obj(instance, row, dry_run, **kwargs)
logger.info("got past import_obj")
except ValidationError as e:
# Validation errors from import_obj() are passed on to
# validate_instance(), where they can be combined with model
# instance validation errors if necessary
import_validation_errors = e.update_error_dict(
import_validation_errors
)
if self.skip_row(instance, original, row, import_validation_errors):
row_result.import_type = RowResult.IMPORT_TYPE_SKIP
else:
self.validate_instance(instance, import_validation_errors)
self.save_instance(instance, new, using_transactions, dry_run)
self.save_m2m(instance, row, using_transactions, dry_run)
row_result.add_instance_info(instance)
if self._meta.store_instance:
row_result.instance = instance
if not skip_diff:
diff.compare_with(self, instance, dry_run)
if not new:
row_result.original = original
if not skip_diff and not self._meta.skip_html_diff:
row_result.diff = diff.as_html()
self.after_import_row(row, row_result, **kwargs)
except ValidationError as e:
row_result.import_type = RowResult.IMPORT_TYPE_INVALID
row_result.validation_error = e
except Exception as e:
row_result.import_type = RowResult.IMPORT_TYPE_ERROR
# There is no point logging a transaction error for each row
# when only the original error is likely to be relevant
if not isinstance(e, TransactionManagementError):
logger.debug(e, exc_info=e)
tb_info = traceback.format_exc()
row_result.errors.append(self.get_error_result_class()(e, tb_info, row))
return row_result
# def get_fsm_fields_from_model(self):
# logger.info("in get_fsm_fields_from_model")
# fsm_fields = []
# for f in sorted(self._meta.model._meta.fields):
# # if callable(getattr(f, "get_internal_type", None)):
# # internal_type = f.get_internal_type()
# # logger.info(f"{f.name} {internal_type} {type(f)}")
# # logger.info(f"type == fsmfield: {isinstance(f, FSMField)}")
# if isinstance(f, FSMField):
# fsm_fields.append(f.name)
# return fsm_fields
# def get_fsm_fields_from_row(self, fsm_fields, row=None):
# fields_and_values = []
# for f in fsm_fields:
# if f in row:
# fields_and_values.append((f, row[f]))
# return fields_and_values
# def init_instance(self, row=None):
# logger.info("initializing instance")
# logger.info(f"row: {row}")
# #get fields which are fsm fields
# fsm_fields = self.get_fsm_fields_from_model()
# #then get field values from row and return an array of tuples
# fields_and_values = self.get_fsm_fields_from_row(fsm_fields, row)
# logger.info(fields_and_values)
# #then set those tuples to kwargs
# kwargs = dict(fields_and_values)
# #then pass kwargs to model()
# #return self._meta.model(state='ready')
# return self._meta.model(**kwargs)
# #return super().init_instance(row)
def init_instance(self, row=None):
logger.info("Initializing instance")
logger.info(f"Row: {row}")
# Get fields which are fsm fields
fsm_fields = []
for f in sorted(self._meta.model._meta.fields):
if isinstance(f, FSMField):
if row and f.name in row:
fsm_fields.append((f.name, row[f.name]))
logger.info(f"Fsm fields: {fsm_fields}")
# Convert fields_and_values to kwargs
kwargs = dict(fsm_fields)
# Initialize model instance with kwargs
return self._meta.model(**kwargs)
def get_instance(self, instance_loader, row):
"""
If all 'import_id_fields' are present in the dataset, calls
the :doc:`InstanceLoader <api_instance_loaders>`. Otherwise,
returns `None`.
"""
logger.info("get_instance is called")
import_id_fields = [self.fields[f] for f in self.get_import_id_fields()]
for field in import_id_fields:
if field.column_name not in row:
return
return instance_loader.get_instance(row)
def import_data(
self,
dataset,
dry_run=False,
raise_errors=False,
use_transactions=None,
collect_failed_rows=False,
rollback_on_validation_errors=False,
**kwargs
):
logger.info("in import_data")
logger.info(dataset)
return super().import_data(dataset,dry_run,raise_errors,use_transactions,collect_failed_rows,rollback_on_validation_errors,**kwargs)
def before_import(self, dataset, using_transactions, dry_run, **kwargs):
logger.info("in before_import")
# def import_row(
# self,
# row,
# instance_loader,
# using_transactions=True,
# dry_run=False,
# raise_errors=None,
# **kwargs
# ):
# logger.info("in import_row")
# logger.info(row)
# return super().import_row(row,instance_loader,using_transactions,dry_run,raise_errors,**kwargs)
def after_import_row(self, row, row_result, **kwargs):
logger.info(row_result.original)
logger.info(row_result.instance)
def import_field(self, field, obj, data, is_m2m=False, **kwargs):
logger.info(f"import_field: ${field}")
logger.info(field.attribute)
logger.info(field.widget)
logger.info(type(obj))
logger.info(obj._meta.fields)
is_fsm = False
for f in obj._meta.fields:
logger.info(f.name)
logger.info(type(f))
if field.attribute == f.name and isinstance(f, FSMField):
is_fsm = True
# if field.attribute in sorted(obj._meta.fields):
# logger.info("in fields")
# if not isinstance(obj._meta.fields[field.attribute], FSMField):
if not is_fsm:
logger.info("not fsm field")
#if (field.attribute != 'state'):
super().import_field(field, obj, data, is_m2m, **kwargs)
logger.info("finished importing")
class DomainAdmin(ListHeaderAdmin, ImportExportModelAdmin):