首先,请阅读以下问题:Repeated task execution using the distributed Dask scheduler
现在,当Dask由于工作人员窃取或任务失败(例如,由于每个进程的内存限制)而决定重新运行任务时,哪个任务结果将传递到DAG的下一个节点?我们正在使用嵌套任务,例如
@dask.delayed
def add(n):
return n+1
t_a = add(1)
t_b = add(t_a)
the_output = add(add(add(t_b)))
因此,如果这些任务之一失败或被盗,并且运行了两次,那么结果将传递到DAG中的下一个节点?
有兴趣者的进一步背景:出现这种情况的原因是我们的任务写入了数据库。如果它运行两次,我们将收到一个完整性错误,因为它试图两次插入同一条记录(限制在id
和version
的组合上)。当前的计划是通过捕获任务中的完整性错误来使任务成为幂等,但我仍然不了解Dask如何“选择”结果。
add(add(add(t_b)))
之类的情况或更笼统地说
x = add(1)
y = add(x)
z = add(y)
即使这些都使用相同的功能,它们都是单独的任务。 Dask看到它们具有不同的输入,因此对它们进行了不同的处理。因此,如果这些任务之一失败或被盗,并且运行了两次,那么结果将传递到DAG中的下一个节点?在所有这些情况下,群集上一次只有一个有效结果。被盗的任务仅在新计算机上运行,而不在旧计算机上运行。如果任务结果丢失并且必须重新运行,则新值将出现在任何位置(旧值丢失了,请记住)。