dask dataframe groupby导致一个分区内存问题

问题描述 投票:1回答:1

我正在将64个压缩的csv文件(可能是70-80 GB)读入一个dask数据帧,然后使用聚合运行groupby。

作业从未完成,因为groupby只创建一个只有一个分区的数据框。

This postthis post已经解决了这个问题,但是当你的结果数据框太大时,他们只关注计算图而不是你遇到的内存问题。

我尝试了重新分配的解决方法,但工作仍然没有完成。

我做错了什么,我必须使用map_partition吗?这非常令人困惑,因为我预计Dask即使在聚合操作之后也会处理所有内容。

    from dask.distributed import Client, progress
    client = Client(n_workers=4, threads_per_worker=1, memory_limit='8GB',diagnostics_port=5000)
    client

    dask.config.set(scheduler='processes')
    dB3 = dd.read_csv("boden/expansion*.csv",  # read in parallel
                 blocksize=None, # 64 files
                 sep=',',
                 compression='gzip'
    )

    aggs = {
      'boden': ['count','min']
    }
    dBSelect=dB3.groupby(['lng','lat']).agg(aggs).repartition(npartitions=64) 
    dBSelect=dBSelect.reset_index()
    dBSelect.columns=['lng','lat','bodenCount','boden']
    dBSelect=dBSelect.drop('bodenCount',axis=1)
    with ProgressBar(dt=30): dBSelect.compute().to_parquet('boden/final/boden_final.parq',compression=None)  
group-by dask dask-distributed
1个回答
1
投票

大多数groupby聚合输出很小,很容易适合一个分区。显然,情况并非如此。

要解决此问题,您应该使用split_out=参数进行groupby聚合,以请求一定数量的输出分区。

df.groupby(['x', 'y', 'z']).mean(split_out=10)

请注意,使用split_out=将显着增加任务图的大小(它必须提前轻微地对数据进行混洗/排序),因此可能会增加调度开销。

© www.soinside.com 2019 - 2024. All rights reserved.