我一直在搜索,但是没有找到解决方案。我一直在研究Dask词典,但团队正在研究延迟的对象。我需要将dsk {}转换为上一步延迟的对象。
我现在要做什么:
def add(x, y):
return x+y
dsk = {
'step1' : (add, 1, 2),
'step2' : (add, 'step1', 3),
'final' : (add, 'step2', 'step1'),
}
dask.visualize(dsk)
client.get(dsk, 'final')
通过这种工作方式,我所有的功能都是普通的python函数。但是,这与我们的团队不同。
团队正在做什么:
@dask.delayed
def add(x, y)
return x+y
step1 = add(1, 2)
step2 = add(step1, 3)
final = add(step2, step1)
final.visualize()
client.submit(final)
然后,他们将使用最后一步延迟对象进一步安排工作。如何将dsk最后一步final转换为延迟的对象?
我目前的想法(尚不可行)
from dask.optimization import cull
outputs = ['final']
dsk1, dependencies = cull(dsk, outputs) # remove unnecessary tasks from the graph
此后,我不确定如何构造延迟的对象。
谢谢!
最后,我找到了解决方法。这个想法是遍历dsk来创建延迟的对象和依赖项。
# Covnert dsk dictionary to dask.delayed objects
for dsk_name, dsk_values in dsk.items():
args = []
dsk_function = dsk_values[0]
dsk_arguments = dsk_values[1:]
for arg in dsk_arguments:
if isinstance(arg, str):
# try to find the arguments in globals and return dependent dask object
args.append( globals().get(arg, arg) )
else:
args.append(arg)
globals()[dsk_name] = dask.delayed(dsk_function)(*args)