diff --git a/src/registrar/admin.py b/src/registrar/admin.py index 3219be625..1be5131e1 100644 --- a/src/registrar/admin.py +++ b/src/registrar/admin.py @@ -41,6 +41,50 @@ from django.utils.translation import gettext_lazy as _ logger = logging.getLogger(__name__) +class FsmModelResource(resources.ModelResource): + + def init_instance(self, row=None): + logger.info("Initializing instance") + logger.info(f"Row: {row}") + + # Get fields which are fsm fields + fsm_fields = [] + + for f in sorted(self._meta.model._meta.fields): + if isinstance(f, FSMField): + if row and f.name in row: + fsm_fields.append((f.name, row[f.name])) + + logger.info(f"Fsm fields: {fsm_fields}") + + # Convert fields_and_values to kwargs + kwargs = dict(fsm_fields) + + # Initialize model instance with kwargs + return self._meta.model(**kwargs) + + def import_field(self, field, obj, data, is_m2m=False, **kwargs): + # logger.info(f"import_field: ${field}") + # logger.info(field.attribute) + # logger.info(field.widget) + # logger.info(type(obj)) + # logger.info(obj._meta.fields) + is_fsm = False + for f in obj._meta.fields: + # logger.info(f.name) + # logger.info(type(f)) + if field.attribute == f.name and isinstance(f, FSMField): + is_fsm = True + # if field.attribute in sorted(obj._meta.fields): + # logger.info("in fields") + # if not isinstance(obj._meta.fields[field.attribute], FSMField): + if not is_fsm: + # logger.info("not fsm field") + #if (field.attribute != 'state'): + super().import_field(field, obj, data, is_m2m, **kwargs) + # logger.info("finished importing") + + class UserResource(resources.ModelResource): class Meta: @@ -1129,7 +1173,7 @@ class DomainInformationAdmin(ListHeaderAdmin, ImportExportModelAdmin): return readonly_fields # Read-only fields for analysts -class DomainRequestResource(resources.ModelResource): +class DomainRequestResource(FsmModelResource): class Meta: model = models.DomainRequest @@ -1669,278 +1713,10 @@ class DomainInformationInline(admin.StackedInline): return DomainInformationAdmin.get_readonly_fields(self, request, obj=None) -class DomainResource(resources.ModelResource): - - WIDGETS_MAP = { - "ManyToManyField": "get_m2m_widget", - "OneToOneField": "get_fk_widget", - "ForeignKey": "get_fk_widget", - "CharField": widgets.CharWidget, - "DecimalField": widgets.DecimalWidget, - "DateTimeField": widgets.DateTimeWidget, - "DateField": widgets.DateWidget, - "TimeField": widgets.TimeWidget, - "DurationField": widgets.DurationWidget, - "FloatField": widgets.FloatWidget, - "IntegerField": widgets.IntegerWidget, - "PositiveIntegerField": widgets.IntegerWidget, - "BigIntegerField": widgets.IntegerWidget, - "PositiveSmallIntegerField": widgets.IntegerWidget, - "SmallIntegerField": widgets.IntegerWidget, - "SmallAutoField": widgets.IntegerWidget, - "AutoField": widgets.IntegerWidget, - "BigAutoField": widgets.IntegerWidget, - "NullBooleanField": widgets.BooleanWidget, - "BooleanField": widgets.BooleanWidget, - "JSONField": widgets.JSONWidget, - } +class DomainResource(FsmModelResource): class Meta: model = models.Domain - # exclude = ('state') - - - def import_row( - self, - row, - instance_loader, - using_transactions=True, - dry_run=False, - raise_errors=None, - **kwargs - ): - """ - Imports data from ``tablib.Dataset``. Refer to :doc:`import_workflow` - for a more complete description of the whole import process. - - :param row: A ``dict`` of the row to import - - :param instance_loader: The instance loader to be used to load the row - - :param using_transactions: If ``using_transactions`` is set, a transaction - is being used to wrap the import - - :param dry_run: If ``dry_run`` is set, or error occurs, transaction - will be rolled back. - """ - if raise_errors is not None: - warnings.warn( - "raise_errors argument is deprecated and " - "will be removed in a future release.", - category=DeprecationWarning, - ) - - logger.info("in import_row") - skip_diff = self._meta.skip_diff - row_result = self.get_row_result_class()() - if self._meta.store_row_values: - row_result.row_values = row - original = None - try: - self.before_import_row(row, **kwargs) - logger.info("after before_import_row") - instance, new = self.get_or_init_instance(instance_loader, row) - logger.info("after get_or_init_instance") - logger.info(type(instance)) - for f in sorted(instance._meta.fields): - if callable(getattr(f, "get_internal_type", None)): - internal_type = f.get_internal_type() - logger.info(f"{f.name} {internal_type} {type(f)}") - logger.info(f"type == fsmfield: {isinstance(f, FSMField)}") - for attr_name, attr_value in vars(instance).items(): - logger.info(f"{attr_name}: {attr_value}") - if isinstance(attr_value, FSMField): - logger.info(f"FSMField: {attr_name}: {attr_value}") - self.after_import_instance(instance, new, **kwargs) - if new: - row_result.import_type = RowResult.IMPORT_TYPE_NEW - else: - row_result.import_type = RowResult.IMPORT_TYPE_UPDATE - row_result.new_record = new - if not skip_diff: - original = copy.deepcopy(instance) - diff = self.get_diff_class()(self, original, new) - if self.for_delete(row, instance): - if new: - row_result.import_type = RowResult.IMPORT_TYPE_SKIP - if not skip_diff: - diff.compare_with(self, None, dry_run) - else: - row_result.import_type = RowResult.IMPORT_TYPE_DELETE - row_result.add_instance_info(instance) - if self._meta.store_instance: - row_result.instance = instance - self.delete_instance(instance, using_transactions, dry_run) - if not skip_diff: - diff.compare_with(self, None, dry_run) - else: - import_validation_errors = {} - try: - logger.info("about to import_obj") - self.import_obj(instance, row, dry_run, **kwargs) - logger.info("got past import_obj") - except ValidationError as e: - # Validation errors from import_obj() are passed on to - # validate_instance(), where they can be combined with model - # instance validation errors if necessary - import_validation_errors = e.update_error_dict( - import_validation_errors - ) - - if self.skip_row(instance, original, row, import_validation_errors): - row_result.import_type = RowResult.IMPORT_TYPE_SKIP - else: - self.validate_instance(instance, import_validation_errors) - self.save_instance(instance, new, using_transactions, dry_run) - self.save_m2m(instance, row, using_transactions, dry_run) - row_result.add_instance_info(instance) - if self._meta.store_instance: - row_result.instance = instance - if not skip_diff: - diff.compare_with(self, instance, dry_run) - if not new: - row_result.original = original - - if not skip_diff and not self._meta.skip_html_diff: - row_result.diff = diff.as_html() - self.after_import_row(row, row_result, **kwargs) - - except ValidationError as e: - row_result.import_type = RowResult.IMPORT_TYPE_INVALID - row_result.validation_error = e - except Exception as e: - row_result.import_type = RowResult.IMPORT_TYPE_ERROR - # There is no point logging a transaction error for each row - # when only the original error is likely to be relevant - if not isinstance(e, TransactionManagementError): - logger.debug(e, exc_info=e) - tb_info = traceback.format_exc() - row_result.errors.append(self.get_error_result_class()(e, tb_info, row)) - - return row_result - - # def get_fsm_fields_from_model(self): - # logger.info("in get_fsm_fields_from_model") - # fsm_fields = [] - # for f in sorted(self._meta.model._meta.fields): - # # if callable(getattr(f, "get_internal_type", None)): - # # internal_type = f.get_internal_type() - # # logger.info(f"{f.name} {internal_type} {type(f)}") - # # logger.info(f"type == fsmfield: {isinstance(f, FSMField)}") - # if isinstance(f, FSMField): - # fsm_fields.append(f.name) - # return fsm_fields - - # def get_fsm_fields_from_row(self, fsm_fields, row=None): - # fields_and_values = [] - # for f in fsm_fields: - # if f in row: - # fields_and_values.append((f, row[f])) - # return fields_and_values - - # def init_instance(self, row=None): - # logger.info("initializing instance") - # logger.info(f"row: {row}") - # #get fields which are fsm fields - # fsm_fields = self.get_fsm_fields_from_model() - # #then get field values from row and return an array of tuples - # fields_and_values = self.get_fsm_fields_from_row(fsm_fields, row) - # logger.info(fields_and_values) - # #then set those tuples to kwargs - # kwargs = dict(fields_and_values) - # #then pass kwargs to model() - # #return self._meta.model(state='ready') - # return self._meta.model(**kwargs) - # #return super().init_instance(row) - - def init_instance(self, row=None): - logger.info("Initializing instance") - logger.info(f"Row: {row}") - - # Get fields which are fsm fields - fsm_fields = [] - - for f in sorted(self._meta.model._meta.fields): - if isinstance(f, FSMField): - if row and f.name in row: - fsm_fields.append((f.name, row[f.name])) - - logger.info(f"Fsm fields: {fsm_fields}") - - # Convert fields_and_values to kwargs - kwargs = dict(fsm_fields) - - # Initialize model instance with kwargs - return self._meta.model(**kwargs) - - def get_instance(self, instance_loader, row): - """ - If all 'import_id_fields' are present in the dataset, calls - the :doc:`InstanceLoader `. Otherwise, - returns `None`. - """ - logger.info("get_instance is called") - import_id_fields = [self.fields[f] for f in self.get_import_id_fields()] - for field in import_id_fields: - if field.column_name not in row: - return - return instance_loader.get_instance(row) - - def import_data( - self, - dataset, - dry_run=False, - raise_errors=False, - use_transactions=None, - collect_failed_rows=False, - rollback_on_validation_errors=False, - **kwargs - ): - logger.info("in import_data") - logger.info(dataset) - return super().import_data(dataset,dry_run,raise_errors,use_transactions,collect_failed_rows,rollback_on_validation_errors,**kwargs) - - - def before_import(self, dataset, using_transactions, dry_run, **kwargs): - logger.info("in before_import") - - # def import_row( - # self, - # row, - # instance_loader, - # using_transactions=True, - # dry_run=False, - # raise_errors=None, - # **kwargs - # ): - # logger.info("in import_row") - # logger.info(row) - # return super().import_row(row,instance_loader,using_transactions,dry_run,raise_errors,**kwargs) - - def after_import_row(self, row, row_result, **kwargs): - logger.info(row_result.original) - logger.info(row_result.instance) - - def import_field(self, field, obj, data, is_m2m=False, **kwargs): - logger.info(f"import_field: ${field}") - logger.info(field.attribute) - logger.info(field.widget) - logger.info(type(obj)) - logger.info(obj._meta.fields) - is_fsm = False - for f in obj._meta.fields: - logger.info(f.name) - logger.info(type(f)) - if field.attribute == f.name and isinstance(f, FSMField): - is_fsm = True - # if field.attribute in sorted(obj._meta.fields): - # logger.info("in fields") - # if not isinstance(obj._meta.fields[field.attribute], FSMField): - if not is_fsm: - logger.info("not fsm field") - #if (field.attribute != 'state'): - super().import_field(field, obj, data, is_m2m, **kwargs) - logger.info("finished importing") class DomainAdmin(ListHeaderAdmin, ImportExportModelAdmin):