Dask如何决定是否重新运行任务

问题描述 投票:0回答:1

我对Dask并不陌生,他试图构建一个系统来执行具有依赖关系的计算图。但是,有些任务虽然具有静态签名,却被执行两次,这让我感到困惑。例如:

Python 3.7.5 (default, Nov 12 2019, 11:34:05)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-18)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask.distributed import Client
>>> client = Client()
>>> def a():
...   print("a")
...
>>> client.gather(client.submit(a))
a
>>> client.gather(client.submit(a))
a
>>> client.submit(a)
<Future: pending, key: a-a5eb50e9015acdf60b1094aa4e467e00>
a
>>> client.submit(a)
<Future: finished, type: builtins.NoneType, key: a-a5eb50e9015acdf60b1094aa4e467e00>
>>> client.gather(client.submit(a))
>>> client.gather(client.submit(a))
>>>

因此,看起来每次使用a()进行的调用都将执行client.gather(client.submit(a)),但是直到我自己调用client.submit(a)为止,此后再次使用相同的Future并且不再调用该函数。这是为什么?

在我的计算图中,当两个任务依赖同一任务时,这将是一个问题,该任务只能执行一次。我当前(递归地)处理此类依赖关系的方法如下:

from dask.distributed import Client, worker_client

def x(n):
    dgraph = {
        'a': [],
        'b': ['a'],
        'c': ['b', 'a'],
        'd': ['b', 'c']
        }
    print(n)
    with worker_client() as client:
        client.gather(list(client.submit(x, d) for d in dgraph[n]))

if __name__ == '__main__':
    client = Client()
    result = client.submit(x, 'd')
    client.gather(result)

有趣的是,执行该脚本时python的输出不稳定:

$ python test_dask2.py
d
b
c
a
b
a
a
$ python test_dask2.py
d
b
c
a
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': 'x-bff0c0d6e4239ae9c5beaed070018a1e'}
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': 'x-59dc11a9fc2db8a0885e47d3e5891304'}
$

是否有办法确保给定任务和给定输入仅执行一次,即使我多次提交?如果我对documentation的理解正确,那应该是正常的行为。如果对print的调用产生了副作用,那么它为什么会不一致,以及如何例如阻止生成输出文件的任务执行两次?

而且末尾发生什么错误,这种错误并不总是发生?

编辑:

[我想我想出了为什么我的某些任务在第二个代码段中要运行几次:dask的submit分配给任务以进行标识的哈希值有时看起来有时会有所不同,即使多次提交同一任务(和甚至在任务完成并超出范围之前)。将key中的submit参数设置为固定值(例如任务名称)可以解决该问题。

python dask dask-distributed
1个回答
1
投票

简短的答案是:dask在需要的时候将结果保存在内存中。在这些情况下,“需要”可以是会话中的未来,也可以是取决于结果的其他任务。

在类似client.gather(client.submit(a))的行中,由submit生成的未来在被收集之后会立即被忘记。在类似client.submit(a)的行中,生成的将来存储在会话的“最后结果”变量_中,因此仍会记住,并且群集不会清除它。

如果需要更多控制,可以分配以下变量:

fut = client.submit(a)  # sets func running, keeps hold of the future
fut2 = client.submit(a)  # uses already existing task to get result
client.gather(fut), fut.result() # get results
del fut2, fut  #  "forget" futures, and have cluster release them

请记住使用仪表板查看集群的当前状态。

© www.soinside.com 2019 - 2024. All rights reserved.