我是芹菜新手。我尝试分别使用 redis 和 postgres db 作为结果后端。对于redis后端,成功检索到结果,对于postgres后端,检索到的结果似乎是字节格式。我想知道我的代码/设置中是否遗漏了任何内容,或者这是预期的?
我在笔记本电脑上的 docker 容器中启动了 redis 和 postgres,如下所示:
docker run -d --name redis_broker -p 6379:6379 redis:latest
docker run -d --name pg_backend -p 5432:5432 -e POSTGRES_PASSWORD=123 postgres:latest
然后我写了以下非常简单的Python代码:
任务.py
from celery import Celery
app = Celery('tasks',
#backend='redis://localhost:6379/0',
backend='db+postgresql://postgres:123@localhost:5432/celery',
broker='redis://localhost:6379/0')
@app.task(name='tasks.add')
def add(x:int, y:int) -> int:
z = x+y
return z
来电者.py
from tasks import add
result = add.delay(3,2)
print(result.get())
接下来,我在终端中启动我的芹菜工作人员,如下所示:
celery -A tasks workder --loglevel=INFO
在另一个终端窗口中,我运行 caller.py,如下所示:
python caller.py
caller.py完成后,终端中打印出5,这是正确的。
然后我尝试从结果后端检索 python 中的结果。而对于redis后端,我得到了正确的结果:
>>> import redis
>>> backend = redis.Redis(host='localhost', port=6379)
>>> backend.get('celery-task-meta-1523014d-0ea7-42c2-83b9-272bcdb72891')
b'{"status": "SUCCESS", "result": 5, "traceback": null, "children": [], "date_done": "2021-11-13T07:23:55.012370", "task_id": "1523014d-0ea7-42c2-83b9-272bcdb72891"}'
对于 postgres 后端,我得到以下内容:
>>> import sqlalchemy
>>> from sqlalchemy import create_engine
>>> engine = create_engine('postgresql+psycopg2://postgres:123@localhost:5432/celery')
>>> conn = engine.connect()
>>> qry = 'SELECT * FROM celery_taskmeta;'
>>> cursor = conn.execute(qry)
>>> cursor.fetchall()
(1, 'eb21609d-0f84-47c7-afb4-9bccbe41eda4', 'SUCCESS', <memory at 0x11051ef40>, datetime.datetime(2021, 11, 13, 7, 33, 55, 559665), None, None, None, None, None, None, None)]
我也尝试在 postgres 中查询
celery=> select id, task_id, result, status from celery_taskmeta;
id | task_id | result | status
----+--------------------------------------+----------------------------------+---------
1 | eb21609d-0f84-47c7-afb4-9bccbe41eda4 | \x80054b052e | SUCCESS
(1 rows)
celery=>
在这种情况下,如果我的应用程序的某些其他组件需要从我的(postgres)结果后端检索任务结果,我如何读取这个“内存对象”?
我想知道我的设置或代码中是否遗漏了任何内容?
预先感谢您的任何建议!
嗯,我通过查看 Celery 的源代码自己弄清楚了(https://github.com/celery/celery/blob/master/celery/backends/database/models.py) 在 Celery 源代码中,对于使用 SQLAlchemy 的数据库后端,结果是序列化的,其类型为:
PickleType
。因此,我的问题的答案很简单!只需简单地通过调用 pickle.loads()
来反序列化它,如下所示:
>>> import sqlalchemy, pickle
>>> from sqlalchemy import create_engine
>>> engine = create_engine('postgresql+psycopg2://postgres:123@localhost:5432/celery')
>>> conn = engine.connect()
>>> qry = 'SELECT * FROM celery_taskmeta;'
>>> cursor = conn.execute(qry)
>>> res = cursor.fetchall()
>>> res
[(1, 'eb21609d-0f84-47c7-afb4-9bccbe41eda4', 'SUCCESS', <memory at 0x11051ef40>, datetime.datetime(2021, 11, 13, 7, 33, 55, 559665), None, None, None, None, None, None, None)]
>>> pickle.loads(res[0][3])
5
编辑:上面的示例代码仅用于故障排除。在生产环境中,应该改用ORM(对象关系映射)。在这种情况下,序列化/反序列化由
SQLAlchemy
自动处理,不需要显式调用 pickle
方法。
您可以使用
AsyncResult
方法得到结果。
添加caller.py:
from tasks import app # tasks is your tasks.py module
result = app.AsyncResult(task_id)
print("status", result.status, "result", result.result)