优化dask.分布式调度,减少数据量。

问题描述 投票:0回答:1

我有一个关于dask.distributed中任务的调度执行顺序的问题,对于一个大型原始数据集的强数据减少的情况。

我们正在使用dask.distributed用于从电影帧中提取信息的代码。它的具体应用是晶体学,但很一般的步骤是。

  1. 读取存储在HDF5文件中作为3D数组的电影帧(或其中的几个数组)到dask数组中。这显然是相当重的IO
  2. 将这些帧分组为连续的子堆栈,通常是10张移动静态照片,将其中的帧进行汇总(求和或平均),从而得到一张2D图像。
  3. 在2D图像上运行几个计算量很大的分析函数(比如某些特征的位置),返回一个结果字典,与电影本身相比,这个结果可以忽略不计。

我们通过使用dask.array API来实现步骤1和步骤2(后者使用了 map_blocks 的一个或几个聚合子栈的区块块大小),然后将数组块转换为dask.delayed对象(使用 to_delayed),它们被传递给一个做实际数据还原函数的函数(步骤3)。我们注意正确地对齐步骤2中的HDF5数组、dask计算和聚合范围的分块,使每个最终延迟对象的任务图(元素的 tasks)是非常干净的。下面是示例代码。

def sum_sub_stacks(mov):
    # aggregation function
    sub_stk = []
    for k in range(mov.shape[0]//10):
        sub_stk.append(mov[k*10:k*10+10,...].sum(axis=0, keepdims=True))
    return np.concatenate(sub_stk)

def get_info(mov):
    # reduction function
    results = []
    for frame in mov:
        results.append({
            'sum': frame.sum(),
            'variance': frame.var()
            # ...actually much more complex/expensive stuff
        })
    return results

# connect to dask.distributed scheduler
client = Client(address='127.0.0.1:8786')

# 1: get the movie
fh = h5py.File('movie_stack.h5')
movie = da.from_array(fh['/entry/data/raw_counts'], chunks=(100,-1,-1))

# 2: sum sub-stacks within movie
movie_aggregated = movie.map_blocks(sum_sub_stacks, 
                                    chunks=(10,) + movie.chunks[1:],
                                    dtype=movie.dtype)

# 3: create and run reduction tasks
tasks = [delayed(get_info)(chk) 
         for chk in movie_aggregated.to_delayed().ravel()]

info = client.compute(tasks, sync=True)

理想的操作调度显然是让每个worker在一个chunk上执行1-2-3的序列,然后再转到下一个,这样可以保持IO负载不变,CPU最大,内存最小。

相反,发生的情况是,首先所有的工作者都试图从文件中读取尽可能多的chunks(第1步),这就形成了一个IO瓶颈,并迅速耗尽工作者的内存,导致本地驱动器的颤动。通常情况下,在某些时候,worker最终会移动到步骤23,这可以快速释放内存并适当使用所有CPU,但在其他情况下,worker会以一种不协调的方式被杀死,或者整个计算停滞。另外中间的情况也会发生,幸存的工人只在一段时间内表现合理。

有没有什么方法可以给调度器提示,让它按照上述的优先顺序处理任务,或者有其他方法来改善调度行为?还是说这种代码方式本身就有一些愚蠢的地方?

python dask dask-distributed
1个回答
0
投票

首先,你所做的事情本质上一点也不愚蠢!

一般来说,Dask 试图减少它所持有的临时性数量,它也会平衡这一点和并行性(图的宽度和工人数量)。 调度是复杂的,Dask还使用了另一种优化,它将任务融合在一起,使其更加优化。对于很多小块任务,你可能会遇到问题。https:/docs.dask.orgenlatestarray-best-practices.html?highlight=chunk%20size#select-a-good-chunk-size。

Dask确实有一些 优化配置 我建议您在考虑了其他块状尺寸后再使用。 我也鼓励你阅读 下期 因为围绕着调度配置进行了健康的讨论。

最后,您可以考虑增加 内存配置 因为您可能希望更严格地控制每个工人应该使用多少内存。

© www.soinside.com 2019 - 2024. All rights reserved.