[加载多个实木复合地板文件时保留dask数据帧划分

问题描述 投票:1回答:1

我在数据帧中有一些时间序列数据,其中时间作为索引。对索引进行排序,并将数据存储在多个实木复合地板文件中,每个文件中包含一天的数据。我使用dask 2.9.1

当我从一个实木复合地板文件加载数据时,分区设置正确。

当我从多个文件加载数据时,在得到的dask数据帧中没有得到定义。

下面的示例说明了问题:

import pandas as pd 
import pandas.util.testing as tm
import dask.dataframe as dd

df = tm.makeTimeDataFrame( 48, "H")

df1 = df[:24].sort_index()
df2 = df[24:].sort_index()
dd.from_pandas( df1, npartitions=1 ).to_parquet( "df1d.parq", engine="fastparquet" ) 
dd.from_pandas( df2, npartitions=1 ).to_parquet( "df2d.parq", engine="fastparquet" )
ddf = dd.read_parquet( "df*d.parq", infer_divisions=True, sorted_index=True, engine="fastparquet"  ) 
print(ddf.npartitions, ddf.divisions)

这里有2个分区,(None, None, None)作为分区

我可以让dd.read_parquet将分区设置为实际值吗?


更新

根据我的实际数据,我现在有一个实木复合地板文件。

文件是通过保存来自使用时间戳作为索引的数据帧中的数据来创建的。索引已排序。每个文件的大小为100-150MB,当加载到内存中时,它使用的应用程序为2.5GB RAM,激活索引非常重要,因为重新创建索引确实很繁琐。

我没有设法在read_parquet上找到参数或引擎的组合,从而无法在加载时创建分区。

数据文件被命名为“ yyyy-mm-dd.parquet”,所以我不得不根据该信息创建分区:

from pathlib import Path
files = list (Path("e:/data").glob("2019-06-*.parquet") )
divisions = [  pd.Timestamp( f.stem) for f in files ] + [ pd.Timestamp( files[-1].stem) + pd.Timedelta(1, unit='D' ) ]
ddf = dd.read_parquet( files )
ddf.divisions = divisions

这无法使用索引,在某些情况下,它失败并显示“ TypeError:只能将元组(不是“列表”)连接到元组”

然后我试图将除法设置为元组ddf.divisions = tuple(divisions),然后它起作用了。如果索引设置正确,那么快速就会令人印象深刻]


更新2

一种更好的方法是分别读取dask数据帧,然后将它们串联:

from pathlib import Path
import dask.dataframe as dd
files = list (Path("e:/data").glob("2019-06-*.parquet") )
ddfs = [ dd.read_parquet( f ) for f in files ]
ddf = dd.concat(ddfs, axis=0)

这样设置分隔,它还解决了另一个问题,即随着时间的推移处理列的增加。

python dataframe dask fastparquet
1个回答
0
投票

下面,我将原始问题重写为使用concat,这解决了我的问题

import pandas as pd 
import pandas.util.testing as tm
import dask.dataframe as dd

# create two example parquet files
df = tm.makeTimeDataFrame( 48, "H")
df1 = df[:24].sort_index()
df2 = df[24:].sort_index()
dd.from_pandas( df1, npartitions=1 ).to_parquet( "df1d.parq" ) 
dd.from_pandas( df2, npartitions=1 ).to_parquet( "df2d.parq" )

# read the files and concatenate
ddf = dd.concat([dd.read_parquet( d ) for d in ["df1d.parq", "df2d.parq"] ], axis=0)

print(ddf.npartitions, ddf.divisions)

我仍然可以得到预期的2个分区,但是现在的分区是(Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00'), Timestamp('2000-01-02 23:00:00'))

© www.soinside.com 2019 - 2024. All rights reserved.