Pandas / Dask - 对大型 CSV 进行分组和聚合会消耗内存和/或需要相当长的时间

问题描述 投票:0回答:1

我正在尝试一个小的 POC 来尝试分组和聚合,以减少 pandas 和 Dask 中大型 CSV 的数据,并且我观察到内存使用率很高和/或比我预期的处理时间慢......有人吗对于 python/pandas/dask 菜鸟来说,有什么改进的建议吗?

背景

我请求构建一个文件摄取工具,该工具可以:

  1. 能够接收几 GB 的文件,其中每行包含用户 ID 和一些其他信息
  2. 做一些转变
  3. 将数据减少到
    { user -> [collection of info]}
  4. 批量发送这些数据到我们的网络服务

根据我的研究,由于文件只有几个GB,我发现Spark等会大材小用,而Pandas/Dask可能是一个不错的选择,因此POC。

问题

  • 处理 1GB csv 对于 pandas 和 Dask 来说都需要大约 1 分钟,并且对于 pandas 消耗 1.5GB 内存,对于 dask 消耗 9GB 内存(!!!)
  • 处理 2GB csv 需要大约 3 分钟,对于 pandas 需要 2.8GB 内存,Dask 崩溃了!

我在这里做错了什么?

  • 对于 pandas,由于我正在以小块的形式处理 CSV,所以我没想到 RAM 使用率会如此高
  • 对于 Dask,我在网上读到的所有内容都表明 Dask 在由
    blocksize
    指示的块中处理 CSV,因此 ram 使用量应该为
    blocksize * size per block
    ,但当块大小达到 9GB 时,我不认为总计会达到 9GB只有6.4MB。我不知道为什么它的内存使用量会飙升至 9GB 对于 1GB csv 输入

(注意:如果我不设置块大小,即使输入 1GB,dask 也会崩溃)

我的代码

我无法共享 CSV,但它有 1 个整数列,后跟 8 个文本列。下面引用的

user_id
order_id
列都是文本列。

  • 1GB csv 有 14000001 行
  • 2GB csv 有 28000001 行
  • 5GB csv 有 70000001 行

我用随机数据生成了这些 csv,并且

user_id
列是我从 10 个预先随机生成的值中随机挑选的,所以我预计最终输出是 10 个用户 ID,每个用户 ID 都有一个谁知道有多少订单的集合id。

熊猫

#!/usr/bin/env python3
from pandas import DataFrame, read_csv
import pandas as pd
import sys

test_csv_location = '1gb.csv'
chunk_size = 100000
pieces = list()

for chunk in pd.read_csv(test_csv_location, chunksize=chunk_size, delimiter='|', iterator=True):
    df = chunk.groupby('user_id')['order_id'].agg(size= len,list= lambda x: list(x))
    pieces.append(df)
final = pd.concat(pieces).groupby('user_id')['list'].agg(size= len,list=sum)

final.to_csv('pandastest.csv', index=False)

达斯克

#!/usr/bin/env python3
from dask.distributed import Client
import dask.dataframe as ddf
import sys

test_csv_location = '1gb.csv'
df = ddf.read_csv(test_csv_location, blocksize=6400000, delimiter='|')

# For each user, reduce to a list of order ids
grouped = df.groupby('user_id')
collection = grouped['order_id'].apply(list, meta=('order_id', 'f8'))

collection.to_csv('./dasktest.csv', single_file=True)
python pandas dataframe dask dask-dataframe
1个回答
1
投票

groupby
操作的成本很高,因为
dask
将尝试在工作人员之间洗牌数据以检查谁拥有哪些
user_id
值。如果
user_id
有很多唯一值(听起来像这样),则需要在工作线程/分区之间进行大量交叉检查。

至少有两种方法可以解决:

  1. user_id
    设置为索引。这在索引阶段会很昂贵,但后续操作会更快,因为现在 dask 不必检查每个分区的
    user_id
    值。
df = df.set_index('user_id')
collection = df.groupby('user_id')['order_id'].apply(list, meta=('order_id', 'f8'))
collection.to_csv('./dasktest.csv', single_file=True)
  1. 如果您的文件具有您了解的结构,例如作为一个极端的例子,如果
    user_id
    进行了某种排序,那么首先 csv 文件仅包含以 1 (或 A,或使用的任何其他符号)开头的
    user_id
    值,然后是 2 等,那么您可以使用该信息以仅在这些“块”内需要
    groupby
    的方式在“块”(松散术语)中形成分区。
© www.soinside.com 2019 - 2024. All rights reserved.