Dask字典到延迟的对象适配器

问题描述 投票:0回答:1

我一直在搜索,但是没有找到解决方案。我一直在研究Dask词典,但团队正在研究延迟的对象。我需要将dsk {}转换为上一步延迟的对象。

我现在要做什么:

def add(x, y):
    return x+y

dsk = {
      'step1' : (add, 1, 2),
      'step2' : (add, 'step1', 3),
      'final' : (add, 'step2', 'step1'),
}

dask.visualize(dsk)
client.get(dsk, 'final')

通过这种工作方式,我所有的功能都是普通的python函数。但是,这与我们的团队不同。

团队正在做什么:

@dask.delayed
def add(x, y)
    return x+y

step1 = add(1, 2)
step2 = add(step1, 3)
final = add(step2, step1)

final.visualize()
client.submit(final)

然后,他们将使用最后一步延迟对象进一步安排工作。如何将dsk最后一步final转换为延迟的对象?

我目前的想法(尚不可行)

from dask.optimization import cull

outputs = ['final']
dsk1, dependencies = cull(dsk, outputs)  # remove unnecessary tasks from the graph

此后,我不确定如何构造延迟的对象。

谢谢!

python dask dask-distributed
1个回答
0
投票

最后,我找到了解决方法。这个想法是遍历dsk来创建延迟的对象和依赖项。

# Covnert dsk dictionary to dask.delayed objects
for dsk_name, dsk_values in dsk.items():
    args = []
    dsk_function = dsk_values[0]
    dsk_arguments = dsk_values[1:]
    for arg in dsk_arguments:
        if isinstance(arg, str):
            # try to find the arguments in globals and return dependent dask object
            args.append( globals().get(arg, arg) )
        else:
            args.append(arg)
    globals()[dsk_name] = dask.delayed(dsk_function)(*args)
© www.soinside.com 2019 - 2024. All rights reserved.