我正在尝试一个小的 POC 来尝试分组和聚合,以减少 pandas 和 Dask 中大型 CSV 的数据,并且我观察到内存使用率很高和/或比我预期的处理时间慢......有人吗对于 python/pandas/dask 菜鸟来说,有什么改进的建议吗?
我请求构建一个文件摄取工具,该工具可以:
{ user -> [collection of info]}
根据我的研究,由于文件只有几个GB,我发现Spark等会大材小用,而Pandas/Dask可能是一个不错的选择,因此POC。
我在这里做错了什么?
blocksize
指示的块中处理 CSV,因此 ram 使用量应该为 blocksize * size per block
,但当块大小达到 9GB 时,我不认为总计会达到 9GB只有6.4MB。我不知道为什么它的内存使用量会飙升至 9GB 对于 1GB csv 输入(注意:如果我不设置块大小,即使输入 1GB,dask 也会崩溃)
我无法共享 CSV,但它有 1 个整数列,后跟 8 个文本列。下面引用的
user_id
和 order_id
列都是文本列。
我用随机数据生成了这些 csv,并且
user_id
列是我从 10 个预先随机生成的值中随机挑选的,所以我预计最终输出是 10 个用户 ID,每个用户 ID 都有一个谁知道有多少订单的集合id。
#!/usr/bin/env python3
from pandas import DataFrame, read_csv
import pandas as pd
import sys
test_csv_location = '1gb.csv'
chunk_size = 100000
pieces = list()
for chunk in pd.read_csv(test_csv_location, chunksize=chunk_size, delimiter='|', iterator=True):
df = chunk.groupby('user_id')['order_id'].agg(size= len,list= lambda x: list(x))
pieces.append(df)
final = pd.concat(pieces).groupby('user_id')['list'].agg(size= len,list=sum)
final.to_csv('pandastest.csv', index=False)
#!/usr/bin/env python3
from dask.distributed import Client
import dask.dataframe as ddf
import sys
test_csv_location = '1gb.csv'
df = ddf.read_csv(test_csv_location, blocksize=6400000, delimiter='|')
# For each user, reduce to a list of order ids
grouped = df.groupby('user_id')
collection = grouped['order_id'].apply(list, meta=('order_id', 'f8'))
collection.to_csv('./dasktest.csv', single_file=True)
groupby
操作的成本很高,因为dask
将尝试在工作人员之间洗牌数据以检查谁拥有哪些user_id
值。如果 user_id
有很多唯一值(听起来像这样),则需要在工作线程/分区之间进行大量交叉检查。
至少有两种方法可以解决:
user_id
设置为索引。这在索引阶段会很昂贵,但后续操作会更快,因为现在 dask 不必检查每个分区的 user_id
值。df = df.set_index('user_id')
collection = df.groupby('user_id')['order_id'].apply(list, meta=('order_id', 'f8'))
collection.to_csv('./dasktest.csv', single_file=True)
user_id
进行了某种排序,那么首先 csv 文件仅包含以 1 (或 A,或使用的任何其他符号)开头的 user_id
值,然后是 2 等,那么您可以使用该信息以仅在这些“块”内需要 groupby
的方式在“块”(松散术语)中形成分区。