我遇到了以下种类的芹菜工作流程的一些非常奇怪的行为:
workflow = group(
chain(task1.s(), task2.s()),
chain(task3.s(), task4.s()),
)
这是在django的背景下。
当我按如下方式调用工作流程时:
workflow.apply_async((n,))
...对于n的任何整数值,每个链中的第一个任务(task1
和task3
)将失败并出现如下的TypeError(取自celery events
):
args: [9, 8, 7, 5, 4, 3]
kwargs: {}
retries: 0
exception: TypeError('task1() takes exactly 1 argument (6 given)',)
state: FAILURE
第一个之后的参数始终是先前调用工作流的参数。所以,在这个例子中,我在这种情况下调用了workflow.apply_async((9,))
,其他数字是之前传递过的值。每次,传递给task1
和task3
的错误论点都是一样的。
我很想把它作为虫子报告给芹菜,但我还不确定这个错误在某种程度上不是我的。
我排除的事情:
workflow.apply_async
的论点。我已经单独构建并记录了我传递的元组,以确保这一点。apply_async
而不是元组并没有任何关系。我肯定传递一个元组(即不可变)。关于我的设置唯一有点不寻常的事情,虽然我看不出它是如何连接的,但task1
和task3
配置了不同的队列。
当我使用芹菜task.chunks()时遇到过类似的问题
我通过将项目列表包含在单个元组中来解决它。例如,
假设任务log_i()
是一个shared_task,基本上记录变量i
,我希望通过分块记录所有i
s的列表我会 -
# log_i shared Task
@shared_task(bind=True)
def log_i(self, i):
logger.info(i)
return i
和
# some calling site
# ...
very_long_list = [{"i1": i, "i2": i+i, "i3": i**2} for i in range(1000)]
res = log_i.chunks(zip(very_long_list), 10)()
print(res.get())
# ...
做自我注意做的事情 -
# ...
res = log_i.chunks(very_long_list, 10)()
# ...
当列表中的项目不是可迭代时,您将会遇到错误。
Zipping将项目原样移动到新元组中,通过这种方式,您可以将其捕获到log_i
任务中的单个参数中。