如何忽略 celery chord 中的错误并允许进程继续

问题描述 投票:0回答:0

我有一个包含多个任务、链和弦的芹菜工作流程。 其中一些任务需要顺序执行,另一些任务可以并行执行。

问题是当一个和弦出现错误时,所有的链都被打断了。即使出现错误,我也需要继续这个过程。

这是重现我的用例的最小示例

from celery import chain, chord

from celery_app import celery_app


@celery_app.task(name="TASK_1")
def task_1():
    print("task_1")


@celery_app.task(name="TASK_2")
def task_2():
    print("task_2")


@celery_app.task(name="PARALLEL_TASK")
def parallel_task():
    # HEAVY HTTP Call
    print("parallel task")


@celery_app.task(name="ERROR_PARALLEL_TASK")
def error_parallel_task():
    print("task error")
    raise ValueError()


@celery_app.task(name="TASK_3")
def task_3():
    print("task_3")


@celery_app.task(name="TASK_4")
def task_4():
    print("task_4")


chain(
    task_1.si(),
    task_2.si(),
    chord(
        [
            parallel_task.si(),
            parallel_task.si(),
            error_parallel_task.si(),
            error_parallel_task.si(),
            parallel_task.si(),
            parallel_task.si()
        ],
        task_3.si(),
    ),
    task_4.si(),
).delay()

这个例子产生一个

celery.exceptions.ChordError: Dependency 10719ad2-cd61-4596-bb9b-92d42d1ea967 raised ValueError()

我需要进程继续执行 task_3 和 task_4,即使该组面临错误。

到目前为止我尝试了什么:

  • 各种芹菜配置变量来忽略错误
ignore_result=True
throws=(ValueError,)
acks_on_failure_or_timeout=True

  • 使用 .on_error() 触发下一个任务,无论追加什么
chain(
    group(
        parallel_task.si(),
        error_parallel_task.si(),
    ).on_error(task_3.si()),
    task_3.si()
)

这个有效,但 task_3 被多次调用,我不想要那个。

  • 做一个永不失败的任务。使用 try / except 我可以让我的任务永不失败。它有效,但我不喜欢它。当任务实际失败时,任务将显示为成功。而且我不能以这种方式轻松使用芹菜重试系统。

有没有办法简单地忽略和弦/链中的错误以允许该过程继续? 芹菜是实现我想要做的事情的正确工具吗?

python celery django-celery chord
© www.soinside.com 2019 - 2024. All rights reserved.