为什么dask worker因“小”大小的任务导致MemoryError失败? [Dask.bag]

问题描述 投票:1回答:1

我在多个图像上运行管道。管道包括从文件系统读取图像,对每个图像进行处理,然后将图像保存到文件系统。但是,由于MemoryError,dask worker失败了。有没有办法确保dask工作者不会在内存中加载太多图像?即等到工作人员有足够的空间,然后再在新图像上启动处理管道。

我有一个调度程序和40个工作人员,4核,15GB内存和运行Centos7。我试图批量处理125张图像;每个图像都相当大,但小到足以适合工人;大约3GB需要整个过程。

我试图处理少量的图像,效果很好。

EDITED

from dask.distributed import Client, LocalCluster

# LocalCluster is used to show the config of the workers on the actual cluster
client = Client(LocalCluster(n_workers=2, resources={'process': 1}))

paths = ['list', 'of', 'paths']

# Read the file data from each path
data = client.map(read, path, resources={'process': 1)

# Apply foo to the data n times
for _ in range(n):
    data = client.map(foo, x, resources={'process': 1)

# Save the processed data
data.map(save, x, resources={'process': 1)

# Retrieve results
client.gather(data)

我预计图像会在工作空间可用时进行处理,但似乎图像全部同时加载到不同的工作人员上。

编辑:我的问题是所有任务都分配给工人,他们没有足够的内存。我发现如何限制一个工人在一个时刻处理的任务数量[https://distributed.readthedocs.io/en/latest/resources.html#resources-are-applied-separately-to-each-worker-process](see here)。但是,有了这个限制,当我执行我的任务时,他们都完成了读取步骤,然后是处理步骤,最后是保存步骤。这是一个问题,因为图像溢出到磁盘。

是否有办法在开始新任务之前完成每项任务?例如在Worker-1上:读取(img1) - >进程(img1) - >保存(img1) - >读取(img2) - > ...

dask dask-distributed
1个回答
1
投票

Dask通常不知道任务需要多少内存,它只能知道输出的大小,并且只有在完成后才能知道。这是因为Dask只是执行一个pthon函数,然后等待它完成;但所有事情都可以在python函数中发生。您通常应该期望尽可能多的任务开始,因为您有可用的工作核心 - 正如您所发现的那样。

如果你想要一个较小的总内存负载,那么你的解决方案应该很简单:拥有足够少的工作人员,这样如果他们都使用你可以期望的最大内存,你仍然有一些备用系统来应对。

编辑:你可能想在提交之前尝试在图表上运行优化(尽管这应该发生,我认为),因为它听起来像你的线性任务链应该“融合”。 http://docs.dask.org/en/latest/optimize.html

© www.soinside.com 2019 - 2024. All rights reserved.