熊猫:管理大型CSV文件:组+在新文件中排序?

问题描述 投票:2回答:2

我有一个非常大的csv文件要通过此过程进行管理:

  • 按3列将文件分组
  • 对于每个组,将数据框分为5列
  • 将此数据帧写入csv文件中

这是我的第一次尝试:

file = pd.read_csv('file.csv')
grouped = file.groupby([col1, col2, col3])
for key, df in grouped: 
    name = 'key.csv'
    df = df.sort_values(by=[col4, col5, col6, col7, col8])
    df.to_csv(name , index=False)
    yield name 

此方法的要点:我可以在每次迭代时产生文件名,然后继续执行文件的ETL过程,而无需等待其他文件准备好,因此在写csv之前我直接对数据帧进行了排序。

坏点:文件太大,无法像这样处理,我遇到了内存错误。

所以,我的第二个(也是当前的)尝试:

list_files = []
for chunk in pd.read_csv('file.csv', chunksize=CHUNKSIZE):
    grouped = chunk.groupby([col1, col2, col3])
    for key, df in grouped:
        name = 'key.csv'
        if Path(name).exists():
            df.to_csv(name, index=False, header=False, mode='a')
        else:
            list_files.append(name)
            df.to_csv(name, index=False)
yield list_files

这里:没有内存问题,因为我读取了带有块的文件。

但是,正如您所看到的,因为我将数据追加到文件(如果存在),因此数据不会排序。因此,我需要产生所有文件的列表,并创建另一个函数:

def sort(list_files):
    for filename in list_files:
        df = pd.read_csv(filename)
        df = df.sort_value(..)
        df.to_csv(filename)
        yield filename

因此,我需要再次读取每个文件,在这里,该过程需要先创建所有list_files,然后才能传递到ETL过程的下一步

关于此,您是否知道是否有办法(我目前看不到)来解决内存错误问题,并以更快的方式完成此分组/排序过程?也许(当然)是不可能的,但是任何改进都会有所帮助(以一种更聪明的方式将数据追加到文件中,然后就可以对数据进行排序了?)

谢谢

编辑:也许一种方法可以是在读取之前对大文件进行排序,但是我又遇到了内存问题,不知道除了熊猫还有别的方法会更好吗?

python pandas csv out-of-memory
2个回答
0
投票

Dask实现了大多数熊猫的功能,并且不会产生MemoryError(显然,性能不会那么出色)。类似情况:Killed/MemoryError when creating a large dask.dataframe from delayed collection


0
投票

我去过那里,我建议您使用Dask,它为分析https://dask.org/提供了高级并行性,与Spark的功能有些相似。然后,您可以使用与第一次尝试中相同的代码

© www.soinside.com 2019 - 2024. All rights reserved.