Dask 分布式 - 如何为每个工作线程运行一个任务,使该任务在工作线程可用的所有核心上运行?

问题描述 投票:0回答:2

我对使用

distributed
python 库非常陌生。我有 4 个工作线程,并且我已经为每个工作线程使用 14 个核心(在 16 个可用核心中)成功启动了一些并行运行,从而导致 4*14=56 个任务并行运行。

但是,如果我只想让每个工人同时执行一项任务,该怎么办?这样,我期望在工作线程上并行使用 14 个核心来执行一项任务。

python dask-distributed cpu-cores
2个回答
6
投票

Dask 工作线程维护一个用于启动任务的线程池。每个任务始终消耗该池中的一个线程。您无法告诉任务从该池中获取许多线程。

但是,还有其他方法可以控制和限制 Dask Worker 内的并发性。在您的情况下,您可能会考虑定义工人资源。这可以让您阻止许多大任务同时在相同的工作线程上运行。

在以下示例中,我们定义每个工作人员拥有一个

Foo
资源,并且每个任务需要一个
Foo
才能运行。这将阻止任何两个任务在同一工作线程上同时运行。

dask-worker scheduler-address:8786 --resources Foo=1
dask-worker scheduler-address:8786 --resources Foo=1

.

from dask.distributed import Client
client = Client('scheduler-address:8786')
futures = client.map(my_expensive_function, ..., resources={'Foo': 1})

3
投票

这里是一个示例,说明您希望在 python 中而不是在命令行中启动工作程序时分配资源限制:

from dask.distributed import Client
from dask import delayed
import time
import os

client_with_foo = Client(processes = False,
    n_workers= 2,
    threads_per_worker=10,
    resources = {'foo':1}
               )

@delayed
def do_work(cmd=None, interval=2):
    time.sleep(interval)
    return None


task_graph = []
for i in range(10):
    task_graph.append(do_work())

start = time.time()
result = client_with_foo.compute(task_graph, resources = {'foo':1})
output = client_with_foo.gather(result)
end = time.time()
print(end - start)

分布在两个工作人员之间的 10 个 2 秒任务需要 10 秒才能执行,因此上述代码的输出约为 10。

© www.soinside.com 2019 - 2024. All rights reserved.