RabbitMQ 和 Celery:订阅工作完成事件

问题描述 投票:0回答:2

我有一个简单的 Celery

task.py
RabbitMQ 消息代理和 Redis 数据存储一起运行

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//', backend="redis://localhost:6379/0")

@app.task
def add(x, y):
    return x + y

和一个

listener.py
服务,具有简单的功能

def on_add(result):
    # Do something with the result.

我想以即发即弃的方式调用

add()
,并让另一个实现
on_add()
的服务处理结果。

这是工作流程图:

如何在 Celery 的后端 Redis 上创建一个订阅任务完成事件的监听器?

python redis rabbitmq celery
2个回答
1
投票

这里至少有两个选择:

  1. 使用信号 - task-postrun 例如:
@task_postrun.connect
def task_postrun_handler(task_id, task, args, retval, **kwargs):
    if task.name == "add":
        on_add(retval)

注意,它将在同一个 celery worker 中运行。

  1. 如果需要在单独的进程中,可以采取flower的方式,监听broker的事件(比较复杂)。

0
投票

您可以在任务完成后使用“链接”运行任务:

add.s().apply_async(
   link=notify.s()
)

这将运行

add
任务,完成后它将运行
notify
并添加结果。

PS:还有一个link_error,如果你的任务失败了就运行任务

© www.soinside.com 2019 - 2024. All rights reserved.