具有综合测试的任务分配

问题描述 投票:0回答:1

我正在尝试使用 Dask 在多个系统上分配计算。 但是,有一些我无法理解的概念,因为我无法通过用于 python mutliprocessing 的简单测试重现逻辑行为。

我正在使用这个非常天真的代码:

import dask
from dask.distributed import Client
import time

def costly_simulation(p):
    time.sleep(4)
    return p * 2

if __name__ == "__main__":

    client = Client('localhost:8786')

    input_array = [10, 10, 10, 10, 10, 10, 10, 10]

    futures = []
    for p in input_array:
        future = client.submit(costly_simulation, p)
        futures.append(future)

    results = client.gather(futures)
    print(results[:])

我以这种方式在 2 个独立的 shell 中启动一个 dask 集群:

dask-scheduler

dask-worker --nworkers 1 --nthreads 1 localhost:8786

所以一切都在本地系统上运行,我应该使用单个进程进行计算。

然而,无论我做什么,执行总是需要大约 4 秒 + 多一点(睡眠时间 + 一些执行的东西)。

我不明白,因为这里应该需要 4 * 8 秒来执行,因为我只有一个进程作为工作进程。即使将 100 项数组作为输入 ([10] * 100),它仍然需要 4 秒才能执行。

注意输出总是好的结果,所以一个 20 的数组。

在调度程序端,我可以看到以下日志:

2023-03-24 14:58:08,929 - distributed.scheduler - INFO - Receive client connection: Client-4a1b3e7f-ca54-11ed-a87a-853d3d7f4042
2023-03-24 14:58:08,930 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:46334
2023-03-24 14:58:12,981 - distributed.scheduler - INFO - Remove client Client-4a1b3e7f-ca54-11ed-a87a-853d3d7f4042
2023-03-24 14:58:12,981 - distributed.core - INFO - Received 'close-stream' from tcp://127.0.0.1:46334; closing.
2023-03-24 14:58:12,982 - distributed.scheduler - INFO - Remove client Client-4a1b3e7f-ca54-11ed-a87a-853d3d7f4042
2023-03-24 14:58:12,982 - distributed.scheduler - INFO - Close client connection: Client-4a1b3e7f-ca54-11ed-a87a-853d3d7f4042

我想念什么?

python dask
1个回答
0
投票

这在对相关问题的回答中有所介绍。具体来说,您将相同的输入传递给一个函数,并且

dask
默认情况下假定它将返回相同的值,因此它只计算一次任务。

如果你想要重新计算函数的行为,即使输入是相同的,使用

pure=False
kwarg:

future = client.submit(costly_simulation, p, pure=False)
© www.soinside.com 2019 - 2024. All rights reserved.