我有一个关于dask.distributed中任务的调度执行顺序的问题,对于一个大型原始数据集的强数据减少的情况。
我们正在使用dask.distributed用于从电影帧中提取信息的代码。它的具体应用是晶体学,但很一般的步骤是。
我们通过使用dask.array API来实现步骤1和步骤2(后者使用了 map_blocks
的一个或几个聚合子栈的区块块大小),然后将数组块转换为dask.delayed对象(使用 to_delayed
),它们被传递给一个做实际数据还原函数的函数(步骤3)。我们注意正确地对齐步骤2中的HDF5数组、dask计算和聚合范围的分块,使每个最终延迟对象的任务图(元素的 tasks
)是非常干净的。下面是示例代码。
def sum_sub_stacks(mov):
# aggregation function
sub_stk = []
for k in range(mov.shape[0]//10):
sub_stk.append(mov[k*10:k*10+10,...].sum(axis=0, keepdims=True))
return np.concatenate(sub_stk)
def get_info(mov):
# reduction function
results = []
for frame in mov:
results.append({
'sum': frame.sum(),
'variance': frame.var()
# ...actually much more complex/expensive stuff
})
return results
# connect to dask.distributed scheduler
client = Client(address='127.0.0.1:8786')
# 1: get the movie
fh = h5py.File('movie_stack.h5')
movie = da.from_array(fh['/entry/data/raw_counts'], chunks=(100,-1,-1))
# 2: sum sub-stacks within movie
movie_aggregated = movie.map_blocks(sum_sub_stacks,
chunks=(10,) + movie.chunks[1:],
dtype=movie.dtype)
# 3: create and run reduction tasks
tasks = [delayed(get_info)(chk)
for chk in movie_aggregated.to_delayed().ravel()]
info = client.compute(tasks, sync=True)
理想的操作调度显然是让每个worker在一个chunk上执行1-2-3的序列,然后再转到下一个,这样可以保持IO负载不变,CPU最大,内存最小。
相反,发生的情况是,首先所有的工作者都试图从文件中读取尽可能多的chunks(第1步),这就形成了一个IO瓶颈,并迅速耗尽工作者的内存,导致本地驱动器的颤动。通常情况下,在某些时候,worker最终会移动到步骤23,这可以快速释放内存并适当使用所有CPU,但在其他情况下,worker会以一种不协调的方式被杀死,或者整个计算停滞。另外中间的情况也会发生,幸存的工人只在一段时间内表现合理。
有没有什么方法可以给调度器提示,让它按照上述的优先顺序处理任务,或者有其他方法来改善调度行为?还是说这种代码方式本身就有一些愚蠢的地方?
首先,你所做的事情本质上一点也不愚蠢!
一般来说,Dask 试图减少它所持有的临时性数量,它也会平衡这一点和并行性(图的宽度和工人数量)。 调度是复杂的,Dask还使用了另一种优化,它将任务融合在一起,使其更加优化。对于很多小块任务,你可能会遇到问题。https:/docs.dask.orgenlatestarray-best-practices.html?highlight=chunk%20size#select-a-good-chunk-size。
Dask确实有一些 优化配置 我建议您在考虑了其他块状尺寸后再使用。 我也鼓励你阅读 下期 因为围绕着调度配置进行了健康的讨论。
最后,您可以考虑增加 内存配置 因为您可能希望更严格地控制每个工人应该使用多少内存。