我正在将64个压缩的csv文件(可能是70-80 GB)读入一个dask数据帧,然后使用聚合运行groupby。
作业从未完成,因为groupby只创建一个只有一个分区的数据框。
This post和this post已经解决了这个问题,但是当你的结果数据框太大时,他们只关注计算图而不是你遇到的内存问题。
我尝试了重新分配的解决方法,但工作仍然没有完成。
我做错了什么,我必须使用map_partition吗?这非常令人困惑,因为我预计Dask即使在聚合操作之后也会处理所有内容。
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1, memory_limit='8GB',diagnostics_port=5000)
client
dask.config.set(scheduler='processes')
dB3 = dd.read_csv("boden/expansion*.csv", # read in parallel
blocksize=None, # 64 files
sep=',',
compression='gzip'
)
aggs = {
'boden': ['count','min']
}
dBSelect=dB3.groupby(['lng','lat']).agg(aggs).repartition(npartitions=64)
dBSelect=dBSelect.reset_index()
dBSelect.columns=['lng','lat','bodenCount','boden']
dBSelect=dBSelect.drop('bodenCount',axis=1)
with ProgressBar(dt=30): dBSelect.compute().to_parquet('boden/final/boden_final.parq',compression=None)
大多数groupby聚合输出很小,很容易适合一个分区。显然,情况并非如此。
要解决此问题,您应该使用split_out=
参数进行groupby聚合,以请求一定数量的输出分区。
df.groupby(['x', 'y', 'z']).mean(split_out=10)
请注意,使用split_out=
将显着增加任务图的大小(它必须提前轻微地对数据进行混洗/排序),因此可能会增加调度开销。