当Dask任务运行多次时,使用哪个结果?

问题描述 投票:0回答:1

首先,请阅读以下问题:Repeated task execution using the distributed Dask scheduler

现在,当Dask由于工作人员窃取或任务失败(例如,由于每个进程的内存限制)而决定重新运行任务时,哪个任务结果将传递到DAG的下一个节点?我们正在使用嵌套任务,例如

@dask.delayed
def add(n):
    return n+1

t_a = add(1)
t_b = add(t_a)
the_output = add(add(add(t_b)))

因此,如果这些任务之一失败或被盗,并且运行了两次,那么结果将传递到DAG中的下一个节点?

有兴趣者的进一步背景:出现这种情况的原因是我们的任务写入了数据库。如果它运行两次,我们将收到一个完整性错误,因为它试图两次插入同一条记录(限制在idversion的组合上)。当前的计划是通过捕获任务中的完整性错误来使任务成为幂等,但我仍然不了解Dask如何“选择”结果。

dask dask-distributed dask-delayed
1个回答
0
投票
[如果您遇到add(add(add(t_b)))之类的情况

或更笼统地说

x = add(1) y = add(x) z = add(y)

即使这些都使用相同的功能,它们都是单独的任务。 Dask看到它们具有不同的输入,因此对它们进行了不同的处理。 

因此,如果这些任务之一失败或被盗,并且运行了两次,那么结果将传递到DAG中的下一个节点?

在所有这些情况下,群集上一次只有一个有效结果。被盗的任务仅在新计算机上运行,​​而不在旧计算机上运行。如果任务结果丢失并且必须重新运行,则新值将出现在任何位置(旧值丢失了,请记住)。

© www.soinside.com 2019 - 2024. All rights reserved.