如何在完成任务时获得任务结果,而不是在Dask中完成任务?

问题描述 投票:0回答:1

我有一个dask数据框,想要计算一些独立的任务。有些任务比其他任务快,但是在完成更长任务后我会得到每个任务的结果。

我创建了一个本地客户端并使用client.compute()发送任务。然后我使用future.result()来获得每个任务的结果。

我正在使用线程同时询问结果并测量每个结果的计算时间,如下所示:

def get_result(future,i):
    t0 = time.time()
    print("calculating result", i)
    result = future.result()
    print("result {} took {}".format(i, time.time() - t0))

client = Client()
df = dd.read_csv(path_to_csv)

future1 = client.compute(df[df.x > 200])
future2 = client.compute(df[df.x > 500])

threading.Thread(target=get_result, args=[future1,1]).start()
threading.Thread(target=get_result, args=[future2,2]).start()

我希望上面代码的输出类似于:

calculating result 1
calculating result 2
result 2 took 10
result 1 took 46

由于第一项任务较大。

但相反,我同时得到了两个

calculating result 1
calculating result 2
result 2 took 46.3046760559082
result 1 took 46.477620363235474

我认为这是因为future2实际上在后台计算并在future1之前完成,但它等到future1完成返回。

有没有办法在结束时获得未来2的结果?

dask dask-distributed
1个回答
1
投票

您不需要以异步方式使线程使用future - 它们本身就是异步的,并且在后台监视它们的状态。如果你想按照他们准备好的顺序得到结果,你应该使用as_completed

但是,根据您的具体情况,您可能只想查看仪表板(或使用df.visulalize())来了解正在发生的计算。这两个期货都依赖于读取CSV,并且在任何一个任务可以运行之前都需要这个任务 - 并且可能占用绝大多数时间。 Dask不知道,如果不扫描所有数据,哪些行具有x的值。

© www.soinside.com 2019 - 2024. All rights reserved.