我正在尝试使用 Dask 在多个系统上分配计算。 但是,有一些我无法理解的概念,因为我无法通过用于 python mutliprocessing 的简单测试重现逻辑行为。
我正在使用这个非常天真的代码:
import dask
from dask.distributed import Client
import time
def costly_simulation(p):
time.sleep(4)
return p * 2
if __name__ == "__main__":
client = Client('localhost:8786')
input_array = [10, 10, 10, 10, 10, 10, 10, 10]
futures = []
for p in input_array:
future = client.submit(costly_simulation, p)
futures.append(future)
results = client.gather(futures)
print(results[:])
我以这种方式在 2 个独立的 shell 中启动一个 dask 集群:
dask-scheduler
和
dask-worker --nworkers 1 --nthreads 1 localhost:8786
所以一切都在本地系统上运行,我应该使用单个进程进行计算。
然而,无论我做什么,执行总是需要大约 4 秒 + 多一点(睡眠时间 + 一些执行的东西)。
我不明白,因为这里应该需要 4 * 8 秒来执行,因为我只有一个进程作为工作进程。即使将 100 项数组作为输入 ([10] * 100),它仍然需要 4 秒才能执行。
注意输出总是好的结果,所以一个 20 的数组。
在调度程序端,我可以看到以下日志:
2023-03-24 14:58:08,929 - distributed.scheduler - INFO - Receive client connection: Client-4a1b3e7f-ca54-11ed-a87a-853d3d7f4042
2023-03-24 14:58:08,930 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:46334
2023-03-24 14:58:12,981 - distributed.scheduler - INFO - Remove client Client-4a1b3e7f-ca54-11ed-a87a-853d3d7f4042
2023-03-24 14:58:12,981 - distributed.core - INFO - Received 'close-stream' from tcp://127.0.0.1:46334; closing.
2023-03-24 14:58:12,982 - distributed.scheduler - INFO - Remove client Client-4a1b3e7f-ca54-11ed-a87a-853d3d7f4042
2023-03-24 14:58:12,982 - distributed.scheduler - INFO - Close client connection: Client-4a1b3e7f-ca54-11ed-a87a-853d3d7f4042
我想念什么?
这在对相关问题的回答中有所介绍。具体来说,您将相同的输入传递给一个函数,并且
dask
默认情况下假定它将返回相同的值,因此它只计算一次任务。
如果你想要重新计算函数的行为,即使输入是相同的,使用
pure=False
kwarg:
future = client.submit(costly_simulation, p, pure=False)