我有一个非常大的csv文件要通过此过程进行管理:
这是我的第一次尝试:
file = pd.read_csv('file.csv')
grouped = file.groupby([col1, col2, col3])
for key, df in grouped:
name = 'key.csv'
df = df.sort_values(by=[col4, col5, col6, col7, col8])
df.to_csv(name , index=False)
yield name
此方法的要点:我可以在每次迭代时产生文件名,然后继续执行文件的ETL过程,而无需等待其他文件准备好,因此在写csv之前我直接对数据帧进行了排序。
坏点:文件太大,无法像这样处理,我遇到了内存错误。
所以,我的第二个(也是当前的)尝试:
list_files = []
for chunk in pd.read_csv('file.csv', chunksize=CHUNKSIZE):
grouped = chunk.groupby([col1, col2, col3])
for key, df in grouped:
name = 'key.csv'
if Path(name).exists():
df.to_csv(name, index=False, header=False, mode='a')
else:
list_files.append(name)
df.to_csv(name, index=False)
yield list_files
这里:没有内存问题,因为我读取了带有块的文件。
但是,正如您所看到的,因为我将数据追加到文件(如果存在),因此数据不会排序。因此,我需要产生所有文件的列表,并创建另一个函数:
def sort(list_files):
for filename in list_files:
df = pd.read_csv(filename)
df = df.sort_value(..)
df.to_csv(filename)
yield filename
因此,我需要再次读取每个文件,在这里,该过程需要先创建所有list_files,然后才能传递到ETL过程的下一步
关于此,您是否知道是否有办法(我目前看不到)来解决内存错误问题,并以更快的方式完成此分组/排序过程?也许(当然)是不可能的,但是任何改进都会有所帮助(以一种更聪明的方式将数据追加到文件中,然后就可以对数据进行排序了?)
谢谢
编辑:也许一种方法可以是在读取之前对大文件进行排序,但是我又遇到了内存问题,不知道除了熊猫还有别的方法会更好吗?
Dask实现了大多数熊猫的功能,并且不会产生MemoryError(显然,性能不会那么出色)。类似情况:Killed/MemoryError when creating a large dask.dataframe from delayed collection
我去过那里,我建议您使用Dask,它为分析https://dask.org/提供了高级并行性,与Spark的功能有些相似。然后,您可以使用与第一次尝试中相同的代码