Dask数据湖的做法正确吗?

问题描述 投票:0回答:0

所以我使用Dask来存储大量的数据。我们每天有大约5000万行新数据。栏宽不多。我目前使用df.to_parquet(long_term_storage_directory)存储数据。当我得到新的数据时,我把这个附加到long_term_storage_directory目录中。一切都还好,但速度很慢。

正在使用的索引是时间,我希望当我添加数据时,它会简单地被添加到long_term_storage_directory中的长长的parquet文件列表中。(long_term_storage_directory也是由同一个时间字段索引的)我担心我所采取的方法有一定的缺陷。也许我需要使用火花或其他东西来存储数据?

注意:ddf_new_data的索引与ddf_long_term_storage_directory中使用的索引相同。 我希望由于新进来的数据与当前长期数据存储目录中的数据具有相同的索引,将数据添加到长期数据存储中会更快。

ddf_long_term_storage_directory = dd.read_parquet(path=long_term_storage_directory, engine='pyarrow')
ddf_new_data = dd.read_parquet(path=directory_to_add_to_long_term_storage, engine='pyarrow')

ddf_new_data = ddf_new_data.set_index(index_name, sorted=False, drop=True)

ddf = dd.concat([ddf_long_term_storage_directory, ddf_new_data], axis=0)
ddf = ddf.repartition(partition_size='200MB') #??? Do I need to do this every time I add new data
ddf.to_parquet(long_term_storage_directory)

dask dask-dataframe
© www.soinside.com 2019 - 2024. All rights reserved.