我有一个简单的 Celery
task.py
与 RabbitMQ 消息代理和 Redis 数据存储一起运行
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//', backend="redis://localhost:6379/0")
@app.task
def add(x, y):
return x + y
和一个
listener.py
服务,具有简单的功能
def on_add(result):
# Do something with the result.
我想以即发即弃的方式调用
add()
,并让另一个实现 on_add()
的服务处理结果。
这是工作流程图:
如何在 Celery 的后端 Redis 上创建一个订阅任务完成事件的监听器?
这里至少有两个选择:
@task_postrun.connect
def task_postrun_handler(task_id, task, args, retval, **kwargs):
if task.name == "add":
on_add(retval)
注意,它将在同一个 celery worker 中运行。
您可以在任务完成后使用“链接”运行任务:
add.s().apply_async(
link=notify.s()
)
这将运行
add
任务,完成后它将运行 notify
并添加结果。
PS:还有一个link_error,如果你的任务失败了就运行任务