Python异步并发对象问题使用Celery模块,报错

问题描述 投票:0回答:0

我正在使用 Celery 模块运行异步并发对象,但是当我使用

LetView.delay()
函数调用时报错

Celery功能代码ChatGPT帮助代码这里用到了

import dill
register('dill', dill.dumps, dill.loads, content_type='application/x-python-serialize')
celery_app = Celery('tasks', broker='amqp:*****', backend='redis://*****',task_serializer='dill', result_serializer='dill',accept_content=['dill'])


CELERY_BEAT_SCHEDULE = {
    'add-every-day': {
        'task': 'tasks.LetView',
        'schedule': crontab(),
    },
}

LetView 功能

@celery_app.task()
@PowerApp.get('/')
async def LetView():
    async with db_session() as session:
        try:
           Function internal code
        except Exception as e:
            print(e)

main.py

@app.get("/")
async def root():
    response_data = {"message": "hahahah"}
    LetView.delay()
    return JSONResponse(content=response_data)

当我运行main函数时,Celery控制台给出如下错误

LetView[14ac01a6-f357-4fce-9623-565a6f313acb] raised unexpected: EncodeError(TypeError("cannot pickle 'coroutine' object"))
Traceback (most recent call last):
  File "d:\python3\lib\site-packages\kombu\serialization.py", line 39, in _reraise_errors
    yield
  File "d:\python3\lib\site-packages\kombu\serialization.py", line 210, in dumps
    payload = encoder(data)
  File "d:\python3\lib\site-packages\dill\_dill.py", line 263, in dumps
    dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio)
  File "d:\python3\lib\site-packages\dill\_dill.py", line 235, in dump
    Pickler(file, protocol, **_kwds).dump(obj)
  File "d:\python3\lib\site-packages\dill\_dill.py", line 394, in dump
    StockPickler.dump(self, obj)
  File "d:\python3\lib\pickle.py", line 487, in dump
    self.save(obj)
  File "d:\python3\lib\site-packages\dill\_dill.py", line 388, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "d:\python3\lib\pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "d:\python3\lib\site-packages\dill\_dill.py", line 1186, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "d:\python3\lib\pickle.py", line 971, in save_dict
    self._batch_setitems(obj.items())
  File "d:\python3\lib\pickle.py", line 997, in _batch_setitems
    save(v)
  File "d:\python3\lib\site-packages\dill\_dill.py", line 388, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "d:\python3\lib\pickle.py", line 578, in save
    rv = reduce(self.proto)
TypeError: cannot pickle 'coroutine' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "d:\python3\lib\site-packages\celery\app\trace.py", line 518, in trace_task
    task.backend.mark_as_done(
  File "d:\python3\lib\site-packages\celery\backends\base.py", line 162, in mark_as_done
    self.store_result(task_id, result, state, request=request)
  File "d:\python3\lib\site-packages\celery\backends\base.py", line 528, in store_result
    self._store_result(task_id, result, state, traceback,
  File "d:\python3\lib\site-packages\celery\backends\base.py", line 962, in _store_result
    self._set_with_state(self.get_key_for_task(task_id), self.encode(meta), state)
  File "d:\python3\lib\site-packages\celery\backends\base.py", line 418, in encode
    _, _, payload = self._encode(data)
  File "d:\python3\lib\site-packages\celery\backends\base.py", line 422, in _encode
    return dumps(data, serializer=self.serializer)
  File "d:\python3\lib\site-packages\kombu\serialization.py", line 210, in dumps
    payload = encoder(data)
  File "d:\python3\lib\contextlib.py", line 135, in __exit__
    self.gen.throw(type, value, traceback)
  File "d:\python3\lib\site-packages\kombu\serialization.py", line 43, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "d:\python3\lib\site-packages\kombu\exceptions.py", line 21, in reraise
    raise value.with_traceback(tb)
  File "d:\python3\lib\site-packages\kombu\serialization.py", line 39, in _reraise_errors
    yield
  File "d:\python3\lib\site-packages\kombu\serialization.py", line 210, in dumps
    payload = encoder(data)
  File "d:\python3\lib\site-packages\dill\_dill.py", line 263, in dumps
    dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio)
  File "d:\python3\lib\site-packages\dill\_dill.py", line 235, in dump
    Pickler(file, protocol, **_kwds).dump(obj)
  File "d:\python3\lib\site-packages\dill\_dill.py", line 394, in dump
    StockPickler.dump(self, obj)
  File "d:\python3\lib\pickle.py", line 487, in dump
    self.save(obj)
  File "d:\python3\lib\site-packages\dill\_dill.py", line 388, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "d:\python3\lib\pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "d:\python3\lib\site-packages\dill\_dill.py", line 1186, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "d:\python3\lib\pickle.py", line 971, in save_dict
    self._batch_setitems(obj.items())
  File "d:\python3\lib\pickle.py", line 997, in _batch_setitems
    save(v)
  File "d:\python3\lib\site-packages\dill\_dill.py", line 388, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "d:\python3\lib\pickle.py", line 578, in save
    rv = reduce(self.proto)
kombu.exceptions.EncodeError: cannot pickle 'coroutine' object
[2023-03-28 20:51:41,479: WARNING/MainProcess] d:\python3\lib\site-packages\eventlet\greenio\base.py:425: RuntimeWarning: coroutine 'LetView' was never awaited
  f = howlong.__float__

是否可以直接将并发对象转换为Celery可以使用的对象?我该如何转换它?

python-3.x celery celery-task celerybeat
© www.soinside.com 2019 - 2024. All rights reserved.