如何对每个worker上的dask进行排队以允许顺序执行进程?

问题描述 投票:1回答:1

我需要工人一次处理一个任务,并在开始新任务之前完成当前流程。我无法做到:(1)每个工人最多只能运行一个任务,(2)让工人在开始新程序之前完成一个程序;原子交易。

我在具有40个节点的集群上使用dask.distributed Client;每个4芯和15GB内存。我处理的管道具有大约8-10GB的任务,因此在工作上有两个任务将导致应用程序的失败。

我尝试用dask-worker scheduler-ip:port --nprocs 1 --resources process=1futures = [client.submit(func, f, resources={'process': 1}) for f in futures]分配我的工作人员资源和任务分配但是没有成功。

我的代码如下:

import dask
from dask.distributed import Client


@dask.delayed
def load():
  ...


@dask.delayed
def foo():
  ...


@dask.delayed
def save():
  ...

client = Client(scheduler-ip:port)

# Process file from a given path
paths = ['list', 'of', 'path']

results = []
for path in paths:
  img = load(path)

  for _ in range(n):
    img = foo(img)

  results.append(save(output-filename))

client.scatter(results)
futures = client.compute(results)

def identity(x):
  return x
client.scatter(futures)
futures = [client.submit(same, f, resources={'process': 1}) for f in futures]

client.gather(futures)

截至目前,我有两个案例:

1-我运行所有输入,应用程序终止与MemoryError

2-我运行一个子样本,但它运行如下:

负载(IMG-1) - >负载(IMG-2) - > FOO(IMG-1) - >负载(IMG-3) - > ...->保存(IMG-1) - >保存(IMG-2 ) - > ...

TLDR:这是我想对每个工作人员做的事情:

负载(IMG-1) - > FOO(IMG-1) - >保存(IMG-1) - >负载(IMG-7) - > ...

dask dask-distributed dask-delayed
1个回答
0
投票

这里最简单的事情可能是只用一个线程启动你的工人

dask-worker ... --nthreads 1

那个工人一次只能做一件事

© www.soinside.com 2019 - 2024. All rights reserved.