我怎样才能结合顺序以及延迟函数调用的并行执行?

问题描述 投票:0回答:1

我被困在一个陌生的地方。我有一大堆的延迟函数调用,我想在一定的顺序来执行。而并行执行是简单的:

res = client.compute([myfuncs])
res = client.gather(res)

我似乎无法找到一种方法,按顺序执行它们,在无阻塞的方式。

这里有一个小例子:

import numpy as np
from time import sleep
from datetime import datetime

from dask import delayed
from dask.distributed import LocalCluster, Client


@delayed
def dosomething(name):
    res = {"name": name, "beg": datetime.now()}
    sleep(np.random.randint(10))
    res.update(rand=np.random.rand())
    res.update(end=datetime.now())
    return res


seq1 = [dosomething(name) for name in ["foo", "bar", "baz"]]
par1 = dosomething("whaat")
par2 = dosomething("ahem")
pipeline = [seq1, par1, par2]

鉴于上面的例子中,我想在并行运行seq1par1,和par2,但seq1的成分:“foo”的“条”和“baz”的,在序列。

dask dask-distributed dask-delayed
1个回答
1
投票

你一定能欺骗并添加一个可选的依赖于你的功能如下:

@dask.delayed
def dosomething(name, *args):
     ...

这样就可以使任务依赖于一个另一个,甚至认为你没有在函数的下一次运行使用一个结果:

inputs = ["foo", "bar", "baz"]
seq1 = [dosomething(inputs[0])]
for bit in inputs[1:]:
    seq1.append(dosomething(bit, seq1[-1]))

另外,您也可以阅读关于分布式调度的“期货”的界面,让你可以监控任务的实时进度。

© www.soinside.com 2019 - 2024. All rights reserved.