Celery 与 postgresql 给出错误“无法适应类型‘AsyncResult’”

问题描述 投票:0回答:1

我使用

postgresql
作为 Celery (v5.3.5) 的后端。

当我对任务调用 read() 时,Celery 返回 SQL 错误

ASyncResult
:

sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'AsyncResult'
[SQL: SELECT celery_taskmeta.id AS celery_taskmeta_id, celery_taskmeta.task_id AS celery_taskmeta_task_id, celery_taskmeta.status AS celery_taskmeta_status, celery_taskmeta.result AS celery_taskmeta_result, celery_taskmeta.date_done AS celery_taskmeta_date_done, celery_taskmeta.traceback AS celery_taskmeta_traceback, celery_taskmeta.name AS celery_taskmeta_name, celery_taskmeta.args AS celery_taskmeta_args, celery_taskmeta.kwargs AS celery_taskmeta_kwargs, celery_taskmeta.worker AS celery_taskmeta_worker, celery_taskmeta.retries AS celery_taskmeta_retries, celery_taskmeta.queue AS celery_taskmeta_queue 
FROM celery_taskmeta 
WHERE celery_taskmeta.task_id = %(task_id_1)s]
[parameters: {'task_id_1': <AsyncResult: 0ae578c2-85b2-4d13-9002-50604329a480>}]
(Background on this error at: https://sqlalche.me/e/20/f405)

There is quite a long traceback, mostly sqlalchemy, the last Celery trace was:
  File "/dist-packages/celery/backends/database/\_\_init__.py", line 152, in _get_task_meta_for
    task = list(session.query(self.task_cls).filter(self.task_cls.task_id == task_id))

这是任务工作脚本(称为task_queue.py):

from celery import Celery, current_app, Task

broker = 'sqla+postgresql://user:password@server/db'
backend = 'db+postgresql://user:password@server/db'
app : Celery = Celery('tasks', broker=broker, backend=backend)

@app.task(bind=True)
def long_running_task(self, seconds : int):
    """ A task that takes a number of seconds to complete. """
    print('Starting count off of {0} seconds'.format(seconds))
    for i in range(seconds):
        print('{0} seconds left'.format(seconds - i))
        sleep(1)

这就是它的名字:

from task_queue import long_running_task
from celery.result import AsyncResult

id = long_running_task.delay(5)
print(f"Task ID {id} queued")
task : AsyncResult = AsyncResult(id, app=long_running_task.app)

while not task.ready():
    print("Waiting for task to complete")
    sleep(1)

我看不出我在这里做错了什么,如果我使用 rpc 作为后端,同样的代码也可以工作。 显然,来自 Celery 的 SQL 调用需要任务 ID,但得到的是 AsyncResult。这是芹菜中的错误吗? 任何想法都非常感激。

(附录:尝试从任务结果中获取任何信息时会发生同样的错误,例如:task.name、task.result、task.args)

python postgresql sqlalchemy celery
1个回答
0
投票

在阅读此处的评论时完全偶然发现了答案:如何检查 Celery 中的任务状态?

似乎调用“delay”已经返回一个AsyncResult,但它与“原始”celery.result.AsyncResult 不同。此 AsyncResult 返回 task_id 作为默认属性,因此在传递给 sqlalchemy 时它可以工作以获取任务元数据。

因此,修复方法只是查询从“延迟”返回的对象 - 无需获取 AsyncResult:

改变:

id = long_running_task.delay(5)
print(f"Task ID {id} queued")
task : AsyncResult = AsyncResult(id, app=long_running_task.app)

至:

task = long_running_task.delay(5)
print(f"Task ID {task} queued, state {task.state}, task_id type {type(task)}")

有效!

© www.soinside.com 2019 - 2024. All rights reserved.