postgres数据库中的celery任务结果是字节格式的?

问题描述 投票:0回答:2

我是芹菜新手。我尝试分别使用 redis 和 postgres db 作为结果后端。对于redis后端,成功检索到结果,对于postgres后端,检索到的结果似乎是字节格式。我想知道我的代码/设置中是否遗漏了任何内容,或者这是预期的?

我在笔记本电脑上的 docker 容器中启动了 redis 和 postgres,如下所示:

docker run -d --name redis_broker -p 6379:6379 redis:latest
docker run -d --name pg_backend -p 5432:5432 -e POSTGRES_PASSWORD=123 postgres:latest

然后我写了以下非常简单的Python代码:

任务.py

from celery import Celery

app = Celery('tasks',
             #backend='redis://localhost:6379/0',
             backend='db+postgresql://postgres:123@localhost:5432/celery',
             broker='redis://localhost:6379/0')

@app.task(name='tasks.add')
def add(x:int, y:int) -> int:
    z = x+y
    return z

来电者.py

from tasks import add

result = add.delay(3,2)
print(result.get())

接下来,我在终端中启动我的芹菜工作人员,如下所示:

celery -A tasks workder --loglevel=INFO

在另一个终端窗口中,我运行 caller.py,如下所示:

python caller.py

caller.py完成后,终端中打印出5,这是正确的。

然后我尝试从结果后端检索 python 中的结果。而对于redis后端,我得到了正确的结果:

>>> import redis
>>> backend = redis.Redis(host='localhost', port=6379)
>>> backend.get('celery-task-meta-1523014d-0ea7-42c2-83b9-272bcdb72891')
b'{"status": "SUCCESS", "result": 5, "traceback": null, "children": [], "date_done": "2021-11-13T07:23:55.012370", "task_id": "1523014d-0ea7-42c2-83b9-272bcdb72891"}'

对于 postgres 后端,我得到以下内容:

>>> import sqlalchemy
>>> from sqlalchemy import create_engine
>>> engine = create_engine('postgresql+psycopg2://postgres:123@localhost:5432/celery')
>>> conn = engine.connect()
>>> qry = 'SELECT * FROM celery_taskmeta;'
>>> cursor = conn.execute(qry)
>>> cursor.fetchall()
(1, 'eb21609d-0f84-47c7-afb4-9bccbe41eda4', 'SUCCESS', <memory at 0x11051ef40>, datetime.datetime(2021, 11, 13, 7, 33, 55, 559665), None, None, None, None, None, None, None)]

我也尝试在 postgres 中查询

celery=> select id, task_id, result, status from celery_taskmeta;
 id |               task_id                |              result              | status
----+--------------------------------------+----------------------------------+---------
  1 | eb21609d-0f84-47c7-afb4-9bccbe41eda4 | \x80054b052e                     | SUCCESS
(1 rows)

celery=>

在这种情况下,如果我的应用程序的某些其他组件需要从我的(postgres)结果后端检索任务结果,我如何读取这个“内存对象”?

我想知道我的设置或代码中是否遗漏了任何内容?

预先感谢您的任何建议!

python postgresql celery backend
2个回答
1
投票

嗯,我通过查看 Celery 的源代码自己弄清楚了(https://github.com/celery/celery/blob/master/celery/backends/database/models.py) 在 Celery 源代码中,对于使用 SQLAlchemy 的数据库后端,结果是序列化的,其类型为:

PickleType
。因此,我的问题的答案很简单!只需简单地通过调用
pickle.loads()
来反序列化它,如下所示:

>>> import sqlalchemy, pickle
>>> from sqlalchemy import create_engine
>>> engine = create_engine('postgresql+psycopg2://postgres:123@localhost:5432/celery')
>>> conn = engine.connect()
>>> qry = 'SELECT * FROM celery_taskmeta;'
>>> cursor = conn.execute(qry)
>>> res = cursor.fetchall()
>>> res
[(1, 'eb21609d-0f84-47c7-afb4-9bccbe41eda4', 'SUCCESS', <memory at 0x11051ef40>, datetime.datetime(2021, 11, 13, 7, 33, 55, 559665), None, None, None, None, None, None, None)]
>>> pickle.loads(res[0][3])
5

编辑:上面的示例代码仅用于故障排除。在生产环境中,应该改用ORM(对象关系映射)。在这种情况下,序列化/反序列化由

SQLAlchemy
自动处理,不需要显式调用
pickle
方法。


0
投票

您可以使用

AsyncResult
方法得到结果。

添加caller.py

from tasks import app  # tasks is your tasks.py module

result = app.AsyncResult(task_id)
print("status", result.status, "result", result.result)
© www.soinside.com 2019 - 2024. All rights reserved.