Python celery - 如何等待和弦中的所有子任务

问题描述 投票:0回答:3

我正在对 celery 任务进行单元测试。 我有连锁任务,也有小组,所以产生了一个和弦。

测试应如下所示:

  • 运行 celery 任务(延迟)
  • 等待任务和所有子任务
  • 断言

我尝试了以下方法:

def wait_for_result(result):
    result.get()
    for child in result.children or list():
        if isinstance(child, GroupResult):
           # tried looping over task result in group
           # until tasks are ready, but without success 
           pass
        wait_for_result(child)

这会造成死锁,chord_unlock 会永远重试。 我对任务结果不感兴趣。 如何才能等待所有子任务完成?

python celery celery-task chord
3个回答
5
投票

虽然这是一个老问题,但我只是想分享我如何摆脱僵局问题,以防万一它对某人有帮助。

就像 celery 日志所说,永远不要在任务中使用

get()
。这确实会造成僵局。

我有一组类似的芹菜任务,其中包括一系列小组任务,因此使其成为一个和弦。我使用龙卷风通过发出 HTTP 请求来调用这些任务。所以我做了这样的事情:

@task
def someFunction():
    ....


@task
def someTask():
    ....


@task
def celeryTask():
    groupTask = group([someFunction.s(i) for i in range(10)])

    job = (groupTask| someTask.s())

    return job

celeryTask()
被tornado调用时,链将开始执行,
someTask()
的UUID将保存在
job
中。它看起来像这样

异步结果:765b29a8-7873-4b28-b05c-7e19c33e950c

返回此 UUID,并且

celeryTask()
在链开始执行(理想情况下)之前退出,从而为另一个进程的运行留下空间。

然后我使用龙卷风层来检查任务的状态。有关龙卷风层的详细信息可以在这个stackoverflow问题

中找到

2
投票

你尝试过和弦+回调吗?

http://docs.celeryproject.org/en/latest/userguide/canvas.html#chords

>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()
9900

0
投票

您可以使用group来获取并等待所有响应。然而,如果我们谈论的是长期任务,它可能并不总是最合适的解决方案。要处理多个任务,您还可以使用 chainschords。这个链接非常有解释性,帮助我做出决定:

https://sayari3.com/articles/18-chains-groups-and-chords-in-celery/

© www.soinside.com 2019 - 2024. All rights reserved.