wip, hardcoded with lots of debugging, needs to be generalized

David Kennedy 2024-05-02 08:29:38 -04:00
parent 4f2a2f90de
commit 18658d027b


@@ -1,17 +1,21 @@
from datetime import date
import logging
import copy
import traceback
import warnings
from django import forms
from django.db.models import Value, CharField, Q
from django.db.models.functions import Concat, Coalesce
from django.db.transaction import TransactionManagementError
from django.http import HttpResponseRedirect
from django.shortcuts import redirect
from django_fsm import get_available_FIELD_transitions
from django_fsm import get_available_FIELD_transitions, FSMField
from django.contrib import admin, messages
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from django.contrib.auth.models import Group
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.urls import reverse
from dateutil.relativedelta import relativedelta # type: ignore
from epplibwrapper.errors import ErrorCode, RegistryError
@@ -28,7 +32,8 @@ from django.utils.safestring import mark_safe
from django.utils.html import escape
from django.contrib.auth.forms import UserChangeForm, UsernameField
from django_admin_multiple_choice_list_filter.list_filters import MultipleChoiceListFilter
from import_export import resources
from import_export import resources, widgets
from import_export.results import RowResult
from import_export.admin import ImportExportModelAdmin
from django.utils.translation import gettext_lazy as _
@@ -40,6 +45,42 @@ class UserResource(resources.ModelResource):
    class Meta:
        model = models.User
        import_id_fields = ('id', 'username',)
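
    # Debug-only overrides: log the dataset and each row as the import moves
    # through django-import-export's hook sequence (import_data -> before_import
    # -> import_row -> after_import_row), then defer to the base implementations.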
    def import_data(
        self,
        dataset,
        dry_run=False,
        raise_errors=False,
        use_transactions=None,
        collect_failed_rows=False,
        rollback_on_validation_errors=False,
        **kwargs
    ):
        logger.info("in import_data")
        logger.info(dataset)
        return super().import_data(dataset, dry_run, raise_errors, use_transactions, collect_failed_rows, rollback_on_validation_errors, **kwargs)

    def before_import(self, dataset, using_transactions, dry_run, **kwargs):
        logger.info("in before_import")

    def import_row(
        self,
        row,
        instance_loader,
        using_transactions=True,
        dry_run=False,
        raise_errors=None,
        **kwargs
    ):
        logger.info("in import_row")
        logger.info(row)
        return super().import_row(row, instance_loader, using_transactions, dry_run, raise_errors, **kwargs)

    def after_import_row(self, row, row_result, **kwargs):
        logger.info(row_result.original)
        logger.info(row_result.instance)


class MyUserAdminForm(UserChangeForm):
@@ -1630,8 +1671,276 @@ class DomainInformationInline(admin.StackedInline):
class DomainResource(resources.ModelResource):
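    # Carried over from django-import-export's ModelResource.WIDGETS_MAP so the
    # field-type-to-widget mapping can be customized locally; the entries below
    # appear unchanged from the library defaults.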
    WIDGETS_MAP = {
        "ManyToManyField": "get_m2m_widget",
        "OneToOneField": "get_fk_widget",
        "ForeignKey": "get_fk_widget",
        "CharField": widgets.CharWidget,
        "DecimalField": widgets.DecimalWidget,
        "DateTimeField": widgets.DateTimeWidget,
        "DateField": widgets.DateWidget,
        "TimeField": widgets.TimeWidget,
        "DurationField": widgets.DurationWidget,
        "FloatField": widgets.FloatWidget,
        "IntegerField": widgets.IntegerWidget,
        "PositiveIntegerField": widgets.IntegerWidget,
        "BigIntegerField": widgets.IntegerWidget,
        "PositiveSmallIntegerField": widgets.IntegerWidget,
        "SmallIntegerField": widgets.IntegerWidget,
        "SmallAutoField": widgets.IntegerWidget,
        "AutoField": widgets.IntegerWidget,
        "BigAutoField": widgets.IntegerWidget,
        "NullBooleanField": widgets.BooleanWidget,
        "BooleanField": widgets.BooleanWidget,
        "JSONField": widgets.JSONWidget,
    }

    class Meta:
        model = models.Domain
        # exclude = ('state')
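
    # import_row below is a copy of django-import-export's Resource.import_row
    # (the 2.x signature, with using_transactions/dry_run), instrumented with
    # logging so the handling of FSM-backed fields can be traced row by row.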
    def import_row(
        self,
        row,
        instance_loader,
        using_transactions=True,
        dry_run=False,
        raise_errors=None,
        **kwargs
    ):
        """
        Imports data from ``tablib.Dataset``. Refer to :doc:`import_workflow`
        for a more complete description of the whole import process.

        :param row: A ``dict`` of the row to import

        :param instance_loader: The instance loader to be used to load the row

        :param using_transactions: If ``using_transactions`` is set, a transaction
            is being used to wrap the import

        :param dry_run: If ``dry_run`` is set, or an error occurs, the transaction
            will be rolled back.
        """
        if raise_errors is not None:
            warnings.warn(
                "raise_errors argument is deprecated and "
                "will be removed in a future release.",
                category=DeprecationWarning,
            )
logger.info("in import_row")
skip_diff = self._meta.skip_diff
row_result = self.get_row_result_class()()
if self._meta.store_row_values:
row_result.row_values = row
original = None
try:
self.before_import_row(row, **kwargs)
logger.info("after before_import_row")
instance, new = self.get_or_init_instance(instance_loader, row)
logger.info("after get_or_init_instance")
logger.info(type(instance))
for f in sorted(instance._meta.fields):
if callable(getattr(f, "get_internal_type", None)):
internal_type = f.get_internal_type()
logger.info(f"{f.name} {internal_type} {type(f)}")
logger.info(f"type == fsmfield: {isinstance(f, FSMField)}")
for attr_name, attr_value in vars(instance).items():
logger.info(f"{attr_name}: {attr_value}")
if isinstance(attr_value, FSMField):
logger.info(f"FSMField: {attr_name}: {attr_value}")
self.after_import_instance(instance, new, **kwargs)
if new:
row_result.import_type = RowResult.IMPORT_TYPE_NEW
else:
row_result.import_type = RowResult.IMPORT_TYPE_UPDATE
row_result.new_record = new
if not skip_diff:
original = copy.deepcopy(instance)
diff = self.get_diff_class()(self, original, new)
if self.for_delete(row, instance):
if new:
row_result.import_type = RowResult.IMPORT_TYPE_SKIP
if not skip_diff:
diff.compare_with(self, None, dry_run)
else:
row_result.import_type = RowResult.IMPORT_TYPE_DELETE
row_result.add_instance_info(instance)
if self._meta.store_instance:
row_result.instance = instance
self.delete_instance(instance, using_transactions, dry_run)
if not skip_diff:
diff.compare_with(self, None, dry_run)
else:
import_validation_errors = {}
try:
logger.info("about to import_obj")
self.import_obj(instance, row, dry_run, **kwargs)
logger.info("got past import_obj")
except ValidationError as e:
# Validation errors from import_obj() are passed on to
# validate_instance(), where they can be combined with model
# instance validation errors if necessary
import_validation_errors = e.update_error_dict(
import_validation_errors
)
if self.skip_row(instance, original, row, import_validation_errors):
row_result.import_type = RowResult.IMPORT_TYPE_SKIP
else:
self.validate_instance(instance, import_validation_errors)
self.save_instance(instance, new, using_transactions, dry_run)
self.save_m2m(instance, row, using_transactions, dry_run)
row_result.add_instance_info(instance)
if self._meta.store_instance:
row_result.instance = instance
if not skip_diff:
diff.compare_with(self, instance, dry_run)
if not new:
row_result.original = original
if not skip_diff and not self._meta.skip_html_diff:
row_result.diff = diff.as_html()
self.after_import_row(row, row_result, **kwargs)
except ValidationError as e:
row_result.import_type = RowResult.IMPORT_TYPE_INVALID
row_result.validation_error = e
except Exception as e:
row_result.import_type = RowResult.IMPORT_TYPE_ERROR
# There is no point logging a transaction error for each row
# when only the original error is likely to be relevant
if not isinstance(e, TransactionManagementError):
logger.debug(e, exc_info=e)
tb_info = traceback.format_exc()
row_result.errors.append(self.get_error_result_class()(e, tb_info, row))
return row_result
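
    # A minimal sketch of exercising this resource directly, assuming a Domain
    # model with `name` and `state` columns (the values here are illustrative,
    # not part of this change):
    #
    #   import tablib
    #   dataset = tablib.Dataset(headers=["id", "name", "state"])
    #   dataset.append(["1", "example.gov", "ready"])
    #   result = DomainResource().import_data(dataset, dry_run=True)
    #   print(result.has_errors(), result.totals)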

    # def get_fsm_fields_from_model(self):
    #     logger.info("in get_fsm_fields_from_model")
    #     fsm_fields = []
    #     for f in sorted(self._meta.model._meta.fields):
    #         # if callable(getattr(f, "get_internal_type", None)):
    #         #     internal_type = f.get_internal_type()
    #         #     logger.info(f"{f.name} {internal_type} {type(f)}")
    #         #     logger.info(f"type == fsmfield: {isinstance(f, FSMField)}")
    #         if isinstance(f, FSMField):
    #             fsm_fields.append(f.name)
    #     return fsm_fields

    # def get_fsm_fields_from_row(self, fsm_fields, row=None):
    #     fields_and_values = []
    #     for f in fsm_fields:
    #         if f in row:
    #             fields_and_values.append((f, row[f]))
    #     return fields_and_values

    # def init_instance(self, row=None):
    #     logger.info("initializing instance")
    #     logger.info(f"row: {row}")
    #     # get fields which are fsm fields
    #     fsm_fields = self.get_fsm_fields_from_model()
    #     # then get field values from row and return an array of tuples
    #     fields_and_values = self.get_fsm_fields_from_row(fsm_fields, row)
    #     logger.info(fields_and_values)
    #     # then set those tuples to kwargs
    #     kwargs = dict(fields_and_values)
    #     # then pass kwargs to model()
    #     # return self._meta.model(state='ready')
    #     return self._meta.model(**kwargs)
    #     # return super().init_instance(row)
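
    # init_instance is overridden because django-fsm state columns (FSMField,
    # typically declared with protected=True) reject ordinary attribute
    # assignment; a state value present in the row therefore has to be supplied
    # through the model constructor instead of the generic field importer.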
    def init_instance(self, row=None):
        logger.info("Initializing instance")
        logger.info(f"Row: {row}")
        # Get fields which are FSM fields
        fsm_fields = []
        for f in sorted(self._meta.model._meta.fields):
            if isinstance(f, FSMField):
                if row and f.name in row:
                    fsm_fields.append((f.name, row[f.name]))
        logger.info(f"Fsm fields: {fsm_fields}")
        # Convert fields_and_values to kwargs
        kwargs = dict(fsm_fields)
        # Initialize model instance with kwargs
        return self._meta.model(**kwargs)
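    # For example, a row such as {"id": "1", "name": "example.gov", "state": "ready"}
    # yields kwargs {"state": "ready"} above, i.e. models.Domain(state="ready").
    # (The row values are illustrative only.)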

    def get_instance(self, instance_loader, row):
        """
        If all 'import_id_fields' are present in the dataset, calls
        the :doc:`InstanceLoader <api_instance_loaders>`. Otherwise,
        returns `None`.
        """
        logger.info("get_instance is called")
        import_id_fields = [self.fields[f] for f in self.get_import_id_fields()]
        for field in import_id_fields:
            if field.column_name not in row:
                return
        return instance_loader.get_instance(row)
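    # The get_instance override above matches the base implementation apart from
    # the added log line; it is kept inline while debugging so the row-lookup
    # path is easy to instrument.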

    def import_data(
        self,
        dataset,
        dry_run=False,
        raise_errors=False,
        use_transactions=None,
        collect_failed_rows=False,
        rollback_on_validation_errors=False,
        **kwargs
    ):
        logger.info("in import_data")
        logger.info(dataset)
        return super().import_data(dataset, dry_run, raise_errors, use_transactions, collect_failed_rows, rollback_on_validation_errors, **kwargs)

    def before_import(self, dataset, using_transactions, dry_run, **kwargs):
        logger.info("in before_import")

    # def import_row(
    #     self,
    #     row,
    #     instance_loader,
    #     using_transactions=True,
    #     dry_run=False,
    #     raise_errors=None,
    #     **kwargs
    # ):
    #     logger.info("in import_row")
    #     logger.info(row)
    #     return super().import_row(row, instance_loader, using_transactions, dry_run, raise_errors, **kwargs)

    def after_import_row(self, row, row_result, **kwargs):
        logger.info(row_result.original)
        logger.info(row_result.instance)
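
    # import_field below is where FSM-backed columns are filtered out: every
    # other field still goes through the widget-based importer, while a column
    # whose model field is an FSMField is skipped (its value was already applied
    # in init_instance).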
    def import_field(self, field, obj, data, is_m2m=False, **kwargs):
        logger.info(f"import_field: {field}")
        logger.info(field.attribute)
        logger.info(field.widget)
        logger.info(type(obj))
        logger.info(obj._meta.fields)
        is_fsm = False
        for f in obj._meta.fields:
            logger.info(f.name)
            logger.info(type(f))
            if field.attribute == f.name and isinstance(f, FSMField):
                is_fsm = True
        # if field.attribute in sorted(obj._meta.fields):
        #     logger.info("in fields")
        #     if not isinstance(obj._meta.fields[field.attribute], FSMField):
        if not is_fsm:
            logger.info("not fsm field")
            # if (field.attribute != 'state'):
            super().import_field(field, obj, data, is_m2m, **kwargs)
        logger.info("finished importing")
class DomainAdmin(ListHeaderAdmin, ImportExportModelAdmin):