我在多个图像上运行管道。管道包括从文件系统读取图像,对每个图像进行处理,然后将图像保存到文件系统。但是,由于MemoryError,dask worker失败了。有没有办法确保dask工作者不会在内存中加载太多图像?即等到工作人员有足够的空间,然后再在新图像上启动处理管道。
我有一个调度程序和40个工作人员,4核,15GB内存和运行Centos7。我试图批量处理125张图像;每个图像都相当大,但小到足以适合工人;大约3GB需要整个过程。
我试图处理少量的图像,效果很好。
EDITED
from dask.distributed import Client, LocalCluster
# LocalCluster is used to show the config of the workers on the actual cluster
client = Client(LocalCluster(n_workers=2, resources={'process': 1}))
paths = ['list', 'of', 'paths']
# Read the file data from each path
data = client.map(read, path, resources={'process': 1)
# Apply foo to the data n times
for _ in range(n):
data = client.map(foo, x, resources={'process': 1)
# Save the processed data
data.map(save, x, resources={'process': 1)
# Retrieve results
client.gather(data)
我预计图像会在工作空间可用时进行处理,但似乎图像全部同时加载到不同的工作人员上。
编辑:我的问题是所有任务都分配给工人,他们没有足够的内存。我发现如何限制一个工人在一个时刻处理的任务数量[https://distributed.readthedocs.io/en/latest/resources.html#resources-are-applied-separately-to-each-worker-process](see here)。但是,有了这个限制,当我执行我的任务时,他们都完成了读取步骤,然后是处理步骤,最后是保存步骤。这是一个问题,因为图像溢出到磁盘。
是否有办法在开始新任务之前完成每项任务?例如在Worker-1上:读取(img1) - >进程(img1) - >保存(img1) - >读取(img2) - > ...
Dask通常不知道任务需要多少内存,它只能知道输出的大小,并且只有在完成后才能知道。这是因为Dask只是执行一个pthon函数,然后等待它完成;但所有事情都可以在python函数中发生。您通常应该期望尽可能多的任务开始,因为您有可用的工作核心 - 正如您所发现的那样。
如果你想要一个较小的总内存负载,那么你的解决方案应该很简单:拥有足够少的工作人员,这样如果他们都使用你可以期望的最大内存,你仍然有一些备用系统来应对。
编辑:你可能想在提交之前尝试在图表上运行优化(尽管这应该发生,我认为),因为它听起来像你的线性任务链应该“融合”。 http://docs.dask.org/en/latest/optimize.html