创建一个支持 json 序列化的类,以便与 Celery 一起使用

问题描述 投票:0回答:3

我正在使用 Celery 来运行一些后台任务。其中一项任务返回我创建的 python 类。考虑到有关使用 pickle 的警告,我想使用 json 来序列化和反序列化此类。

有没有一种简单的内置方法可以实现这一点?

该类非常简单,它包含 3 个属性,所有属性都是命名元组的列表。它包含一些对属性执行一些计算的方法。

我的想法是序列化/反序列化 3 个属性,因为它定义了类。

这是我对编码器的想法,但我不确定如何再次解码数据?

import json

class JSONSerializable(object):
    def __repr__(self):
        return json.dumps(self.__dict__)

class MySimpleClass(JSONSerializable):
    def __init__(self, p1, p2, p3): # I only care about p1, p2, p3
        self.p1 = p1
        self.p2 = p2
        self.p3 = p2
        self.abc = p1 + p2 + p2

    def some_calc(self):
        ...
python json celery
3个回答
6
投票

首先但并非最不重要的是:针对 pickle 的警告主要是如果您可以让第三部分在工作流上注入 pickle 数据。如果您确定自己的系统正在创建所有要使用的腌制数据,则根本不存在安全问题。至于兼容性,如果您的 Pickle 文件的生产者和消费者使用相同的 Python 版本,那么它相对容易处理,并且是自动的。

也就是说,对于 JSON,您必须创建 Python 的

json.JSONEncoder
json.JSONDecoder
的子类 - 每个子类都需要作为
cls
参数传递给所有
json.dump(s)
json.load(s)
调用.

一个建议是编码器上的

default
方法对类
__module__
、其
__name__
和标识符键进行编码,例如
__custom__
以确保它应该被自定义解码,作为字典的键,以及对象的数据作为“数据”键。

在编码器上,您检查

__custom__
键,然后它们使用
__new__
方法实例化一个类,并填充其字典。与 pickle 一样,在类
__init__
上触发的副作用不会运行。

您稍后可以增强解码器和编码器,例如,它们在类中搜索只能处理所需属性的

__json_encode__
方法。

示例实现:

import json

class GenericJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        try:
            return super().default(obj)
        except TypeError:
            pass
        cls = type(obj)
        result = {
            '__custom__': True,
            '__module__': cls.__module__,
            '__name__': cls.__name__,
            'data': obj.__dict__ if not hasattr(cls, '__json_encode__') else obj.__json_encode__
        }
        return result


class GenericJSONDecoder(json.JSONDecoder):
    def decode(self, str):
        result = super().decode(str)
        if not isinstance(result, dict) or not result.get('__custom__', False):
            return result
        import sys
        module = result['__module__']
        if not module in sys.modules:
            __import__(module)
        cls = getattr(sys.modules[module], result['__name__'])
        if hasattr(cls, '__json_decode__'):
            return cls.__json_decode__(result['data'])
        instance = cls.__new__(cls)
        instance.__dict__.update(result['data'])
        return instance

控制台交互测试:

In [36]: class A:
    ...:     def __init__(self, a):
    ...:         self.a = a
    ...:         

In [37]: a = A('test')

In [38]: b = json.loads(json.dumps(a, cls=GenericJSONEncoder),  cls=GenericJSONDecoder)

In [39]: b.a
Out[39]: 'test'

1
投票

这是 @jsbueno 提供的出色解决方案的改进版本,它也适用于嵌套自定义类型。

import json
import collections
import six

def is_iterable(arg):
    return isinstance(arg, collections.Iterable) and not isinstance(arg, six.string_types)


class GenericJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        try:
            return super().default(obj)
        except TypeError:
            pass
        cls = type(obj)
        result = {
            '__custom__': True,
            '__module__': cls.__module__,
            '__name__': cls.__name__,
            'data': obj.__dict__ if not hasattr(cls, '__json_encode__') else obj.__json_encode__
        }
        return result


class GenericJSONDecoder(json.JSONDecoder):
    def decode(self, str):
        result = super().decode(str)
        return GenericJSONDecoder.instantiate_object(result)

    @staticmethod
    def instantiate_object(result):
        if not isinstance(result, dict):  # or
            if is_iterable(result):
                return [GenericJSONDecoder.instantiate_object(v) for v in result]
            else:
                return result

        if not result.get('__custom__', False):
            return {k: GenericJSONDecoder.instantiate_object(v) for k, v in result.items()}

        import sys
        module = result['__module__']
        if module not in sys.modules:
            __import__(module)
        cls = getattr(sys.modules[module], result['__name__'])
        if hasattr(cls, '__json_decode__'):
            return cls.__json_decode__(result['data'])
        instance = cls.__new__(cls)
        data = {k: GenericJSONDecoder.instantiate_object(v) for k, v in result['data'].items()}
        instance.__dict__.update(data)
        return instance


class C:

    def __init__(self):
        self.c = 133

    def __repr__(self):
        return "C<" + str(self.__dict__) + ">"


class B:

    def __init__(self):
        self.b = {'int': 123, "c": C()}
        self.l = [123, C()]
        self.t = (234, C())
        self.s = "Blah"

    def __repr__(self):
        return "B<" + str(self.__dict__) + ">"


class A:
    class_y = 13

    def __init__(self):
        self.x = B()

    def __repr__(self):
        return "A<" + str(self.__dict__) + ">"


def dumps(obj, *args, **kwargs):
    return json.dumps(obj, *args, cls=GenericJSONEncoder, **kwargs)


def dump(obj, *args, **kwargs):
    return json.dump(obj, *args, cls=GenericJSONEncoder, **kwargs)


def loads(obj, *args, **kwargs):
    return json.loads(obj, *args, cls=GenericJSONDecoder, **kwargs)


def load(obj, *args, **kwargs):
    return json.load(obj, *args, cls=GenericJSONDecoder, **kwargs)

检查一下:

e = dumps(A())
print("ENCODED:\n\n", e)
b = json.loads(e, cls=GenericJSONDecoder)
b = loads(e)
print("\nDECODED:\n\n", b)

打印:

 A<{'x': B<{'b': {'int': 123, 'c': C<{'c': 133}>}, 'l': [123, C<{'c': 133}>], 't': [234, C<{'c': 133}>], 's': 'Blah'}>}>

原始版本仅正确重建

A
,而
B
C
的所有实例都没有实例化而是保留为字典:

A<{'x': {'__custom__': True, '__module__': '__main__', '__name__': 'B', 'data': {'b': {'int': 123, 'c': {'__custom__': True, '__module__': '__main__', '__name__': 'C', 'data': {'c': 133}}}, 'l': [123, {'__custom__': True, '__module__': '__main__', '__name__': 'C', 'data': {'c': 133}}], 't': [234, {'__custom__': True, '__module__': '__main__', '__name__': 'C', 'data': {'c': 133}}], 's': 'Blah'}}}>

请注意,如果类型包含列表或元组等集合,则在解码过程中无法恢复集合的实际类型。这是因为所有这些集合在编码为 json 时都会转换为列表。


0
投票

如果有人看到此内容并遇到类似问题:

Kombu尝试使用魔术方法'json'来对对象进行json序列化:https://github.com/celery/kombu/blob/562b6f6fefa8f00fb5667225e4f55277fc4bfd9b/kombu/utils/json.py#L26.

© www.soinside.com 2019 - 2024. All rights reserved.