我有一个包含多个任务、链和弦的芹菜工作流程。 其中一些任务需要顺序执行,另一些任务可以并行执行。
问题是当一个和弦出现错误时,所有的链都被打断了。即使出现错误,我也需要继续这个过程。
这是重现我的用例的最小示例
from celery import chain, chord
from celery_app import celery_app
@celery_app.task(name="TASK_1")
def task_1():
print("task_1")
@celery_app.task(name="TASK_2")
def task_2():
print("task_2")
@celery_app.task(name="PARALLEL_TASK")
def parallel_task():
# HEAVY HTTP Call
print("parallel task")
@celery_app.task(name="ERROR_PARALLEL_TASK")
def error_parallel_task():
print("task error")
raise ValueError()
@celery_app.task(name="TASK_3")
def task_3():
print("task_3")
@celery_app.task(name="TASK_4")
def task_4():
print("task_4")
chain(
task_1.si(),
task_2.si(),
chord(
[
parallel_task.si(),
parallel_task.si(),
error_parallel_task.si(),
error_parallel_task.si(),
parallel_task.si(),
parallel_task.si()
],
task_3.si(),
),
task_4.si(),
).delay()
这个例子产生一个
celery.exceptions.ChordError: Dependency 10719ad2-cd61-4596-bb9b-92d42d1ea967 raised ValueError()
我需要进程继续执行 task_3 和 task_4,即使该组面临错误。
到目前为止我尝试了什么:
ignore_result=True
throws=(ValueError,)
acks_on_failure_or_timeout=True
chain(
group(
parallel_task.si(),
error_parallel_task.si(),
).on_error(task_3.si()),
task_3.si()
)
这个有效,但 task_3 被多次调用,我不想要那个。
有没有办法简单地忽略和弦/链中的错误以允许该过程继续? 芹菜是实现我想要做的事情的正确工具吗?