我正在尝试将芹菜与已实现自定义序列化程序的自定义对象一起使用,但是芹菜工作者尝试使用酸洗。
celeryconfig.py
broker_url = 'redis://localhost'
result_backend = 'redis://localhost'
imports = ('tasks',)
accept_content = ['application/x-json']
task_serializer = 'custom_json'
result_serializer = 'custom_json'
app.py
from celery import Celery
from . import serializers
from . import celeryconfig
from kombu import serialization
serialization.register(
'custom_json',
serializers.dumps,
serializers.loads,
content_type='application/x-json',
content_encoding='utf-8',
)
app = Celery()
app.config_from_object(celeryconfig)
if __name__ == '__main__':
app.start()
main.py
from .tasks import my_task
my_obj = CustomClass()
my_task.delay(my_obj)
如果我的班级是在python中定义的,则此代码可以正常工作:
class CustomClass:
def __init__(self):
...
但是我的CustomClass
实际上来自Boost.Python绑定,该绑定是从.so文件导入的,然后从worker中收到以下错误:
[2020-04-11 16:25:08,102: INFO/MainProcess] Received task: my_task[f73a3119-65d7-4a04-9e0d-2bc25ad19dde]
...
RuntimeError: Pickling of "CustomClass" instances is not enabled (http://www.boost.org/libs/python/doc/v2/pickle.html)
我了解错误消息建议深入研究其泡菜的具体情况。但是,使用自定义json序列化程序的重点并不是要走这条路。
所以我的问题是:为什么芹菜甚至试图使用酸洗?
编辑:作为一个虚拟示例,下面的类(不带boost)是不可腌制的,将产生以下错误:
Unrecoverable error: TypeError("can't pickle generator objects",)
class NonPicklableClass:
def __init__(self, arg):
self.gen = (i for i in arg)
class CustomEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, NonPicklableClass):
return {
'__type__': 'custom',
'raw': list(o.gen),
}
return o
def hook(o):
dtype = o.get('__type__')
if dtype == 'custom':
return NonPicklableClass(o['raw'])
def dumps(o):
return json.dumps(o, cls=CustomEncoder)
def loads(s):
return json.loads(s, object_hook=hook)
我显然一定是在误解某件事
我想我已经知道了,使用自定义序列化程序将作业发送给工人。
但是,在每个工作程序中,数据都是使用常规python酸洗通过每个进程传递的。